
How to Add Scheduling Capabilities to Ververica Platform: Sidecar Container

I have two streaming applications. One of them (unbounded-source-blackhole-sink) has an unbounded source, so it runs continuously and never finishes. The other one (bounded-source-blackhole-sink) has a bounded source and finishes after processing all of its input. I would like to automate the coordination of these two deployments so that every X hours:

  1. Stop the unbounded-source-blackhole-sink
  2. Start the bounded-source-blackhole-sink
  3. Wait until the bounded-source-blackhole-sink has finished
  4. Start the unbounded-source-blackhole-sink again without losing state
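The scheduler built below polls every few minutes and starts a new cycle as soon as the previous one ends. It does not wait for an "every X hours" boundary. If you want each cycle to begin on a fixed cadence, one option is to compute the delay until the next slot before scheduling the first rule. This is a minimal sketch. It assumes slots are aligned to multiples of the interval since the Unix epoch (UTC). `msUntilNextRun` and `intervalHours` are hypothetical names and are not part of the original scheduler.

```javascript
// Hypothetical helper: milliseconds from `nowMs` until the next run slot.
// Slots are assumed to start at multiples of `intervalHours` since the Unix epoch (UTC).
const HOUR_MS = 60 * 60 * 1000;

function msUntilNextRun(nowMs, intervalHours) {
  if (!(intervalHours > 0)) {
    throw new RangeError("intervalHours must be positive");
  }
  const intervalMs = intervalHours * HOUR_MS;
  // Time already spent in the current slot; the next slot begins after the remainder
  const elapsed = nowMs % intervalMs;
  return intervalMs - elapsed;
}

// Example: with a 6-hour cadence, wait until the next 00:00/06:00/12:00/18:00 UTC boundary
const delay = msUntilNextRun(Date.now(), 6);
console.log(`next cycle in ${Math.round(delay / 60000)} minutes`);
```

To delay the first cycle to the next slot, wrap the coordinator's initial `schedule(deployment.getStartRule())` call in `setTimeout(..., msUntilNextRun(Date.now(), X))`. For every cycle to follow that cadence, the same wait is also needed wherever control passes from RULE_FOURTH back to RULE_FIRST.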

Answer

Note: This section applies to Flink 1.15 with Ververica Platform 2.8.

To show how this feature can be added to Ververica Platform, we will create a Node.js application that continuously monitors the deployments through the Ververica Platform REST API. This application will be deployed as a sidecar container next to the existing Ververica Platform containers. Initially, the directory structure for the application looks like this:
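Containers in the same Kubernetes pod share a network namespace, so the sidecar can reach the appmanager's REST API on `localhost`. The `options` helper in the deployment module below hardcodes the host, the port and the `default` namespace. The sketch here reads those values from the environment instead. The variable names `VVP_HOST`, `VVP_PORT` and `VVP_NAMESPACE` are hypothetical, and the fallback values are the ones this article uses.

```javascript
// Hypothetical variant of the `options` helper: the Ververica Platform endpoint and
// namespace come from assumed environment variables, falling back to the
// localhost:8080 / "default" values used by the scheduler in this article.
function apiOptions(env, deploymentName, requestMethod) {
  const namespace = env.VVP_NAMESPACE || "default";
  return {
    host: env.VVP_HOST || "localhost",
    port: Number(env.VVP_PORT || 8080),
    // Encode the path segments so that unusual names cannot break the URL
    path: `/api/v1/namespaces/${encodeURIComponent(namespace)}/deployments/${encodeURIComponent(deploymentName)}`,
    method: requestMethod,
    headers: { "Content-Type": "application/json" },
  };
}

// Options for polling the unbounded job, using whatever the container environment provides
console.log(apiOptions(process.env, "unbounded-source-blackhole-sink", "GET"));
```

The returned object has the same shape as the one the original `options` function builds, so it can be passed to `http.request` in the same way.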

 scheduler
|
|_ _ coordinator
| |
| |_ _ function.js
| |
| |_ _ package.json
|
|_ _ deployment
| |
| |_ _ function.js
| |
| |_ _ package.json
|
|_ _ Dockerfile

Deployment

This module defines the transition rules, together with the logic that interprets them.

function.js

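// Deployment states used by the Ververica Platform API (status.state and spec.state)
const STATE_RUNNING = "RUNNING";
const STATE_FINISHED = "FINISHED";
const STATE_CANCELLED = "CANCELLED";
const STATE_SUSPENDED = "SUSPENDED";

// Request options for one deployment in the Ververica Platform REST API.
// The scheduler runs in the same pod as the appmanager, so the API is on localhost:8080.
const options = (deploymentName, requestMethod) => {
  return {
    host: "localhost",
    port: "8080",
    path: "/api/v1/namespaces/default/deployments/" + deploymentName,
    method: requestMethod,
    headers: { "Content-Type": "application/json" }
  };
};

// Each rule polls monitorDeployment every monitorDuration ms (here 5 minutes) until it
// reaches monitorState, then sets triggerDeployment to triggerState and hands over to nextRule.
const TRANSITION_RULES = {
  // Step 1: suspend the unbounded job (Ververica Platform takes a savepoint before stopping it)
  "RULE_FIRST": {
    nextRule: "RULE_SECOND",
    monitorDuration: 5 * 60 * 1000,
    monitorDeployment: "unbounded-source-blackhole-sink",
    monitorState: STATE_RUNNING,
    triggerDeployment: "unbounded-source-blackhole-sink",
    triggerState: STATE_SUSPENDED
  },
  // Step 2: once the unbounded job is suspended, start the bounded job
  "RULE_SECOND": {
    nextRule: "RULE_THIRD",
    monitorDuration: 5 * 60 * 1000,
    monitorDeployment: "unbounded-source-blackhole-sink",
    monitorState: STATE_SUSPENDED,
    triggerDeployment: "bounded-source-blackhole-sink",
    triggerState: STATE_RUNNING
  },
  // Step 3: once the bounded job has finished, set its desired state to CANCELLED
  "RULE_THIRD": {
    nextRule: "RULE_FOURTH",
    monitorDuration: 5 * 60 * 1000,
    monitorDeployment: "bounded-source-blackhole-sink",
    monitorState: STATE_FINISHED,
    triggerDeployment: "bounded-source-blackhole-sink",
    triggerState: STATE_CANCELLED
  },
  // Step 4: once the bounded job is cancelled, start the unbounded job again
  "RULE_FOURTH": {
    nextRule: "RULE_FIRST",
    monitorDuration: 5 * 60 * 1000,
    monitorDeployment: "bounded-source-blackhole-sink",
    monitorState: STATE_CANCELLED,
    triggerDeployment: "unbounded-source-blackhole-sink",
    triggerState: STATE_RUNNING
  }
};

// True when the deployment's status.state equals the state the rule waits for.
// A body that is not valid JSON (for example an error page) counts as "not yet".
exports.shouldTransition = (deploymentManifest, transitionRule) => {
  try {
    return transitionRule.monitorState === JSON.parse(deploymentManifest)?.status?.state;
  } catch (error) {
    return false;
  }
};

exports.getMonitorDuration = (transitionRule) => {
  return transitionRule.monitorDuration;
};

// GET request for the deployment whose state the rule watches
exports.getMonitorOption = (transitionRule) => {
  return options(transitionRule.monitorDeployment, "GET");
};

// PATCH request for the deployment whose desired state the rule changes
exports.getTriggerOption = (transitionRule) => {
  return options(transitionRule.triggerDeployment, "PATCH");
};

// PATCH body that only changes the desired state of the deployment
exports.getTriggerState = (transitionRule) => {
  return JSON.stringify({ spec: { state: transitionRule.triggerState } });
};

exports.getNextRule = (transitionRule) => {
  return TRANSITION_RULES[transitionRule.nextRule];
};

exports.getStartRule = () => {
  return TRANSITION_RULES["RULE_FIRST"];
};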
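Each rule should monitor the deployment that the previous rule just triggered. A small consistency check over the rule table can therefore catch copy-paste mistakes such as a wrong deployment name. The sketch below is a hypothetical helper (`validateRules` is not part of the original module). It assumes rule objects with the `nextRule`, `monitorDeployment` and `triggerDeployment` fields used in TRANSITION_RULES. It checks deployment names only, because a monitored state can legitimately differ from the previously triggered one (RUNNING is triggered, FINISHED is awaited).

```javascript
// Hypothetical consistency check for a rule table shaped like TRANSITION_RULES.
// Walks the cycle from `startKey` and returns a list of human-readable problems.
function validateRules(rules, startKey) {
  const problems = [];
  const visited = new Set();
  let key = startKey;
  let previous = null;

  // Follow nextRule links until a rule repeats or a link points nowhere
  while (!visited.has(key)) {
    const rule = rules[key];
    if (rule === undefined) {
      problems.push(`missing rule "${key}"`);
      return problems;
    }
    if (previous !== null && rule.monitorDeployment !== previous.triggerDeployment) {
      problems.push(`${key} monitors "${rule.monitorDeployment}", expected "${previous.triggerDeployment}"`);
    }
    visited.add(key);
    previous = rule;
    key = rule.nextRule;
  }

  // The walk stopped on a repeated rule: it should be the start rule, closing the cycle
  if (key !== startKey) {
    problems.push(`cycle closes on "${key}" instead of "${startKey}"`);
  } else if (rules[startKey].monitorDeployment !== previous.triggerDeployment) {
    problems.push(`${startKey} monitors "${rules[startKey].monitorDeployment}", expected "${previous.triggerDeployment}"`);
  }
  return problems;
}

// A two-rule table in which B watches a deployment that A never touched
const rules = {
  A: { nextRule: "B", monitorDeployment: "unbounded", triggerDeployment: "bounded" },
  B: { nextRule: "A", monitorDeployment: "typo", triggerDeployment: "unbounded" },
};
console.log(validateRules(rules, "A"));
```

Calling `validateRules(TRANSITION_RULES, "RULE_FIRST")` inside function.js at startup, and exiting if it returns any problems, would stop a misconfigured scheduler before it changes any deployment.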

package.json

{
  "name": "deployment",
  "version": "1.0.0",
  "main": "function.js"
}
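`getTriggerState` only sends the target `spec.state`. Step 4 relies on the unbounded deployment resuming from the savepoint taken when it was suspended, and that behaviour is governed by the deployment's restore strategy. If the restore strategy is not already configured on the deployment, the PATCH body could set it as well. This is a minimal sketch. `buildTriggerBody` is a hypothetical helper. The field `spec.restoreStrategy.kind`, with values such as `LATEST_STATE` or `LATEST_SAVEPOINT`, is an assumption to verify against the Ververica Platform 2.8 API reference.

```javascript
// Hypothetical variant of getTriggerState that can also pin the restore strategy.
// Assumes the Ververica Platform Deployment field spec.restoreStrategy.kind;
// verify the field name and allowed values for your platform version.
function buildTriggerBody(triggerState, restoreKind) {
  const spec = { state: triggerState };
  if (restoreKind !== undefined) {
    spec.restoreStrategy = { kind: restoreKind };
  }
  return JSON.stringify({ spec });
}

// Suspend without touching other fields, then resume from the latest state
console.log(buildTriggerBody("SUSPENDED"));
console.log(buildTriggerBody("RUNNING", "LATEST_STATE"));
```

Leaving `restoreKind` undefined produces the same body as the original `getTriggerState`, so only the resume transition (RULE_FOURTH) would need to pass it.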

Coordinator

This application drives the cycle. It polls the monitored deployment, applies the transition when a rule's condition is met, and schedules the next check, using only the built-in http module of Node.js.
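The decision the coordinator makes after each poll can be written as a pure function, kept separate from the HTTP calls and timers. That makes the rule logic testable without a running Ververica Platform. In this sketch, `nextAction` and the shape of its return value are hypothetical. It assumes rule objects with the same fields as TRANSITION_RULES.

```javascript
// Hypothetical pure version of the coordinator's decision: given a rule table, the
// current rule key and the state read from the deployment's status, return the PATCH
// to send (or null) and the rule to poll next. No HTTP calls are made here.
function nextAction(rules, ruleKey, observedState) {
  const rule = rules[ruleKey];
  if (rule === undefined) {
    throw new Error(`unknown rule "${ruleKey}"`);
  }
  if (observedState !== rule.monitorState) {
    // Condition not met yet: keep polling the same rule
    return { patch: null, nextRuleKey: ruleKey };
  }
  return {
    patch: {
      deployment: rule.triggerDeployment,
      body: { spec: { state: rule.triggerState } },
    },
    nextRuleKey: rule.nextRule,
  };
}

const exampleRules = {
  RULE_FIRST: {
    nextRule: "RULE_SECOND",
    monitorState: "RUNNING",
    triggerDeployment: "unbounded-source-blackhole-sink",
    triggerState: "SUSPENDED",
  },
};

console.log(nextAction(exampleRules, "RULE_FIRST", "RUNNING").nextRuleKey); // RULE_SECOND
console.log(nextAction(exampleRules, "RULE_FIRST", "SUSPENDED").patch); // null
```

With this split, every rule, including the wrap-around from RULE_FOURTH to RULE_FIRST, can be checked with plain assertions. The coordinator is then only responsible for the GET, the PATCH and the timer.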

function.js

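const http = require("http");

// Resolves to ../deployment/function.js through the "main" field of its package.json
const deployment = require("../deployment");

// Body of the current GET response; only one request is in flight at a time
let deploymentManifest = "";

// Poll the rule again once its monitorDuration has elapsed
const schedule = (transitionRule) => {
  setTimeout(
    monitor,
    deployment.getMonitorDuration(transitionRule),
    transitionRule);
};

// GET the monitored deployment; the rule is evaluated when the response ends
const monitor = (transitionRule) => {
  const request = http.request(
    deployment.getMonitorOption(transitionRule),
    callback(transitionRule));
  // Without an "error" listener, a failed connection would crash the process
  request.on("error", (error) => {
    console.error("Monitoring request failed:", error.message);
    schedule(transitionRule);
  });
  request.end();
};

// PATCH the trigger deployment to the target state; advance to the next rule only on success
const transition = (transitionRule) => {
  const request = http.request(
    deployment.getTriggerOption(transitionRule),
    (response) => {
      response.resume(); // the body is not needed; drain it so the socket is released
      if (response.statusCode >= 200 && response.statusCode < 300) {
        schedule(deployment.getNextRule(transitionRule));
      } else {
        // The monitored state is unchanged, so the next poll re-sends the PATCH
        console.error("Transition rejected with HTTP status", response.statusCode);
        schedule(transitionRule);
      }
    });
  request.on("error", (error) => {
    console.error("Transition request failed:", error.message);
    schedule(transitionRule);
  });
  request.write(deployment.getTriggerState(transitionRule));
  request.end();
};

// Collect the response body, then evaluate the rule
const callback = (transitionRule) => (response) => {
  deploymentManifest = "";
  response.on("data", onData);
  response.on("end", onEnd(transitionRule));
};

// Transition if the monitored state has been reached; otherwise keep polling the same rule
const onEnd = (transitionRule) => () => {
  if (deployment.shouldTransition(deploymentManifest, transitionRule)) {
    transition(transitionRule);
  } else {
    schedule(transitionRule);
  }
};

const onData = (deploymentManifestChunk) => {
  deploymentManifest += deploymentManifestChunk;
};

// Start with the first rule of the cycle
schedule(deployment.getStartRule());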
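If the appmanager is unreachable, for example while the pod is restarting, polling at a fixed interval can be more aggressive than necessary. A common alternative is a retry interval that grows after each consecutive failure. This is a minimal exponential-backoff sketch. `retryDelayMs` and its parameters are hypothetical and are not part of the original coordinator.

```javascript
// Hypothetical exponential backoff for the polling loop: after `failures` consecutive
// failed requests, wait baseMs * 2^(failures - 1), but never longer than maxMs.
function retryDelayMs(failures, baseMs, maxMs) {
  const exponent = Math.max(0, failures - 1);
  return Math.min(maxMs, baseMs * 2 ** exponent);
}

// With a 5 s base and a 5 min cap (the rules' normal poll interval),
// the waits double from 5 s until they reach the cap
for (let failures = 1; failures <= 8; failures++) {
  console.log(failures, retryDelayMs(failures, 5000, 5 * 60 * 1000));
}
```

To use it in place of a fixed `monitorDuration`, keep a counter that an error handler registered with `request.on("error", ...)` increments and that a successful response resets, and pass that counter to this function.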

package.json

{
  "name": "coordinator",
  "version": "1.0.0",
  "main": "function.js",
  "scripts": {
    "start": "node function.js"
  }
}

Dockerfile

This is the Dockerfile we use to build our image.

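FROM node:16

WORKDIR /usr/src/app

# The coordinator loads ../deployment, so both modules keep the same relative layout as in the source tree
COPY deployment deployment
COPY coordinator coordinator

WORKDIR /usr/src/app/coordinator

# The scheduler only makes outbound requests to the Ververica Platform API, so no port needs to be exposed
CMD ["npm", "start"]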

Build the image containing the scheduler application by running the following command in the scheduler directory, replacing <CONTAINER_REGISTRY> and <IMAGE_TAG> with your registry address and the desired image tag. Then push the image to that registry so that the cluster can pull it.

docker build --tag <CONTAINER_REGISTRY>/scheduler:<IMAGE_TAG> .

Ververica Platform

Once the Docker image is available, we can add it to the list of containers in the Ververica Platform Deployment manifest. To get that manifest, render the Helm chart[2] with helm template before deploying, then add the scheduler container:

....
containers:
  - name: scheduler
    image: <CONTAINER_REGISTRY>/scheduler:<IMAGE_TAG>
  - name: appmanager
    image: registry.ververica.com/v2.8/vvp-appmanager:2.8.0
....

Related Information