How to Deploy Stateful Functions Application in Ververica Platform: Application Modules

Question

How can I develop an Apache Flink application with Stateful Functions API using remote functions and deploy it to Ververica Platform with Application Modules?

Answer

Note: This section applies to Flink 1.14 with Ververica Platform 2.6.

There are several ways to manage deployment of the Stateful Function applications, such as using the DataStream Integration. In this article, we will go through a different method, using the Application Modules, for deploying a Stateful Function Application that utilises remote functions to generate a simple Greeter application in Ververica Platform.

1) Application

The application logic is a basic Greeter application that consumes events from an Apache Kafka topic and just echoes it back to another topic. Here, we are using module.yaml file to let Apache Flink runtime to discover the ingress, stateful functions and egress information, as well as the initial routing of the messages from the ingress.

module.yaml

kind: io.statefun.endpoints.v2/http
spec:
    functions: com.ververica.function.remote/greeter
    urlPathTemplate: http://greeter:8000
---
kind: io.statefun.kafka.v1/ingress
spec:
    id: com.ververica.ingress/greeter-request
    address: kafka:9092
    consumerGroupId: greeter
    startupPosition:
        type: latest
    topics:
        - topic: greeter-request
          valueType: com.ververica.type/greeter-request
          targets:
              - com.ververica.function.remote/greeter
---
kind: io.statefun.kafka.v1/egress
spec:
    id: com.ververica.egress/greeter-response
    address: kafka:9092

2) Stateful Functions

The next step is to have the Stateful Function implementation and have it deployed and accessible. For the specific use case, we will use Stateful Function API's JavaScript SDK to implement the logic and we will deploy it to the kubernetes after we build the docker image.

greeter-function.js

const http = require("http");
const {StateFun, kafkaEgressMessage} = require("apache-flink-statefun");
const GreeterRequest = StateFun.jsonType("com.ververica.type/greeter-request");

let statefun = new StateFun();

statefun.bind({
    typename: "com.ververica.function.remote/greeter",
    fn(context, message) {
        let greeterRequest = message.as(GreeterRequest);
        context.send(
            kafkaEgressMessage({
                typename: "com.ververica.egress/greeter-response",
                topic: "greeter-response",
                key: greeterRequest.name,
                value: JSON.stringify(greeterRequest)
            })
        );
    }
});

http.createServer(statefun.handler()).listen(8000);

package.json

{
    "name": "greeter",
    "version": "3.2.0",
    "description": "This is a simple example of a stateful functions application implemented in JavaScript.",
    "main": "functions.js",
    "scripts": {
        "start": "node functions.js"
    },
    "dependencies": {
        "apache-flink-statefun": "3.2.0"
    }
}

Dockerfile

FROM node:16

WORKDIR /usr/src/app

COPY package.json ./
RUN npm install apache-flink-statefun@3.2.0

COPY greeter-function.js ./functions.js

EXPOSE 8000
CMD ["npm", "start"]

deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
    name: greeter
spec:
    replicas: 1
    selector:
        matchLabels:
            app: function
            component: greeter
    template:
        metadata:
            labels:
                app: function
                component: greeter
        spec:
            containers:
                - name: greeter
                  image: greeter:1.0.0
                  ports:
                      - name: endpoint
                        containerPort: 8000
                  livenessProbe:
                      tcpSocket:
                          port: 8000
                      initialDelaySeconds: 30
                      periodSeconds: 60

service.yaml

apiVersion: v1
kind: Service
metadata:
    name: greeter
spec:
    type: ClusterIP
    ports:
        - name: endpoint
          port: 8000
    selector:
        app: function
        component: greeter

3) Deployment

The last step is to deploy the Stateful Functions Application to Ververica Platform. The first thing to consider is what Apache Flink image to use for creating the deployment. Since Ververica does not have official images for Stateful Functions, we will build our own Docker image with the necessary dependencies added to the image.

Warning: The internal structure of the official images is not considered a public, stable API and may change in the future.

Warning: For custom images, we cannot guarantee compatibility between all possible base images, custom dependencies and Ververica Platform components. Therefore, we are only able to support custom images on a best-effort basis. Application Manager and Apache Flink themselves, as well as their integration with Kubernetes, remain supported.

Since we now will use a different entry point to run a Stateful Functions Application, we will also change the cluster entrypoint. Finally, we will also bundle the module.yaml file to be included in the Docker image so that we will not need to create a separate configmap and attach it to our deployment. When creating the deployment on Ververica Platform, choose the Apache Flink image to the newly built image, while setting the jarUri to 'file:///flink/lib/statefun-flink-distribution-3.2.0.jar'.

Dockerfile

FROM alpine/curl:3.14 AS dependency-downloader

RUN curl -o /tmp/statefun-flink-core.jar https://repo1.maven.org/maven2/org/apache/flink/statefun-flink-core/3.2.0/statefun-flink-core-3.2.0.jar
RUN curl -o /tmp/statefun-flink-distribution-3.2.0.jar https://repo1.maven.org/maven2/org/apache/flink/statefun-flink-distribution/3.2.0/statefun-flink-distribution-3.2.0.jar

#----

FROM registry.ververica.com/v2.6/flink:1.14.4-stream1-scala_2.12-java8

COPY --from=dependency-downloader /tmp/statefun-flink-core.jar /opt/flink/lib/statefun-flink-core.jar
COPY --from=dependency-downloader /tmp/statefun-flink-distribution-3.2.0.jar /flink/lib/statefun-flink-distribution-3.2.0.jar

RUN sed -i "s/org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint/org.apache.flink.statefun.flink.launcher.StatefulFunctionsClusterEntryPoint/" /flink/bin/flink-console.sh

COPY module.yaml /opt/statefun/modules/greeter/module.yaml

Artifact Configuration

artifact:
    flinkImageRegistry: flink
        flinkImageRepository: greeter
        flinkImageTag: 1.0.0
        flinkVersion: '1.14'
        jarUri: 'file:///flink/lib/statefun-flink-distribution-3.2.0.jar'
        kind: JAR

Related Information