I have two streaming applications. One of them (unbounded-source-blackhole-sink) has an unbounded source, which lets the application run continuously and never finish. The other one (bounded-source-blackhole-sink) has a bounded source and comes to an end after processing all of its input. I would like to automate the coordination of these two deployments so that every X hours:
1. The unbounded-source-blackhole-sink deployment is suspended.
2. Once it is suspended, the bounded-source-blackhole-sink deployment is started.
3. Once the bounded-source-blackhole-sink deployment has finished processing its input, it is cancelled.
4. Once it is cancelled, the unbounded-source-blackhole-sink deployment is resumed.
Note: This section applies to Flink 1.15 with Ververica Platform 2.8.
Previously, in How to Add Scheduling Capabilities to Ververica Platform: Sidecar Container, we tackled the exact same problem by developing a Node.js application and deploying it as a sidecar container alongside Ververica Platform. In that article, all the logic to monitor, schedule and trigger the state transitions of the Apache Flink deployments for all of the steps mentioned above (1-4) was part of a single application. However, that approach is quite naive: it lacks advanced scheduling capabilities such as keeping state for different runs, and it offers no templating options because the transition rules were hard-coded.
Apache Airflow is a platform for programmatically authoring, scheduling and monitoring workflows that are designed to be finite batch workflows. It was not built for infinitely running, event-based workflows and it is not a streaming solution [1]. Therefore it is a perfect match to combine Ververica Platform for unified stream and batch processing with Apache Airflow for authoring, scheduling and monitoring those workflows.
In this article, we will take the same Node.js application and evolve it so that it is responsible for executing only one step of the transitioning logic explained above (1-4). Furthermore, we will read the parameters of the transitioning logic from environment variables so that we can re-use the application for different strategies. The next step is then to use Apache Airflow and create a DAG that defines a separate task for each of the steps (1-4), with each task depending on the previous one. For each task, we will use Apache Airflow's KubernetesPodOperator to run our Node.js application.
Note: This setup uses the Ververica Platform namespace named default that comes with the installation. It also assumes that port forwarding is done so that Ververica Platform is accessible via localhost.
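One way to set up the port forward, assuming Ververica Platform was installed into the vvp Kubernetes namespace with the service name vvp-ververica-platform (the same values the Airflow DAG below uses for VVP_HOST), is:
kubectl --namespace vvp port-forward services/vvp-ververica-platform 8080:80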
Here we will run the following scripts to create the example Apache Flink deployments:
Note: This setup assumes that a Kubernetes namespace named vvp-jobs exists and that Ververica Platform has enough permissions to manage Apache Flink deployments in this namespace.
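If the namespace does not exist yet, you can create it with:
kubectl create namespace vvp-jobs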
Run the following to register the vvp-jobs deployment target
curl -X POST \
"http://localhost:8080/api/v1/namespaces/default/deployment-targets" \
-H "accept: application/json" \
-H "Content-Type: application/json" \
--data @vvp-deployment-target-vvp-jobs.json
where vvp-deployment-target-vvp-jobs.json is as follows
{
  "metadata": {
    "name": "vvp-jobs"
  },
  "spec": {
    "kubernetes": {
      "namespace": "vvp-jobs"
    }
  }
}
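To verify that the deployment target was registered, you can list the deployment targets of the namespace through the same REST API (this sketch assumes the port-forwarded endpoint from the setup note above):
curl -X GET \
"http://localhost:8080/api/v1/namespaces/default/deployment-targets" \
-H "accept: application/json"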
Run the following to register the unbounded_source table to the catalog
curl -X POST \
"http://localhost:8080/sql/v1beta1/namespaces/default/sqlscripts:execute?catalog=vvp&database=default" \
-H "accept: application/json" \
-H "Content-Type: application/json" \
--data @vvp-catalog-unbounded-source.json
where vvp-catalog-unbounded-source.json is as follows
{
"statement": "CREATE TABLE vvp.default.unbounded_source (name STRING) WITH ('connector' = 'datagen', 'rows-per-second' = '10');"
}
Run the following to register the bounded_source table to the catalog
curl -X POST \
"http://localhost:8080/sql/v1beta1/namespaces/default/sqlscripts:execute?catalog=vvp&database=default" \
-H "accept: application/json" \
-H "Content-Type: application/json" \
--data @vvp-catalog-bounded-source.json
where vvp-catalog-bounded-source.json is as follows
{
"statement": "CREATE TABLE vvp.default.bounded_source (name STRING) WITH ('connector' = 'datagen', 'number-of-rows' = '10');"
}
Run the following to register the blackhole_sink table to the catalog
curl -X POST \
"http://localhost:8080/sql/v1beta1/namespaces/default/sqlscripts:execute?catalog=vvp&database=default" \
-H "accept: application/json" \
-H "Content-Type: application/json" \
--data @vvp-catalog-blackhole-sink.json
where vvp-catalog-blackhole-sink.json is as follows
{
"statement": "CREATE TABLE vvp.default.blackhole_sink (name STRING) WITH ('connector' = 'blackhole');"
}
Run the following to create unbounded-source-blackhole-sink deployment
curl -X POST \
"http://localhost:8080/api/v1/namespaces/default/deployments" \
-H "accept: application/json" \
-H "Content-Type: application/json" \
--data @vvp-deployment-unbounded-source-blackhole-sink.json
where vvp-deployment-unbounded-source-blackhole-sink.json is as follows
{
  "apiVersion": "v1",
  "kind": "Deployment",
  "metadata": {
    "name": "unbounded-source-blackhole-sink"
  },
  "spec": {
    "deploymentTargetName": "vvp-jobs",
    "state": "RUNNING",
    "template": {
      "spec": {
        "artifact": {
          "kind": "SQLSCRIPT",
          "sqlScript": "INSERT INTO blackhole_sink SELECT * FROM unbounded_source;"
        }
      }
    }
  }
}
Run the following to create bounded-source-blackhole-sink deployment
curl -X POST \
"http://localhost:8080/api/v1/namespaces/default/deployments" \
-H "accept: application/json" \
-H "Content-Type: application/json" \
--data @vvp-deployment-bounded-source-blackhole-sink.json
where vvp-deployment-bounded-source-blackhole-sink.json is as follows
{
  "apiVersion": "v1",
  "kind": "Deployment",
  "metadata": {
    "name": "bounded-source-blackhole-sink"
  },
  "spec": {
    "deploymentTargetName": "vvp-jobs",
    "state": "CANCELLED",
    "template": {
      "spec": {
        "artifact": {
          "kind": "SQLSCRIPT",
          "sqlScript": "INSERT INTO blackhole_sink SELECT * FROM bounded_source;"
        }
      }
    }
  }
}
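At this point you can inspect the state of either deployment through the same REST endpoint that the coordinator application will poll later; the status.state field of the response is what the transition rules match against. For example, for the unbounded deployment (again assuming the port-forwarded endpoint):
curl -X GET \
"http://localhost:8080/api/v1/namespaces/default/deployments/unbounded-source-blackhole-sink" \
-H "accept: application/json"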
For the scheduler application we will run the following commands to build and publish our Docker image to our registry. Initially, our directory structure for the application will look like the following:
scheduler
|
|__ coordinator
|   |
|   |__ function.js
|   |
|   |__ package.json
|
|__ deployment
|   |
|   |__ function.js
|   |
|   |__ package.json
|
|__ Dockerfile
where we execute the commands under the scheduler directory.
First step
docker build \
--tag <CONTAINER_REGISTRY>/scheduler:1.0.0 \
--file Dockerfile \
.
Second step
docker push <CONTAINER_REGISTRY>/scheduler:1.0.0
with the contents of the files as follows:
Dockerfile
FROM node:16
WORKDIR /usr/src/app
COPY deployment deployment
COPY coordinator coordinator
WORKDIR coordinator
EXPOSE 8000
CMD ["npm", "start"]
coordinator/function.js
const http = require("http");
const deployment = require("../deployment");

let deploymentManifest = "";

// Fetch the manifest of the monitored deployment from the Ververica Platform REST API.
const monitor = (transitionRule, monitorCount) => {
  const request = http.request(deployment.getMonitorOption(transitionRule), callback(transitionRule, monitorCount));
  request.end();
}

// Collect the response body and evaluate it once the response ends.
const callback = (transitionRule, monitorCount) => (response) => {
  deploymentManifest = "";
  response.on("data", onData);
  response.on("end", onEnd(transitionRule, monitorCount));
}

const onData = (deploymentManifestChunk) => {
  deploymentManifest += deploymentManifestChunk;
}

// Trigger the transition if the monitored deployment has reached the expected state,
// otherwise schedule another monitoring attempt.
const onEnd = (transitionRule, monitorCount) => () => {
  if (deployment.shouldTransition(deploymentManifest, transitionRule)) {
    transition(transitionRule);
  } else {
    schedule(transitionRule, monitorCount + 1);
  }
}

// PATCH the target deployment with the desired state.
const transition = (transitionRule) => {
  const request = http.request(deployment.getTriggerOption(transitionRule));
  request.write(deployment.getTriggerState(transitionRule));
  request.end();
}

const schedule = (transitionRule, monitorCount) => {
  assertSchedule(transitionRule, monitorCount);
  setTimeout(monitor, deployment.getMonitorDuration(transitionRule), transitionRule, monitorCount);
}

// Fail the task if the expected state was not observed within the configured number of attempts.
const assertSchedule = (transitionRule, monitorCount) => {
  if (monitorCount == deployment.getMonitorCount(transitionRule)) {
    throw new Error("Deployment state is not triggering the transition!");
  }
}

monitor(deployment.getStartRule(), 0);
coordinator/package.json
{
  "name": "coordinator",
  "version": "1.0.0",
  "description": "This is a simple example of a coordinator application for Ververica Platform implemented in JavaScript.",
  "main": "function.js",
  "scripts": {
    "start": "node function.js"
  }
}
deployment/function.js
// Build the HTTP request options for the Ververica Platform deployments API.
const options = (deploymentName, requestMethod) => {
  return {
    host: process.env.VVP_HOST,
    port: process.env.VVP_PORT,
    path: "/api/v1/namespaces/" + process.env.VVP_NAMESPACE + "/deployments/" + deploymentName,
    method: requestMethod,
    headers: { "Content-Type": "application/json" }
  }
}

// The transition rule is fully parameterized through environment variables.
const TRANSITION_RULE = {
  monitorCount: parseInt(process.env.VVP_MONITOR_COUNT),
  monitorDuration: process.env.VVP_MONITOR_DURATION,
  monitorDeployment: process.env.VVP_MONITOR_DEPLOYMENT,
  monitorState: process.env.VVP_MONITOR_STATE,
  triggerDeployment: process.env.VVP_TRIGGER_DEPLOYMENT,
  triggerState: process.env.VVP_TRIGGER_STATE
}

exports.shouldTransition = (deploymentManifest, transitionRule) => {
  return transitionRule.monitorState === JSON.parse(deploymentManifest).status.state;
}

exports.getMonitorCount = (transitionRule) => {
  return transitionRule.monitorCount;
}

exports.getMonitorDuration = (transitionRule) => {
  return transitionRule.monitorDuration;
}

exports.getMonitorOption = (transitionRule) => {
  return options(transitionRule.monitorDeployment, "GET");
}

exports.getTriggerOption = (transitionRule) => {
  return options(transitionRule.triggerDeployment, "PATCH");
}

exports.getTriggerState = (transitionRule) => {
  return JSON.stringify({ spec: { state: transitionRule.triggerState } });
}

exports.getStartRule = () => {
  return TRANSITION_RULE;
}
deployment/package.json
{
  "name": "deployment",
  "version": "1.0.0",
  "description": "This is a simple example of a deployment application for Ververica Platform implemented in JavaScript.",
  "main": "function.js"
}
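Before moving on to Airflow, you can optionally exercise a single transition step locally. This is only a sketch: it assumes Node.js is installed, that the port forward from the setup note above is active, and it uses a short example monitoring interval; in the actual setup these parameters are supplied by the Airflow tasks below. The example suspends the unbounded-source-blackhole-sink deployment once it is observed in the RUNNING state:
cd coordinator
VVP_HOST=localhost \
VVP_PORT=8080 \
VVP_NAMESPACE=default \
VVP_MONITOR_COUNT=3 \
VVP_MONITOR_DURATION=10000 \
VVP_MONITOR_DEPLOYMENT=unbounded-source-blackhole-sink \
VVP_MONITOR_STATE=RUNNING \
VVP_TRIGGER_DEPLOYMENT=unbounded-source-blackhole-sink \
VVP_TRIGGER_STATE=SUSPENDED \
npm start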
Finally, we will bundle the DAG with the Airflow image and use that image for the Airflow deployment. You can check other ways to manage Airflow DAGs in the Airflow documentation [2].
The steps are as follows:
docker build \
--tag <CONTAINER_REGISTRY>/airflow:1.0.0 \
--file Dockerfile \
.
where the Dockerfile content is
FROM apache/airflow
COPY scheduler.py /opt/airflow/dags/scheduler.py
and the scheduler.py content is
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from datetime import datetime
from kubernetes.client import models as k8s

with DAG(
    dag_id="blackhole-sink-scheduler",
    start_date=datetime(1970, 1, 1),
    schedule="0 * * * *",
    max_active_runs=1,
    catchup=False
) as dag:
    suspend_unbounded_source_blackhole_sink = KubernetesPodOperator(
        name="suspend-unbounded-source-blackhole-sink",
        namespace="airflow",
        image="<CONTAINER_REGISTRY>/scheduler:1.0.0",
        image_pull_policy="Always",
        env_vars=[
            k8s.V1EnvVar(name="VVP_HOST", value="vvp-ververica-platform.vvp"),
            k8s.V1EnvVar(name="VVP_PORT", value="80"),
            k8s.V1EnvVar(name="VVP_NAMESPACE", value="default"),
            k8s.V1EnvVar(name="VVP_MONITOR_COUNT", value="3"),
            k8s.V1EnvVar(name="VVP_MONITOR_DURATION", value="300000"),
            k8s.V1EnvVar(name="VVP_MONITOR_DEPLOYMENT", value="unbounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_MONITOR_STATE", value="RUNNING"),
            k8s.V1EnvVar(name="VVP_TRIGGER_DEPLOYMENT", value="unbounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_TRIGGER_STATE", value="SUSPENDED")
        ],
        task_id="suspend_unbounded_source_blackhole_sink"
    )
    run_bounded_source_blackhole_sink = KubernetesPodOperator(
        name="run-bounded-source-blackhole-sink",
        namespace="airflow",
        image="<CONTAINER_REGISTRY>/scheduler:1.0.0",
        image_pull_policy="Always",
        env_vars=[
            k8s.V1EnvVar(name="VVP_HOST", value="vvp-ververica-platform.vvp"),
            k8s.V1EnvVar(name="VVP_PORT", value="80"),
            k8s.V1EnvVar(name="VVP_NAMESPACE", value="default"),
            k8s.V1EnvVar(name="VVP_MONITOR_COUNT", value="3"),
            k8s.V1EnvVar(name="VVP_MONITOR_DURATION", value="300000"),
            k8s.V1EnvVar(name="VVP_MONITOR_DEPLOYMENT", value="unbounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_MONITOR_STATE", value="SUSPENDED"),
            k8s.V1EnvVar(name="VVP_TRIGGER_DEPLOYMENT", value="bounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_TRIGGER_STATE", value="RUNNING")
        ],
        task_id="run_bounded_source_blackhole_sink"
    )
    cancel_bounded_source_blackhole_sink = KubernetesPodOperator(
        name="cancel-bounded-source-blackhole-sink",
        namespace="airflow",
        image="<CONTAINER_REGISTRY>/scheduler:1.0.0",
        image_pull_policy="Always",
        env_vars=[
            k8s.V1EnvVar(name="VVP_HOST", value="vvp-ververica-platform.vvp"),
            k8s.V1EnvVar(name="VVP_PORT", value="80"),
            k8s.V1EnvVar(name="VVP_NAMESPACE", value="default"),
            k8s.V1EnvVar(name="VVP_MONITOR_COUNT", value="3"),
            k8s.V1EnvVar(name="VVP_MONITOR_DURATION", value="300000"),
            k8s.V1EnvVar(name="VVP_MONITOR_DEPLOYMENT", value="bounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_MONITOR_STATE", value="FINISHED"),
            k8s.V1EnvVar(name="VVP_TRIGGER_DEPLOYMENT", value="bounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_TRIGGER_STATE", value="CANCELLED")
        ],
        task_id="cancel_bounded_source_blackhole_sink"
    )
    run_unbounded_source_blackhole_sink = KubernetesPodOperator(
        name="run-unbounded-source-blackhole-sink",
        namespace="airflow",
        image="<CONTAINER_REGISTRY>/scheduler:1.0.0",
        image_pull_policy="Always",
        env_vars=[
            k8s.V1EnvVar(name="VVP_HOST", value="vvp-ververica-platform.vvp"),
            k8s.V1EnvVar(name="VVP_PORT", value="80"),
            k8s.V1EnvVar(name="VVP_NAMESPACE", value="default"),
            k8s.V1EnvVar(name="VVP_MONITOR_COUNT", value="3"),
            k8s.V1EnvVar(name="VVP_MONITOR_DURATION", value="300000"),
            k8s.V1EnvVar(name="VVP_MONITOR_DEPLOYMENT", value="bounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_MONITOR_STATE", value="CANCELLED"),
            k8s.V1EnvVar(name="VVP_TRIGGER_DEPLOYMENT", value="unbounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_TRIGGER_STATE", value="RUNNING")
        ],
        task_id="run_unbounded_source_blackhole_sink"
    )
    suspend_unbounded_source_blackhole_sink >> run_bounded_source_blackhole_sink >> cancel_bounded_source_blackhole_sink >> run_unbounded_source_blackhole_sink
docker push <CONTAINER_REGISTRY>/airflow:1.0.0
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm install airflow apache-airflow/airflow \
--namespace airflow \
--create-namespace \
--set images.airflow.pullPolicy=Always \
--set images.airflow.repository=<CONTAINER_REGISTRY>/airflow \
--set images.airflow.tag=1.0.0
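You can check that the Airflow components have come up before using the user interface:
kubectl --namespace airflow get pods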
Finally, you can port forward the Airflow webserver service and use its user interface to trigger the DAG.
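Assuming the Helm release is named airflow as above, the chart exposes the web UI through a service named airflow-webserver; using a different local port (here 8081) avoids clashing with the Ververica Platform port forward:
kubectl --namespace airflow port-forward services/airflow-webserver 8081:8080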