How to Deploy a Stateful Functions Application in Ververica Platform: DataStream Integration

Question

How can I develop an Apache Flink application with the Stateful Functions API that uses both embedded and remote functions, and deploy it to Ververica Platform?

Answer

Note: This section applies to Flink 1.14 and later, with Ververica Platform 2.6 and later.

There are several ways to deploy Stateful Functions applications, such as using Application Modules. This article uses a different method, DataStream Integration. It is a step-by-step guide to combining several Apache Flink APIs into an end-to-end pipeline, deployed in Ververica Platform, that uses both embedded and remote functions to implement a simple Greeter application.

1) Application

The pipeline consumes events from an Apache Kafka topic, converts each event into a Greeting, passes it through a remote and then an embedded Greeter Stateful Function implementation (written in JavaScript and Java, respectively), and finally prints the result to standard output. At each stage of the pipeline, a stamp for the current stage (INGRESS, REMOTE, EMBEDDED, or EGRESS) is added to the greeters list of the message.
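To make the stamping concrete, here is a minimal plain-Java sketch, with no Flink or Stateful Functions involved, that mimics how the greeters list grows as a message moves through the four stages. The class `StampTrace` and its methods are hypothetical names used only for illustration; they are not part of the application.

```java
import java.util.ArrayList;
import java.util.List;

final class StampTrace {
    // Stages in the order the pipeline described above visits them.
    static final List<String> STAGES = List.of("INGRESS", "REMOTE", "EMBEDDED", "EGRESS");

    // Returns a new list with the stage's stamp appended; the input list is left untouched.
    static List<String> stamp(List<String> greeters, String stage) {
        List<String> copy = new ArrayList<>(greeters);
        copy.add(stage);
        return copy;
    }

    // Applies every stage's stamp in order, starting from an empty greeters list.
    static List<String> runAllStages() {
        List<String> greeters = new ArrayList<>();
        for (String stage : STAGES) {
            greeters = stamp(greeters, stage);
        }
        return greeters;
    }

    public static void main(String[] args) {
        // prints [INGRESS, REMOTE, EMBEDDED, EGRESS]
        System.out.println(runAllStages());
    }
}
```

A message that has travelled the whole pipeline should therefore carry all four stamps in this order, which is a convenient thing to check in the printed output after deployment.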

Events are consumed from the topic using the Table API, converted into a DataStream, and passed to Stateful Functions through its DataStream API integration. This requires defining the identifiers, the types, and the egress for the Stateful Functions, along with a data class that holds the message contents and its helper methods. These are defined in the following classes.

Identifiers.java

import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;

public class Identifiers {
    public static final String TYPE_GREETING = "com.ververica.type/greeting";
    public static final FunctionType FUNCTION_EMBEDDED = new FunctionType("com.ververica.function.embedded", "greeter");
    public static final FunctionType FUNCTION_REMOTE = new FunctionType("com.ververica.function.remote", "greeter");
    public static final EgressIdentifier<Greeting> EGRESS_DATASTREAM =
            new EgressIdentifier<>("com.ververica.egress.datastream", "greeter", Greeting.class);
}

Greeting.java

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ByteString;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class Greeting {
    public String name;
    public List<String> greeters;

    public Greeting() {}

    public Greeting(String name, List<String> greeters) {
        this.name = name;
        this.greeters = greeters;
    }

    public static Greeting withGreeter(Greeting greeting, String greeter) {
        greeting.greeters.add(greeter);
        return greeting;
    }

    public static String toString(Greeting greeting) throws JsonProcessingException {
        return new ObjectMapper().writeValueAsString(greeting);
    }

    public static Greeting fromString(String value) throws JsonProcessingException {
        return new ObjectMapper().readValue(value, Greeting.class);
    }

    public static ByteString toValue(Greeting greeting) throws JsonProcessingException {
        return ByteString.copyFrom(toString(greeting), StandardCharsets.UTF_8);
    }

    public static Greeting fromValue(ByteString value) throws JsonProcessingException {
        return fromString(value.toString(StandardCharsets.UTF_8));
    }
}

2) Ingress

To consume the events, we use the following code to read from a Kafka topic into a Table using the Table API, convert it into a DataStream, and put the initial stamp into each Greeting event.

final Schema schema = Schema.newBuilder()
              .column("name", DataTypes.STRING())
              .build();

final FormatDescriptor formatDescriptor = FormatDescriptor.forFormat("json")
              .option("fail-on-missing-field", "false")
              .option("ignore-parse-errors", "true")
              .build();

final TableDescriptor tableDescriptor = TableDescriptor.forConnector("kafka")
              .schema(schema)
              .format(formatDescriptor)
              .option(KafkaConnectorOptions.KEY_FIELDS, Collections.singletonList("name"))
              .option(KafkaConnectorOptions.KEY_FORMAT, "json")
              .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, "kafka:9092")
              .option(KafkaConnectorOptions.PROPS_GROUP_ID, "greeter-request")
              .option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
              .option(KafkaConnectorOptions.TOPIC, Collections.singletonList("greeter-request"))
              .build();

final Table table = streamTableEnvironment.from(tableDescriptor).select($("name"));

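final DataStream<Greeting> greetings = streamTableEnvironment.toDataStream(table)
              .map(row -> new Greeting(row.getFieldAs("name"), Collections.singletonList("INGRESS")));

// Wrap each Greeting as a RoutableMessage addressed to the remote function, keyed by name.
// This is the stream passed to withDataStreamAsIngress in the Message Routing step.
final DataStream<RoutableMessage> dataStream = greetings
              .map(greeting -> RoutableMessageBuilder.builder()
                            .withTargetAddress(Identifiers.FUNCTION_REMOTE, greeting.name)
                            .withMessageBody(TypedValue.newBuilder()
                                          .setTypename(Identifiers.TYPE_GREETING)
                                          .setHasValue(true)
                                          .setValue(Greeting.toValue(greeting))
                                          .build())
                            .build());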
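One detail of the snippet above deserves a note: `Collections.singletonList` returns an immutable list, while `Greeting.withGreeter` appends to the list in place. As far as we can tell, this is harmless in the pipeline as written, because the later Java stamps are applied to a Greeting that has just been rebuilt from JSON by `Greeting.fromValue`. It would fail, however, if a stamp were added directly to the ingress object. The following plain-Java sketch shows the difference; `GreeterListDemo` and `canStamp` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class GreeterListDemo {
    // Tries to append a stamp; returns false if the list rejects modification.
    static boolean canStamp(List<String> greeters) {
        try {
            greeters.add("EGRESS");
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Immutable: the form used when the ingress creates the Greeting.
        List<String> fixed = Collections.singletonList("INGRESS");
        // Mutable copy: safe to stamp in place.
        List<String> growable = new ArrayList<>(Collections.singletonList("INGRESS"));

        System.out.println(canStamp(fixed));    // prints false
        System.out.println(canStamp(growable)); // prints true
    }
}
```

If you extend the pipeline with a Java stage that stamps the Greeting before it is serialized, wrapping the initial list in an `ArrayList` avoids the exception.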

3) Stateful Functions

We first need to define our Greeter implementations and then set up the routing of messages between them.

greeter-function.js

const http = require("http");

const {messageBuilder, StateFun} = require("apache-flink-statefun");

const Greeting = StateFun.jsonType("com.ververica.type/greeting");

let statefun = new StateFun();

statefun.bind({
    typename:"com.ververica.function.remote/greeter",
    fn(context, message) {
        let greeting = message.as(Greeting);
        greeting.greeters.push("REMOTE");
        context.send(
            messageBuilder({typename:'com.ververica.function.embedded/greeter',
                            id:greeting.name,
                            value:greeting,
                            valueType:Greeting})
        );
    }
});

http.createServer(statefun.handler()).listen(8000);

GreeterFunction.java

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;

public class GreeterFunction implements StatefulFunction {
    @Override
    public void invoke(Context context, Object message) {
        final TypedValue typedValue = (TypedValue) message;
        try {
            final Greeting greeting = Greeting.fromValue(typedValue.getValue());
            context.send(Identifiers.EGRESS_DATASTREAM, Greeting.withGreeter(greeting, "EMBEDDED"));
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
}

Message Routing

final StatefulFunctionEgressStreams statefulFunctionEgressStreams = StatefulFunctionDataStreamBuilder.builder("greeter")
              .withDataStreamAsIngress(dataStream)
              .withRequestReplyRemoteFunction(RequestReplyFunctionBuilder.requestReplyFunctionBuilder(
                            Identifiers.FUNCTION_REMOTE,
                            URI.create("http://greeter:8000"))
              )
              .withFunctionProvider(Identifiers.FUNCTION_EMBEDDED, functionType -> new GreeterFunction())
              .withEgressId(Identifiers.EGRESS_DATASTREAM)
              .build(streamExecutionEnvironment);
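The JavaScript function and the Java code refer to the same functions in two notations: the remote function sends to the typename string `com.ververica.function.embedded/greeter`, while the Java side registers `new FunctionType("com.ververica.function.embedded", "greeter")`. The two must agree for routing to work. The sketch below shows the correspondence in plain Java. It assumes the typename is the namespace and the name joined by the last slash; `Typenames` is a hypothetical helper, not a Stateful Functions class.

```java
final class Typenames {
    // Joins a namespace and a name into the "namespace/name" form used by the remote SDK.
    static String typename(String namespace, String name) {
        return namespace + "/" + name;
    }

    // Splits "namespace/name" at the last slash into {namespace, name}.
    static String[] split(String typename) {
        int slash = typename.lastIndexOf('/');
        if (slash <= 0 || slash == typename.length() - 1) {
            throw new IllegalArgumentException("Expected <namespace>/<name>: " + typename);
        }
        return new String[] {typename.substring(0, slash), typename.substring(slash + 1)};
    }

    public static void main(String[] args) {
        // prints com.ververica.function.embedded/greeter
        System.out.println(typename("com.ververica.function.embedded", "greeter"));

        String[] parts = split("com.ververica.function.remote/greeter");
        // prints com.ververica.function.remote and greeter on separate lines
        System.out.println(parts[0]);
        System.out.println(parts[1]);
    }
}
```

A mismatch between the two notations, such as a typo in the namespace, is an easy mistake to make because the strings live in different files and languages.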

4) Egress

To obtain the result, we read the egress as a DataStream, put the final stamp into each message, and print it.

statefulFunctionEgressStreams.getDataStreamForEgressId(Identifiers.EGRESS_DATASTREAM)
              .map(greeting -> Greeting.withGreeter(greeting,"EGRESS"))
              .map(greeting -> Greeting.toString(greeting))
              .print();

5) Deployment

Finally, we build and deploy the remote function manually, and deploy the built Apache Flink application in Ververica Platform by specifying the class that contains the main method as the entrypoint.

package.json

{
    "name": "greeter",
    "version": "3.2.0",
    "description": "This is a simple example of a stateful functions application implemented in JavaScript.",
    "main": "functions.js",
    "scripts": {
        "start": "node functions.js"
    },
    "dependencies": {
        "apache-flink-statefun": "3.2.0"
    }
}

Dockerfile

FROM node:16

WORKDIR /usr/src/app

COPY package.json ./

RUN npm install apache-flink-statefun@3.2.0

COPY greeter-function.js ./functions.js

EXPOSE 8000

CMD ["npm", "start"]

deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
    name: greeter
spec:
    replicas: 1
    selector:
        matchLabels:
            app: function
            component: greeter
    template:
        metadata:
            labels:
                app: function
                component: greeter
        spec:
            containers:
                - name: greeter
                  image: greeter:1.0.0
                  ports:
                      - containerPort: 8000
                        name: endpoint
                  livenessProbe:
                      tcpSocket:
                          port: 8000
                      initialDelaySeconds: 30
                      periodSeconds: 60

service.yaml

apiVersion: v1
kind: Service
metadata:
    name: greeter
spec:
    type: ClusterIP
    ports:
        - name: endpoint
          port: 8000
    selector:
        app: function
        component: greeter
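
The endpoint passed to the remote function builder in the Message Routing step, `http://greeter:8000`, only resolves because it matches the Service above: the host is the Service name and the port is the Service port. This assumes the Flink job runs in the same Kubernetes namespace as the Service; otherwise a namespace-qualified host name would be needed. The following plain-Java sketch checks that correspondence; `EndpointCheck` is a hypothetical helper used only for illustration.

```java
import java.net.URI;

final class EndpointCheck {
    // Returns true when the URI's host and port match the given Service name and port.
    static boolean matchesService(URI endpoint, String serviceName, int servicePort) {
        return serviceName.equals(endpoint.getHost()) && endpoint.getPort() == servicePort;
    }

    public static void main(String[] args) {
        URI endpoint = URI.create("http://greeter:8000");

        // Service name and port taken from service.yaml.
        System.out.println(matchesService(endpoint, "greeter", 8000)); // prints true

        // A different port would leave the function unreachable through this Service.
        System.out.println(matchesService(endpoint, "greeter", 8080)); // prints false
    }
}
```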

Related Information