Knowledge base

How to Deploy Stateful Functions Application in Ververica Platform: Embedded Module

Written by Ali Zeybek | 18 February 2024

Question

How can I develop an Apache Flink application with Stateful Functions API using embedded modules and deploy it to Ververica Platform?

Answer

Note: This section applies to Flink 1.14 with Ververica Platform 2.6.

There are several ways to manage deployment of the Stateful Function applications, such as using the Application Modules. In this article, we will go through a different method, using the Embedded Modules, for deploying a Stateful Function Application that utilises embedded functions to generate a simple Greeter application in Ververica Platform.

Application

The overall pipeline is responsible of consuming events from an Apache Kafka topic, converting it into a Greeter Request, passing it alongside an embedded Greeter Stateful Function implementations, written in Java, and finally print the result to the standard output.

Here; Identifiers, Types and the Deserialisation Schema for the Stateful Functions are defined. Moreover, the data class to hold the necessary information as well as the functionalities are defined in the following class.

Identifiers.java

import com.google.protobuf.StringValue;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;

public class Identifiers {
    public static final IngressIdentifier<StringValue> INGRESS_IDENTIFIER =
            new IngressIdentifier<>(StringValue.class, "com.ververica.stateful-functions", "greeter-request");
    public static final FunctionType FUNCTION_TYPE =
            new FunctionType("com.ververica.stateful-functions", "greeter");
}

GreeterRequest.java

public class GreeterRequest {
    public String name;

    public GreeterRequest() {}
    public GreeterRequest(String name) {
        this.name = name;
    }
}

GreeterRequestDeserializer.java

import com.google.protobuf.StringValue;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;

public class GreeterRequestDeserializer implements KafkaIngressDeserializer<StringValue> {
    @Override
    public StringValue deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) {
        return StringValue.of(new String(consumerRecord.value(), StandardCharsets.UTF_8));
    }
}

Stateful Function

We first need to define our GreeterFunction implementation.

import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;

public class GreeterFunction implements StatefulFunction {
    @Override
    public void invoke(Context context, Object greeterRequest) {
        System.out.println("Hello there, " + context.self().id() + "!");
    }
}

Embedded Module

The next step is to assemble the routing by defining the Embedded Module

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
import org.myorg.quickstart.data.GreeterRequest;
import org.myorg.quickstart.deserializer.GreeterRequestDeserializer;
import org.myorg.quickstart.function.GreeterFunction;
import org.myorg.quickstart.identifiers.Identifiers;

import java.util.Map;

public class EmbeddedModule implements StatefulFunctionModule {
    @Override
    public void configure(Map<String, String> map, Binder binder) {
        binder.bindIngress(KafkaIngressBuilder
                .forIdentifier(Identifiers.INGRESS_IDENTIFIER)
                .withKafkaAddress("kafka:9092")
                .withTopic("greeter-request")
                .withConsumerGroupId("greeter-request")
                .withDeserializer(GreeterRequestDeserializer.class)
                .withStartupPosition(KafkaIngressStartupPosition.fromLatest())
                .build());

        binder.bindIngressRouter(
                Identifiers.INGRESS_IDENTIFIER,
                (greeterRequest, downstream) -> {
                    String key;
                    try {
                        key = new ObjectMapper().readValue(greeterRequest.getValue(), GreeterRequest.class).name;
                    } catch (JsonProcessingException e) {
                        key = "NOT_PARSABLE";
                    }
                    downstream.forward(Identifiers.FUNCTION_TYPE, key, greeterRequest);
                });

        binder.bindFunctionProvider(Identifiers.FUNCTION_TYPE, x -> new GreeterFunction());
    }
}

Deployment

Finally, we deploy the build and use the main method to deploy the built Apache Flink application in Ververica Platform by passing the main method as the entrypoint.

Artifact Configuration

artifact:
    flinkImageRegistry: flink
        flinkImageRepository: greeter
        flinkImageTag: 1.0.0
        flinkVersion: '1.14'
        jarUri: 'file:///flink/lib/statefun-flink-distribution-3.2.0.jar'
        kind: JAR

Dockerfile

FROM alpine/curl:3.14 AS dependency-downloader

RUN curl -o /tmp/statefun-flink-core.jar https://repo1.maven.org/maven2/org/apache/flink/statefun-flink-core/3.2.0/statefun-flink-core-3.2.0.jar
RUN curl -o /tmp/statefun-flink-distribution-3.2.0.jar https://repo1.maven.org/maven2/org/apache/flink/statefun-flink-distribution/3.2.0/statefun-flink-distribution-3.2.0.jar

# ----

FROM registry.ververica.com/v2.6/flink:1.14.4-stream1-scala_2.12-java8

COPY --from=dependency-downloader /tmp/statefun-flink-core.jar /opt/flink/lib/statefun-flink-core.jar
COPY --from=dependency-downloader /tmp/statefun-flink-distribution-3.2.0.jar /flink/lib/statefun-flink-distribution-3.2.0.jar

RUN sed -i "s/org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint/org.apache.flink.statefun.flink.launcher.StatefulFunctionsClusterEntryPoint/" /flink/bin/flink-console.sh

COPY target/quickstart-0.1.jar /opt/statefun/modules/greeter/quickstart-0.1.jar

Related Information