How to Add Scheduling Capabilities to Ververica Platform: Airflow Integration

Question

I have two streaming applications. One of them (unbounded-source-blackhole-sink) has an unbounded source which lets the application run continuously and never finish. The other one (bounded-source-blackhole-sink) has a bounded source and comes to an end after processing all the input. I would like to automate the coordination of these two deployments so that every X hours:

  1. Stop the unbounded-source-blackhole-sink
  2. Start the bounded-source-blackhole-sink
  3. Wait until the bounded-source-blackhole-sink is finished
  4. Start the unbounded-source-blackhole-sink without losing state

Answer

Note: This section applies to Flink 1.15 with Ververica Platform 2.8.

Overview

Previously, in How to Add Scheduling Capabilities to Ververica Platform: Sidecar Container, we tackled the exact same problem by developing a Node.js application and deploying it as a sidecar container alongside Ververica Platform. In that article, all the logic to monitor, schedule and trigger the state transitions of the Apache Flink deployments for the steps mentioned above (1-4) was part of a single application. However, that approach is quite naive: it has no advanced scheduling capabilities such as keeping state for different runs, and it lacks templating options because the transition rules were hard coded.

Apache Airflow is a platform for programmatically authoring, scheduling and monitoring workflows that are designed to be finite batch workflows. It was not built for infinitely running, event-based workflows and it is not a streaming solution [1]. It is therefore a perfect match to combine Ververica Platform for Unified Stream and Batch Processing workflows with Apache Airflow for authoring, scheduling and monitoring those workflows.

Introduction

In this article, we will take the same Node.js application and evolve it so that it is responsible for executing only a single step of the transitioning logic explained above (1-4). Furthermore, we will read the parameters for the transitioning logic from environment variables so that we can reuse the application for different strategies. The next step is to use Apache Airflow and create a DAG that defines a separate, dependent task for each of the steps (1-4). Each task uses Apache Airflow's KubernetesPodOperator to run our Node.js application.

Setup

Ververica Platform

Note: This setup uses the Ververica Platform namespace named default that comes with the installation. It also assumes that port forwarding is set up so that Ververica Platform is accessible via localhost on port 8080.
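
A possible port-forward command, assuming Ververica Platform was installed into the vvp Kubernetes namespace with the default service name (the same service that is referenced later in the DAG as vvp-ververica-platform.vvp on port 80), could look like:

kubectl --namespace vvp port-forward services/vvp-ververica-platform 8080:80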

Here we will run the following scripts to register the deployment target and catalog tables and to create the example Apache Flink deployments:

First step

Note: This setup assumes that a Kubernetes namespace named vvp-jobs exists and that Ververica Platform has enough permissions to manage Apache Flink deployments in this namespace.
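
If the vvp-jobs namespace does not exist yet, it can be created, for example, with:

kubectl create namespace vvp-jobs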

Run the following to register the vvp-jobs deployment target

curl -X POST \
  "http://localhost:8080/api/v1/namespaces/default/deployment-targets" \
  -H "accept: application/json" \
  -H "Content-Type: application/json" \
  --data @vvp-deployment-target-vvp-jobs.json

where vvp-deployment-target-vvp-jobs.json is as follows

{
	"metadata": {
		"name": "vvp-jobs"
	},
	"spec": {
		"kubernetes": {
			"namespace": "vvp-jobs"
		}
	}
}

Second step

Run the following to register the unbounded_source table to the catalog

curl -X POST \
  "http://localhost:8080/sql/v1beta1/namespaces/default/sqlscripts:execute?catalog=vvp&database=default" \
  -H "accept: application/json" \
  -H "Content-Type: application/json" \
  --data @vvp-catalog-unbounded-source.json

where vvp-catalog-unbounded-source.json is as follows

{
  "statement": "CREATE TABLE vvp.default.unbounded_source (name STRING) WITH ('connector' = 'datagen', 'rows-per-second' = '10');"
}

Third step

Run the following to register the bounded_source table to the catalog

curl -X POST \
  "http://localhost:8080/sql/v1beta1/namespaces/default/sqlscripts:execute?catalog=vvp&database=default" \
  -H "accept: application/json" \
  -H "Content-Type: application/json" \
  --data @vvp-catalog-bounded-source.json

where vvp-catalog-bounded-source.json is as follows

{
  "statement": "CREATE TABLE vvp.default.bounded_source (name STRING) WITH ('connector' = 'datagen', 'number-of-rows' = '10');"
}

Fourth step

Run the following to register the blackhole_sink table to the catalog

curl -X POST \
  "http://localhost:8080/sql/v1beta1/namespaces/default/sqlscripts:execute?catalog=vvp&database=default" \
  -H "accept: application/json" \
  -H "Content-Type: application/json" \
  --data @vvp-catalog-blackhole-sink.json

where vvp-catalog-blackhole-sink.json is as follows

{
  "statement": "CREATE TABLE vvp.default.blackhole_sink (name STRING) WITH ('connector' = 'blackhole');"
}

Fifth step

Run the following to create unbounded-source-blackhole-sink deployment

curl -X POST \
  "http://localhost:8080/api/v1/namespaces/default/deployments" \
  -H "accept: application/json" \
  -H "Content-Type: application/json" \
  --data @vvp-deployment-unbounded-source-blackhole-sink.json

where vvp-deployment-unbounded-source-blackhole-sink.json is as follows

{
  "apiVersion": "v1",
  "kind": "Deployment",
  "metadata": {
    "name": "unbounded-source-blackhole-sink"
  },
  "spec": {
    "deploymentTargetName": "vvp-jobs",
    "state": "RUNNING",
      "template": {
        "spec": {
          "artifact": {
            "kind": "SQLSCRIPT",
            "sqlScript": "INSERT INTO blackhole_sink SELECT * FROM unbounded_source;"
          }
        }
    }
  }
}

Sixth step

Run the following to create bounded-source-blackhole-sink deployment

curl -X POST \
  "http://localhost:8080/api/v1/namespaces/default/deployments" \
  -H "accept: application/json" \
  -H "Content-Type: application/json" \
  --data @vvp-deployment-bounded-source-blackhole-sink.json

where vvp-deployment-bounded-source-blackhole-sink.json is as follows

{
  "apiVersion": "v1",
  "kind": "Deployment",
  "metadata": {
    "name": "bounded-source-blackhole-sink"
  },
  "spec": {
    "deploymentTargetName": "vvp-jobs",
    "state": "CANCELLED",
    "template": {
      "spec": {
        "artifact": {
          "kind": "SQLSCRIPT",
          "sqlScript": "INSERT INTO blackhole_sink SELECT * FROM bounded_source;"
        }
      }
    }
  }
}
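
At this point both deployments are registered in Ververica Platform. You can check the desired and current state of a deployment through the same endpoint that the scheduler application polls later, for example:

curl -X GET \
  "http://localhost:8080/api/v1/namespaces/default/deployments/bounded-source-blackhole-sink" \
  -H "accept: application/json"

The response contains status.state (for example RUNNING, FINISHED or CANCELLED), which is the field the coordinator inspects to decide whether a transition should be triggered.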

Scheduler

For the scheduler application, we will run the following commands to build our Docker image and publish it to our registry. The initial directory structure of the application looks like the following:

 scheduler
 |
 |_ _ coordinator
 |   |
 |   |_ _ function.js
 |   |
 |   |_ _ package.json
 |
 |_ _ deployment
 |   |
 |   |_ _ function.js
 |   |
 |   |_ _ package.json
 |
 |_ _ Dockerfile

where we execute the commands under the scheduler directory.

First step

docker build \
  --tag <CONTAINER_REGISTRY>/scheduler:1.0.0 \
  --file Dockerfile \
  .

Second step

docker push <CONTAINER_REGISTRY>/scheduler:1.0.0

with the contents of the files as follows:

Dockerfile

FROM node:16

WORKDIR /usr/src/app

COPY deployment deployment

COPY coordinator coordinator

WORKDIR coordinator

EXPOSE 8000

CMD ["npm", "start"]

coordinator/function.js

const http = require("http");

const deployment = require("../deployment");

let deploymentManifest = "";

const monitor = (transitionRule, monitorCount) => {
  const request = http.request(deployment.getMonitorOption(transitionRule), callback(transitionRule, monitorCount));
  request.end();
}

const callback = (transitionRule, monitorCount) => (response) => {
  deploymentManifest = "";
  response.on("data", onData);
  response.on("end", onEnd(transitionRule, monitorCount))
}

const onData = (deploymentManifestChunk) => {
  deploymentManifest += deploymentManifestChunk;
}

const onEnd = (transitionRule, monitorCount) => () => {
  if (deployment.shouldTransition(deploymentManifest, transitionRule)) {
    transition(transitionRule);
  } else {
    schedule(transitionRule, monitorCount + 1);
  }
}

const transition = (transitionRule) => {
  const request = http.request(deployment.getTriggerOption(transitionRule));
  request.write(deployment.getTriggerState(transitionRule));
  request.end();
}

const schedule = (transitionRule, monitorCount) => {
  assertSchedule(transitionRule, monitorCount);
  setTimeout(monitor, deployment.getMonitorDuration(transitionRule), transitionRule, monitorCount);
}

const assertSchedule = (transitionRule, monitorCount) => {
  if (monitorCount == deployment.getMonitorCount(transitionRule)) {
    throw new Error("Deployment state is not triggering the transition!");
  }
}

monitor(deployment.getStartRule(), 0); 

coordinator/package.json

{
  "name": "coordinator",
  "version": "1.0.0",
  "description": "This is a simple example of a coordinator application for Ververica Platform implemented in JavaScript.",
  "main": "function.js",
  "scripts": {
    "start": "node function.js"
  }
}

deployment/function.js

const options = (deploymentName, requestMethod) => {
  return {
    host: process.env.VVP_HOST,
    port: process.env.VVP_PORT,
    path: "/api/v1/namespaces/" + process.env.VVP_NAMESPACE + "/deployments/" + deploymentName,
    method: requestMethod,
    headers: { "Content-Type":"application/json" }
  }
}

const TRANSITION_RULE = {
  monitorCount: parseInt(process.env.VVP_MONITOR_COUNT),
  monitorDuration: parseInt(process.env.VVP_MONITOR_DURATION),
  monitorDeployment: process.env.VVP_MONITOR_DEPLOYMENT,
  monitorState: process.env.VVP_MONITOR_STATE,
  triggerDeployment: process.env.VVP_TRIGGER_DEPLOYMENT,
  triggerState: process.env.VVP_TRIGGER_STATE
}

exports.shouldTransition = (deploymentManifest, transitionRule) => {
  return transitionRule.monitorState === JSON.parse(deploymentManifest).status.state;
}

exports.getMonitorCount = (transitionRule) => {
  return transitionRule.monitorCount;
}

exports.getMonitorDuration = (transitionRule) => {
  return transitionRule.monitorDuration;
}

exports.getMonitorOption = (transitionRule) => {
  return options(transitionRule.monitorDeployment, "GET");
}

exports.getTriggerOption = (transitionRule) => {
  return options(transitionRule.triggerDeployment, "PATCH");
}

exports.getTriggerState = (transitionRule) => {
  return JSON.stringify({ spec: { state: transitionRule.triggerState } });
}

exports.getStartRule = () => {
  return TRANSITION_RULE;
}

deployment/package.json

{
  "name": "deployment",
  "version": "1.0.0",
  "description": "This is a simple example of a deployment application for Ververica Platform implemented in JavaScript.",
  "main": "function.js"
}
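
Optionally, before building the image, you can exercise the coordinator locally against the port-forwarded Ververica Platform by setting the environment variables that the deployment module reads. A sketch for the suspend step, run from the coordinator directory (the values below are only examples):

VVP_HOST=localhost \
VVP_PORT=8080 \
VVP_NAMESPACE=default \
VVP_MONITOR_COUNT=3 \
VVP_MONITOR_DURATION=300000 \
VVP_MONITOR_DEPLOYMENT=unbounded-source-blackhole-sink \
VVP_MONITOR_STATE=RUNNING \
VVP_TRIGGER_DEPLOYMENT=unbounded-source-blackhole-sink \
VVP_TRIGGER_STATE=SUSPENDED \
npm start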

Airflow

Finally, we will bundle the DAG into a custom Airflow image and deploy Airflow with that image. You can check other ways to manage Airflow DAGs in the Airflow documentation [2].

The steps are as follows:

First step

docker build \
  --tag <CONTAINER_REGISTRY>/airflow:1.0.0 \
  --file Dockerfile \
  .

where Dockerfile content is

FROM apache/airflow

COPY scheduler.py /opt/airflow/dags/scheduler.py

and the scheduler.py content is

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

from datetime import datetime

from kubernetes.client import models as k8s

with DAG(
    dag_id="blackhole-sink-scheduler",
    start_date=datetime(1970, 1, 1),
    schedule="0 * * * *",
    max_active_runs=1,
    catchup=False
) as dag:

    # Step 1: suspend unbounded-source-blackhole-sink once it is RUNNING,
    # so that it can be resumed later without losing state
    suspend_unbounded_source_blackhole_sink = KubernetesPodOperator(
        name="suspend-unbounded-source-blackhole-sink",
        namespace="airflow",
        image="<CONTAINER_REGISTRY>/scheduler:1.0.0",
        image_pull_policy="Always",
        env_vars=[
            k8s.V1EnvVar(name="VVP_HOST", value="vvp-ververica-platform.vvp"),
            k8s.V1EnvVar(name="VVP_PORT", value="80"),
            k8s.V1EnvVar(name="VVP_NAMESPACE", value="default"),
            k8s.V1EnvVar(name="VVP_MONITOR_COUNT", value="3"),
            k8s.V1EnvVar(name="VVP_MONITOR_DURATION", value="300000"),
            k8s.V1EnvVar(name="VVP_MONITOR_DEPLOYMENT", value="unbounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_MONITOR_STATE", value="RUNNING"),
            k8s.V1EnvVar(name="VVP_TRIGGER_DEPLOYMENT", value="unbounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_TRIGGER_STATE", value="SUSPENDED")
        ],
        task_id="suspend_unbounded_source_blackhole_sink"
    )

    # Step 2: start bounded-source-blackhole-sink once the unbounded deployment is SUSPENDED
    run_bounded_source_blackhole_sink = KubernetesPodOperator(
        name="run-bounded-source-blackhole-sink",
        namespace="airflow",
        image="<CONTAINER_REGISTRY>/scheduler:1.0.0",
        image_pull_policy="Always",
        env_vars=[
            k8s.V1EnvVar(name="VVP_HOST", value="vvp-ververica-platform.vvp"),
            k8s.V1EnvVar(name="VVP_PORT", value="80"),
            k8s.V1EnvVar(name="VVP_NAMESPACE", value="default"),
            k8s.V1EnvVar(name="VVP_MONITOR_COUNT", value="3"),
            k8s.V1EnvVar(name="VVP_MONITOR_DURATION", value="300000"),
            k8s.V1EnvVar(name="VVP_MONITOR_DEPLOYMENT", value="unbounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_MONITOR_STATE", value="SUSPENDED"),
            k8s.V1EnvVar(name="VVP_TRIGGER_DEPLOYMENT", value="bounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_TRIGGER_STATE", value="RUNNING")
        ],
        task_id="run_bounded_source_blackhole_sink"
    )

    # Step 3: cancel bounded-source-blackhole-sink once it is FINISHED,
    # so that it can be started again in the next run
    cancel_bounded_source_blackhole_sink = KubernetesPodOperator(
        name="cancel-bounded-source-blackhole-sink",
        namespace="airflow",
        image="<CONTAINER_REGISTRY>/scheduler:1.0.0",
        image_pull_policy="Always",
        env_vars=[
            k8s.V1EnvVar(name="VVP_HOST", value="vvp-ververica-platform.vvp"),
            k8s.V1EnvVar(name="VVP_PORT", value="80"),
            k8s.V1EnvVar(name="VVP_NAMESPACE", value="default"),
            k8s.V1EnvVar(name="VVP_MONITOR_COUNT", value="3"),
            k8s.V1EnvVar(name="VVP_MONITOR_DURATION", value="300000"),
            k8s.V1EnvVar(name="VVP_MONITOR_DEPLOYMENT", value="bounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_MONITOR_STATE", value="FINISHED"),
            k8s.V1EnvVar(name="VVP_TRIGGER_DEPLOYMENT", value="bounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_TRIGGER_STATE", value="CANCELLED")
        ],
        task_id="cancel_bounded_source_blackhole_sink"
    )

    # Step 4: resume unbounded-source-blackhole-sink once the bounded deployment is CANCELLED
    run_unbounded_source_blackhole_sink = KubernetesPodOperator(
        name="run-unbounded-source-blackhole-sink",
        namespace="airflow",
        image="<CONTAINER_REGISTRY>/scheduler:1.0.0",
        image_pull_policy="Always",
        env_vars=[
            k8s.V1EnvVar(name="VVP_HOST", value="vvp-ververica-platform.vvp"),
            k8s.V1EnvVar(name="VVP_PORT", value="80"),
            k8s.V1EnvVar(name="VVP_NAMESPACE", value="default"),
            k8s.V1EnvVar(name="VVP_MONITOR_COUNT", value="3"),
            k8s.V1EnvVar(name="VVP_MONITOR_DURATION", value="300000"),
            k8s.V1EnvVar(name="VVP_MONITOR_DEPLOYMENT", value="bounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_MONITOR_STATE", value="CANCELLED"),
            k8s.V1EnvVar(name="VVP_TRIGGER_DEPLOYMENT", value="unbounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_TRIGGER_STATE", value="RUNNING")
        ],
        task_id="run_unbounded_source_blackhole_sink"
    )

suspend_unbounded_source_blackhole_sink >> run_bounded_source_blackhole_sink >> cancel_bounded_source_blackhole_sink >> run_unbounded_source_blackhole_sink

Second step

docker push <CONTAINER_REGISTRY>/airflow:1.0.0

Third step

helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm install airflow apache-airflow/airflow \
  --namespace airflow \
  --create-namespace \
  --set images.airflow.pullPolicy=Always \
  --set images.airflow.repository=<CONTAINER_REGISTRY>/airflow \
  --set images.airflow.tag=1.0.0
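
Once the chart is installed, you can verify that the Airflow components are up, for example:

kubectl get pods --namespace airflow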

Finally, you can port forward the Airflow webserver service and use its user interface to enable and trigger the DAG.
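
With the Helm chart defaults, the web interface is served by the airflow-webserver service on port 8080, so a possible port-forward command is:

kubectl port-forward svc/airflow-webserver 8080:8080 --namespace airflow

If Ververica Platform is still port-forwarded to local port 8080, pick a different local port, for example 8081:8080.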

Related Information