How to Run an Apache Beam Pipeline on Ververica Platform
Answer
Note: This article applies to Flink 1.9 - 1.17 (Flink 1.18 not yet supported by Beam) with Ververica Platform 2.1 or later.
Yes, you can run Apache Beam pipelines on Ververica Platform. However, only the Classic (Java) FlinkRunner is supported, not the Portable runner.
TL;DR
In principle, this is what you need to do to submit your Apache Beam pipeline:
- Create a fat jar of your job and all Apache Beam dependencies
- Make sure you do not bundle any org.apache.flink dependencies that are provided by the Flink cluster (optional for Flink 1.10, mandatory for Flink <= 1.9)
- Create a Ververica Platform Deployment pointing to your jar file, your entrypoint class, and with these main arguments:
--runner=FlinkRunner --output=/tmp/counts
You don't need to specify flinkMaster as its default value [auto] is the right one here.
- Start the deployment
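For context, passing --runner=FlinkRunner as a main argument is all that is needed because Beam builds its pipeline options from the command line via PipelineOptionsFactory. A minimal sketch of such a main method (the class name is hypothetical; WordCount's own main method works the same way):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class MyBeamJob {
    public static void main(String[] args) {
        // --runner=FlinkRunner selects Beam's classic FlinkRunner;
        // flinkMaster stays at its default [auto], so the runner picks up
        // the configuration of the surrounding Flink cluster.
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline pipeline = Pipeline.create(options);
        // ... assemble your transforms on `pipeline`, then:
        pipeline.run().waitUntilFinish();
    }
}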
Full Example
Let us walk through a full example using the well-known WordCount from Apache Beam.
Setup
Follow the instructions at Apache Beam's WordCount example to get the code:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.41.0 \
-DgroupId=org.example \
-DartifactId=word-count-beam \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
and package it with
cd word-count-beam
mvn clean package -P flink-runner
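Optionally, you can smoke-test the pipeline locally before involving the platform. This sketch follows Beam's quickstart; --inputFile=pom.xml is just a convenient test input:

# With flinkMaster left at its default [auto], the FlinkRunner spins up
# an embedded Flink cluster inside the local JVM.
mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Pflink-runner \
    -Dexec.args="--runner=FlinkRunner --inputFile=pom.xml --output=counts"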
Remove Flink Classes from Fat Jar
The default fat jar built from this example contains many classes that a Flink cluster already provides on its own. These are not needed for our setup, and it is safer to remove them than to deal with class loading issues from multiple versions of the same class being on the classpath.
Note: This step may not be needed for Flink 1.10 or later. However, we highly recommend it for the reasons mentioned above. For Flink versions 1.9 and below, it is mandatory to remove Flink classes that are already provided by a running Flink cluster.
Change the flink-runner dependency in your pom.xml file to exclude Flink cluster dependencies and change the scope of API dependencies to provided:
<profile>
    <id>flink-runner</id>
    <dependencies>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>${flink.artifact.name}</artifactId>
            <version>${beam.version}</version>
            <scope>runtime</scope>
            <!-- Exclude Flink dependencies that the Flink cluster provides at runtime. -->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-clients_2.11</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-metrics-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-runtime_2.11</artifactId> <!-- for Flink >= 1.14: flink-runtime -->
                </exclusion>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-runtime-web_2.11</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <!-- Compile against the Flink APIs, but mark them as provided so
                 they are not bundled into the fat jar. -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.11</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-metrics-core</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime_2.11</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-annotations</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-optimizer_2.11</artifactId>
                <scope>provided</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</profile>
These dependencies are based on the definitions in the Apache Beam repository and may change in future Beam releases. After rebuilding, you should therefore double-check your fat jar for any unexpected Flink dependencies (see the check below).
Re-create the jar file:
mvn clean package -P flink-runner
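One way to verify the exclusions took effect is to list the jar's contents. The bundled jar name below assumes the archetype's default shade configuration; the grep should print nothing:

# Expect no output: org.apache.flink classes should no longer be bundled.
# (Beam's own org/apache/beam/runners/flink classes are expected and fine.)
jar tf target/word-count-beam-bundled-0.1.jar | grep '^org/apache/flink'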
Upload Jar File
Ververica Platform's artifact management can currently only upload files up to 50MB. You can, however, upload your jar file directly to your configured storage provider (see Artifact Management for details) or increase the upload limit in Ververica Platform.
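For example, with an S3-backed artifact storage, a direct upload could look like the following. The bucket and prefix are hypothetical placeholders; check your Artifact Management configuration for the actual location:

# Hypothetical bucket/prefix: replace with your configured artifact storage.
aws s3 cp target/word-count-beam-bundled-0.1.jar \
    s3://my-vvp-artifacts/artifacts/namespaces/default/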
Create Deployment
Create a new Deployment, point it to your jar file and entrypoint class, e.g. org.apache.beam.examples.WordCount, and be sure to pass --runner=FlinkRunner --output=/tmp/counts as the main arguments for your Apache Beam pipeline's main function. Also make sure to select the Flink version matching your project in the Advanced tab of the deployment editor. In YAML terms, the Deployment might look like the sketch below.
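The deployment name and jar URI here are placeholders; the field names follow Ververica Platform's Deployment specification and may differ slightly between Platform versions:

kind: Deployment
apiVersion: v1
metadata:
  name: beam-word-count            # placeholder name
spec:
  template:
    spec:
      artifact:
        kind: JAR
        # Placeholder URI: point this at the jar you uploaded above.
        jarUri: s3://my-vvp-artifacts/artifacts/namespaces/default/word-count-beam-bundled-0.1.jar
        entryClass: org.apache.beam.examples.WordCount
        mainArgs: --runner=FlinkRunner --output=/tmp/counts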

Warning: make sure you allocate some managed memory for your TaskManagers, e.g. taskmanager.memory.managed.fraction: '0.2'. Otherwise, the deployment will fail with "Too little memory provided to sorter to perform task. Required are at least 12 pages. Current page size is 32768 bytes."
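In the Deployment spec, such Flink settings go under flinkConfiguration, for example:

spec:
  template:
    spec:
      flinkConfiguration:
        taskmanager.memory.managed.fraction: '0.2'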
Start the Deployment
Click start and the deployment will run. Since the WordCount example is a simple batch job, it will eventually transition to the FINISHED state. Streaming jobs should transition to RUNNING and stay there.
Enjoy!