How to Add Scheduling Capabilities to Ververica Platform: Sidecar Container

Question

I have two streaming applications. One of them (unbounded-source-blackhole-sink) has an unbounded source which lets the application run continuously and never finish. The other one (bounded-source-blackhole-sink) has a bounded source and comes to an end after processing all the input. I would like to automate the coordination of these two deployments so that every X hours:

  1. Stop the unbounded-source-blackhole-sink
  2. Start the bounded-source-blackhole-sink
  3. Wait until the bounded-source-blackhole-sink is finished
  4. Start the unbounded-source-blackhole-sink without losing state

Answer

Note: This section applies to Flink 1.15 with Ververica Platform 2.8.

To show how this feature can be implemented as an add-on to Ververica Platform, we will create a Node.js application that continuously polls the Ververica Platform REST API. The application will be deployed as a sidecar container next to the existing Ververica Platform containers. Initially, our directory structure for the application will look like the following:

 scheduler
 |
 |_ _ coordinator
 |   |
 |   |_ _ function.js
 |   |
 |   |_ _ package.json
 |
 |_ _ deployment
 |   |
 |   |_ _ function.js
 |   |
 |   |_ _ package.json
 |
 |_ _ Dockerfile

Deployment

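This module defines the transition rules and the logic for interpreting them. For each rule, the coordinator waits monitorDuration milliseconds and then checks whether monitorDeployment has reached monitorState (its observed status.state). If it has, the desired spec.state of triggerDeployment is set to triggerState and the coordinator moves on to nextRule. Otherwise, the same rule is checked again after another monitorDuration. Suspending the unbounded deployment, rather than cancelling it, makes Ververica Platform stop the job with a savepoint. This is what allows step 4 to resume it without losing state.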
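The four rules form a cycle. Each rule waits for one deployment to reach a given status and then requests a new desired state, either for the same deployment or for the other one. The following sketch replays that cycle against an in-memory stand-in for Ververica Platform, so the order of actions can be checked without a cluster. The FakePlatform class and its behaviour are simplifying assumptions, not how the platform actually behaves. In the fake, desired states take effect immediately, and a bounded job that is RUNNING reports FINISHED the next time it is observed. On a real cluster, suspending takes time while the savepoint is written, and that delay is why the coordinator polls.

```javascript
// Hypothetical, simplified replay of the transition rules (not the Ververica Platform API).
const UNBOUNDED = "unbounded-source-blackhole-sink";
const BOUNDED = "bounded-source-blackhole-sink";

// Same monitor/trigger pairs as TRANSITION_RULES, in cycle order.
const RULES = [
    { monitor: UNBOUNDED, monitorState: "RUNNING",   trigger: UNBOUNDED, triggerState: "SUSPENDED" },
    { monitor: UNBOUNDED, monitorState: "SUSPENDED", trigger: BOUNDED,   triggerState: "RUNNING" },
    { monitor: BOUNDED,   monitorState: "FINISHED",  trigger: BOUNDED,   triggerState: "CANCELLED" },
    { monitor: BOUNDED,   monitorState: "CANCELLED", trigger: UNBOUNDED, triggerState: "RUNNING" }
];

// Assumption: desired states take effect immediately, and a bounded job that is
// RUNNING reports FINISHED the next time it is observed.
class FakePlatform {
    constructor() {
        this.status = { [UNBOUNDED]: "RUNNING", [BOUNDED]: "CANCELLED" };
    }

    observe(name) {
        const state = this.status[name];
        if (name === BOUNDED && state === "RUNNING") {
            this.status[name] = "FINISHED";
        }
        return state;
    }

    setDesiredState(name, state) {
        this.status[name] = state;
    }
}

// Poll `polls` times, advancing to the next rule only when the monitored
// deployment is in the expected state; return the actions that were triggered.
const replay = (platform, polls) => {
    const actions = [];
    let ruleIndex = 0;
    for (let i = 0; i < polls; i++) {
        const rule = RULES[ruleIndex];
        if (platform.observe(rule.monitor) === rule.monitorState) {
            platform.setDesiredState(rule.trigger, rule.triggerState);
            actions.push(`${rule.trigger} -> ${rule.triggerState}`);
            ruleIndex = (ruleIndex + 1) % RULES.length;
        }
    }
    return actions;
};

console.log(replay(new FakePlatform(), 5));
```

In this replay, five polls produce the four steps from the question in order. The bounded job needs one extra poll before it reports FINISHED.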

function.js

const STATE_RUNNING = "RUNNING";
const STATE_FINISHED = "FINISHED";
const STATE_CANCELLED = "CANCELLED";
const STATE_SUSPENDED = "SUSPENDED";

const options = (deploymentName, requestMethod) => {
    return {
        host:"localhost",
        port:"8080",
        path:"/api/v1/namespaces/default/deployments/" + deploymentName,
        method:requestMethod,
        headers: { "Content-Type":"application/json" }
    }
}

const TRANSITION_RULES = {
    "RULE_FIRST": {
        nextRule:"RULE_SECOND",
        monitorDuration:5 * 60 * 1000,
        monitorDeployment:"unbounded-source-blackhole-sink",
        monitorState:STATE_RUNNING,
        triggerDeployment:"unbounded-source-blackhole-sink",
        triggerState:STATE_SUSPENDED
    },
    "RULE_SECOND": {
        nextRule:"RULE_THIRD",
        monitorDuration:5 * 60 * 1000,
        monitorDeployment:"unbounded-source-blackhole-sink",
        monitorState:STATE_SUSPENDED,
        triggerDeployment:"bounded-source-blackhole-sink",
        triggerState:STATE_RUNNING
    },
    "RULE_THIRD": {
        nextRule:"RULE_FOURTH",
        monitorDuration:5 * 60 * 1000,
        monitorDeployment:"bounded-source-blackhole-sink",
        monitorState:STATE_FINISHED,
        triggerDeployment:"bounded-source-blackhole-sink",
        triggerState:STATE_CANCELLED
    },
    "RULE_FOURTH": {
        nextRule:"RULE_FIRST",
        monitorDuration:5 * 60 * 1000,
        monitorDeployment:"bounded-source-blackhole-sink",
        monitorState:STATE_CANCELLED,
        triggerDeployment:"unbounded-source-blackhole-sink",
        triggerState:STATE_RUNNING
    }
}

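exports.shouldTransition = (deploymentManifest, transitionRule) => {
    try {
        const manifest = JSON.parse(deploymentManifest);
        // Error responses carry no status block; treat them as "not yet".
        return manifest.status !== undefined
            && transitionRule.monitorState === manifest.status.state;
    } catch (error) {
        // The response body was not valid JSON; do not transition on this poll.
        return false;
    }
}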

exports.getMonitorDuration = (transitionRule) => {
    return transitionRule.monitorDuration;
}

exports.getMonitorOption = (transitionRule) => {
    return options(transitionRule.monitorDeployment, "GET");
}

exports.getTriggerOption = (transitionRule) => {
    return options(transitionRule.triggerDeployment, "PATCH");
}

exports.getTriggerState = (transitionRule) => {
    return JSON.stringify({ spec: { state:transitionRule.triggerState } });
}

exports.getNextRule = (transitionRule) => {
    return TRANSITION_RULES[transitionRule.nextRule]
}

exports.getStartRule = () => {
    return TRANSITION_RULES["RULE_FIRST"];
}
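In the rules above, every rule polls every five minutes, so one full cycle takes at least 20 minutes plus the time the bounded job needs. To run the cycle every X hours as the question asks, one option is to give RULE_FIRST a wait of X hours, because that rule decides when the unbounded job is suspended. The other rules can keep a short polling interval. The buildTransitionRules helper below is hypothetical and not part of the original module. It returns a table with the same shape as TRANSITION_RULES, parameterised by assumed intervalHours and pollMinutes arguments.

```javascript
// Hypothetical helper (not part of the original module): returns a rule table with
// the same shape as TRANSITION_RULES, parameterised by cycle and poll intervals.
const HOUR_MS = 60 * 60 * 1000;
const MINUTE_MS = 60 * 1000;

const UNBOUNDED = "unbounded-source-blackhole-sink";
const BOUNDED = "bounded-source-blackhole-sink";

const rule = (nextRule, monitorDuration, monitorDeployment, monitorState,
              triggerDeployment, triggerState) =>
    ({ nextRule, monitorDuration, monitorDeployment, monitorState,
       triggerDeployment, triggerState });

const buildTransitionRules = (intervalHours, pollMinutes) => {
    const poll = pollMinutes * MINUTE_MS;
    return {
        // Let the unbounded job run for intervalHours before suspending it.
        "RULE_FIRST": rule("RULE_SECOND", intervalHours * HOUR_MS,
            UNBOUNDED, "RUNNING", UNBOUNDED, "SUSPENDED"),
        // Once the unbounded job is suspended, start the bounded job.
        "RULE_SECOND": rule("RULE_THIRD", poll,
            UNBOUNDED, "SUSPENDED", BOUNDED, "RUNNING"),
        // Once the bounded job has finished, cancel it.
        "RULE_THIRD": rule("RULE_FOURTH", poll,
            BOUNDED, "FINISHED", BOUNDED, "CANCELLED"),
        // Once the bounded job is cancelled, resume the unbounded job.
        "RULE_FOURTH": rule("RULE_FIRST", poll,
            BOUNDED, "CANCELLED", UNBOUNDED, "RUNNING")
    };
};

const rules = buildTransitionRules(6, 1);
console.log(rules.RULE_FIRST.monitorDuration);  // 21600000 (6 hours)
console.log(rules.RULE_SECOND.monitorDuration); // 60000 (1 minute)
```

Suppose TRANSITION_RULES is assigned from buildTransitionRules(X, 5). The later steps would keep the existing five-minute polling, and the unbounded job would be suspended about X hours after it was last started. If the unbounded job is not RUNNING when RULE_FIRST is checked, the coordinator reschedules the same rule. The next check therefore comes another X hours later.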

package.json

{
    "name": "deployment",
    "version": "1.0.0",
    "main": "function.js"
}

Coordinator

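This is the application that coordinates monitoring and transitions, using the built-in http module of Node.js. It waits for the current rule's monitor duration and then fetches the monitored deployment with a GET request. If the deployment is in the expected state, it sends a PATCH request with the new desired state and schedules the next rule. Otherwise, it schedules the same rule again.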

function.js

const http = require("http");

const deployment = require("../deployment");

let deploymentManifest = "";

const schedule = (transitionRule) => {
    setTimeout(
        monitor, 
        deployment.getMonitorDuration(transitionRule), 
        transitionRule);
}

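const monitor = (transitionRule) => {
    const request = http.request(
        deployment.getMonitorOption(transitionRule), 
        callback(transitionRule));
    // Without an "error" listener, a failed request (for example while the
    // platform is still starting) would crash the sidecar; retry the same rule later.
    request.on("error", (error) => {
        console.error("Monitor request failed: " + error.message);
        schedule(transitionRule);
    });
    request.end();
}

const transition = (transitionRule) => {
    const request = http.request(
        deployment.getTriggerOption(transitionRule));
    // Log failed PATCH requests instead of crashing the process.
    request.on("error", (error) => {
        console.error("Transition request failed: " + error.message);
    });
    request.write(deployment.getTriggerState(transitionRule));
    request.end();
}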

const callback = (transitionRule) => (response) => {
    deploymentManifest = "";
    response.on("data", onData);
    response.on("end", onEnd(transitionRule))
}

const onEnd = (transitionRule) => () => {
    if (deployment.shouldTransition(deploymentManifest, transitionRule)) {
        transition(transitionRule);
        schedule(deployment.getNextRule(transitionRule));
    } else {
        schedule(transitionRule);
    }
}

const onData = (deploymentManifestChunk) => {
    deploymentManifest += deploymentManifestChunk;
}

schedule(deployment.getStartRule());
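The callback chain above can also be written as a single async loop. The sketch below is an alternative, not the article's implementation. The functions getState, setState and sleep are hypothetical and are injected by the caller. In the sidecar they would wrap the GET request, the PATCH request and setTimeout, respectively. Injecting them lets the control flow run without a live Ververica Platform. The maxPolls limit exists only so the example terminates.

```javascript
// Hypothetical async/await variant of the coordinator loop (not the article's code).
// The caller injects the I/O so the control flow can run without a live platform:
//   getState(name) -> Promise<string>   (observed status.state of a deployment)
//   setState(name, state) -> Promise     (PATCH of spec.state)
//   sleep(ms) -> Promise                 (e.g. a setTimeout wrapper)
const runCoordinator = async ({ rules, startRule, getState, setState, sleep, maxPolls }) => {
    let rule = rules[startRule];
    for (let poll = 0; poll < maxPolls; poll++) {
        await sleep(rule.monitorDuration);
        let state;
        try {
            state = await getState(rule.monitorDeployment);
        } catch (error) {
            // A failed poll is treated like an unmatched state: keep the current rule.
            console.error("Monitor request failed: " + error.message);
            continue;
        }
        if (state !== rule.monitorState) {
            continue;
        }
        try {
            await setState(rule.triggerDeployment, rule.triggerState);
            rule = rules[rule.nextRule];
        } catch (error) {
            // A failed transition keeps the current rule, so the same transition
            // is attempted again after the next successful poll.
            console.error("Transition request failed: " + error.message);
        }
    }
    return rule;
};

// Usage with an in-memory fake and a two-rule cycle (both hypothetical).
const DEMO_RULES = {
    A: { nextRule: "B", monitorDuration: 0, monitorDeployment: "job",
         monitorState: "RUNNING", triggerDeployment: "job", triggerState: "SUSPENDED" },
    B: { nextRule: "A", monitorDuration: 0, monitorDeployment: "job",
         monitorState: "SUSPENDED", triggerDeployment: "job", triggerState: "RUNNING" }
};
const states = { job: "RUNNING" };
const calls = [];
let failNextGet = true;

const demo = runCoordinator({
    rules: DEMO_RULES,
    startRule: "A",
    getState: async (name) => {
        // Simulate one failed poll before the platform becomes reachable.
        if (failNextGet) {
            failNextGet = false;
            throw new Error("platform not reachable yet");
        }
        return states[name];
    },
    setState: async (name, state) => {
        calls.push(name + " -> " + state);
        states[name] = state;
    },
    sleep: async () => {},
    maxPolls: 3
});

demo.then((finalRule) => console.log(calls, finalRule === DEMO_RULES.A));
```

Keeping the HTTP client behind getState and setState separates the rule logic from the transport. In the sidecar, these functions could issue the same requests that the coordinator already builds from deployment.getMonitorOption and deployment.getTriggerOption.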

package.json

{
    "name": "coordinator",
    "version": "1.0.0",
    "main": "function.js",
    "scripts": {
        "start": "node function.js"
    }
}

Dockerfile

This is the Dockerfile we use to build our image.

FROM node:16

WORKDIR /usr/src/app

COPY deployment deployment
COPY coordinator coordinator

WORKDIR coordinator

CMD ["npm", "start"]

Build the image containing the scheduler application by running the following command from the scheduler directory. First replace <CONTAINER_REGISTRY> and <IMAGE_TAG> with the appropriate registry address and your desired image tag.

docker build --tag <CONTAINER_REGISTRY>/scheduler:<IMAGE_TAG> .

Ververica Platform

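Once the Docker image has been pushed to the container registry, add it to the list of containers in the Ververica Platform deployment manifest. You obtain this manifest by rendering the Helm chart (for example with helm template)[2] before deploying. The scheduler runs in the same pod as the Ververica Platform containers, so it can reach the platform's REST API via localhost. This is why the deployment module uses host localhost and port 8080.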

....
containers:
    - name: scheduler
      image: <CONTAINER_REGISTRY>/scheduler:<IMAGE_TAG>
    - name: appmanager
      image: registry.ververica.com/v2.8/vvp-appmanager:2.8.0
....

Related Information