How can I develop an Apache Flink application with Stateful Functions API using embedded modules and deploy it to Ververica Platform?
Note: This section applies to Flink 1.14 with Ververica Platform 2.6.
There are several ways to manage the deployment of Stateful Functions applications, such as using Application Modules. In this article, we go through a different method, Embedded Modules, and deploy a Stateful Functions application that utilises embedded functions to build a simple Greeter application on Ververica Platform.
The overall pipeline consumes events from an Apache Kafka topic, converts each event into a Greeter Request, passes it to an embedded Greeter Stateful Function implemented in Java, and finally prints the result to standard output.
First, we define the identifiers, the function type and the deserialisation schema for the Stateful Functions, together with the data class that holds the request payload.
Identifiers.java
package org.myorg.quickstart.identifiers;

import com.google.protobuf.StringValue;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;

public class Identifiers {
    // Identifies the Kafka ingress that delivers greeter requests.
    public static final IngressIdentifier<StringValue> INGRESS_IDENTIFIER =
        new IngressIdentifier<>(StringValue.class, "com.ververica.stateful-functions", "greeter-request");

    // Logical type under which the greeter function is registered.
    public static final FunctionType FUNCTION_TYPE =
        new FunctionType("com.ververica.stateful-functions", "greeter");
}
GreeterRequest.java
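package org.myorg.quickstart.data;

public class GreeterRequest {
    // Public field and no-argument constructor so that Jackson can populate it.
    public String name;

    public GreeterRequest() {}

    public GreeterRequest(String name) {
        this.name = name;
    }
}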
GreeterRequestDeserializer.java
package org.myorg.quickstart.deserializer;

import com.google.protobuf.StringValue;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.nio.charset.StandardCharsets;

public class GreeterRequestDeserializer implements KafkaIngressDeserializer<StringValue> {
    @Override
    public StringValue deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) {
        // Decode the raw record value as UTF-8 text and wrap it in a Protobuf StringValue.
        return StringValue.of(new String(consumerRecord.value(), StandardCharsets.UTF_8));
    }
}
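To make the decoding step concrete, here is a minimal sketch that runs without the Kafka or StateFun dependencies. The class DecodeSketch and its decode method are hypothetical names introduced for illustration only, and the sample payload assumes that requests arrive as a JSON object with a single name field, matching the public field of GreeterRequest.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-alone helper; it mirrors the decode step of
// GreeterRequestDeserializer without the Kafka or StateFun types.
final class DecodeSketch {

    // Decode a raw Kafka record value as UTF-8 text.
    static String decode(byte[] recordValue) {
        return new String(recordValue, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Assumed payload shape: a JSON object with one "name" field.
        byte[] recordValue = "{\"name\":\"Alice\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(recordValue));
    }
}
```

Passing the charset explicitly keeps the result independent of the JVM default encoding inside the container.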
Next, we define our GreeterFunction implementation.
GreeterFunction.java
package org.myorg.quickstart.function;

import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;

public class GreeterFunction implements StatefulFunction {
    @Override
    public void invoke(Context context, Object greeterRequest) {
        // The function id is the routing key chosen by the ingress router.
        System.out.println("Hello there, " + context.self().id() + "!");
    }
}
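The next step is to assemble the routing by defining the Embedded Module. The Stateful Functions runtime discovers embedded modules through Java's ServiceLoader, so the application JAR also needs a META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule file that lists the fully qualified class name of the module.
EmbeddedModule.java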
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.myorg.quickstart.data.GreeterRequest;
import org.myorg.quickstart.deserializer.GreeterRequestDeserializer;
import org.myorg.quickstart.function.GreeterFunction;
import org.myorg.quickstart.identifiers.Identifiers;
import java.util.Map;

public class EmbeddedModule implements StatefulFunctionModule {
    // ObjectMapper is thread-safe once configured, so one instance is reused for all messages.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void configure(Map<String, String> map, Binder binder) {
        // Ingress: read greeter requests from Kafka, starting at the latest offsets.
        binder.bindIngress(KafkaIngressBuilder
            .forIdentifier(Identifiers.INGRESS_IDENTIFIER)
            .withKafkaAddress("kafka:9092")
            .withTopic("greeter-request")
            .withConsumerGroupId("greeter-request")
            .withDeserializer(GreeterRequestDeserializer.class)
            .withStartupPosition(KafkaIngressStartupPosition.fromLatest())
            .build());

        // Router: use the name from the JSON payload as the function id.
        binder.bindIngressRouter(
            Identifiers.INGRESS_IDENTIFIER,
            (greeterRequest, downstream) -> {
                String key = "NOT_PARSABLE";
                try {
                    GreeterRequest request =
                        MAPPER.readValue(greeterRequest.getValue(), GreeterRequest.class);
                    if (request != null && request.name != null) {
                        key = request.name;
                    }
                } catch (JsonProcessingException e) {
                    // Keep the fallback key for payloads that are not valid GreeterRequest JSON.
                }
                downstream.forward(Identifiers.FUNCTION_TYPE, key, greeterRequest);
            });

        // Function provider: create a GreeterFunction for the registered function type.
        binder.bindFunctionProvider(Identifiers.FUNCTION_TYPE, x -> new GreeterFunction());
    }
}
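The routing behaviour can be tried out without a cluster. The following is only a sketch under stated assumptions: RoutingSketch, keyFor and greetingFor are hypothetical names, and a regular expression stands in for Jackson so that the example has no dependencies. It accepts only a JSON object with a single, unescaped name string, which is narrower than what Jackson parses, but it shows how the key picked by the router becomes the function id that the greeter prints.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, dependency-free imitation of the router and the greeter.
final class RoutingSketch {

    // Simplified stand-in for Jackson (an assumption of this sketch):
    // matches {"name":"..."} with no escape sequences in the value.
    private static final Pattern NAME_ONLY =
        Pattern.compile("^\\s*\\{\\s*\"name\"\\s*:\\s*\"([^\"\\\\]*)\"\\s*\\}\\s*$");

    // Derive the routing key, falling back to a fixed id for anything else.
    static String keyFor(String payload) {
        Matcher matcher = NAME_ONLY.matcher(payload);
        return matcher.matches() ? matcher.group(1) : "NOT_PARSABLE";
    }

    // Same text that GreeterFunction prints for a given function id.
    static String greetingFor(String functionId) {
        return "Hello there, " + functionId + "!";
    }

    public static void main(String[] args) {
        System.out.println(greetingFor(keyFor("{\"name\":\"Alice\"}"))); // Hello there, Alice!
        System.out.println(greetingFor(keyFor("not json")));              // Hello there, NOT_PARSABLE!
    }
}
```

Because every unparsable payload maps to the same id, all such messages reach the same function instance, which is worth keeping in mind if the function later stores per-id state.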
Finally, we build the project and deploy the resulting Apache Flink application in Ververica Platform, with the main method of the JAR referenced in the artifact configuration serving as the entrypoint.
Artifact Configuration
artifact:
  flinkImageRegistry: flink
  flinkImageRepository: greeter
  flinkImageTag: 1.0.0
  flinkVersion: '1.14'
  jarUri: 'file:///flink/lib/statefun-flink-distribution-3.2.0.jar'
  kind: JAR
Dockerfile
FROM alpine/curl:3.14 AS dependency-downloader
RUN curl -o /tmp/statefun-flink-core.jar https://repo1.maven.org/maven2/org/apache/flink/statefun-flink-core/3.2.0/statefun-flink-core-3.2.0.jar
RUN curl -o /tmp/statefun-flink-distribution-3.2.0.jar https://repo1.maven.org/maven2/org/apache/flink/statefun-flink-distribution/3.2.0/statefun-flink-distribution-3.2.0.jar
# ----
FROM registry.ververica.com/v2.6/flink:1.14.4-stream1-scala_2.12-java8
COPY --from=dependency-downloader /tmp/statefun-flink-core.jar /opt/flink/lib/statefun-flink-core.jar
COPY --from=dependency-downloader /tmp/statefun-flink-distribution-3.2.0.jar /flink/lib/statefun-flink-distribution-3.2.0.jar
RUN sed -i "s/org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint/org.apache.flink.statefun.flink.launcher.StatefulFunctionsClusterEntryPoint/" /flink/bin/flink-console.sh
COPY target/quickstart-0.1.jar /opt/statefun/modules/greeter/quickstart-0.1.jar