Can I run an Apache Beam pipeline with Ververica Platform? What limitations exist? How can I set this up?
Note: This article applies to Flink 1.9 - 1.17 (Flink 1.18 is not yet supported by Beam) and Ververica Platform 2.1 or later.
Yes, you can run Apache Beam pipelines with Ververica Platform. However, only the Classic (Java) FlinkRunner is supported, not the Portable Runner.
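For reference, runner selection happens entirely through the pipeline options, so no Flink-specific code is needed in your main class. Here is a minimal sketch (the class name MyBeamJob is hypothetical; Beam's WordCount follows the same pattern):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class MyBeamJob {
    public static void main(String[] args) {
        // Passing --runner=FlinkRunner as a program argument selects the
        // classic Flink runner (beam-runners-flink must be on the classpath).
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline pipeline = Pipeline.create(options);
        // ... assemble your transforms here ...
        pipeline.run().waitUntilFinish();
    }
}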
In principle, this is what you need to do to submit your Apache Beam pipeline:
- Build a fat jar of your pipeline, excluding the org.apache.flink dependencies that are provided by the Flink cluster (optional for Flink 1.10, mandatory for Flink <= 1.9).
- Create a deployment that passes --runner=FlinkRunner --output=/tmp/counts as the main arguments. Leave flinkMaster unset, as its default value [auto] is the right one here.
Let us walk through a full example using the infamous WordCount from Apache Beam.
Follow the instructions at Apache Beam's WordCount example to get the code:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.41.0 \
-DgroupId=org.example \
-DartifactId=word-count-beam \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
and package it with
cd word-count-beam
mvn clean package -P flink-runner
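Before deploying, you may want to smoke-test the pipeline locally. A quick way to do so, assuming the archetype's direct-runner profile (the input and output paths are just examples):
mvn compile exec:java -Pdirect-runner \
    -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--inputFile=pom.xml --output=counts"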
The default fat jar built from this example contains many classes that a Flink cluster already provides on its own. For our setup, these are not needed, and it is safer to remove them than to deal with potential class-loading issues from having multiple versions of the same class on the classpath.
Note: This step may not be needed for Flink 1.10 or later. However, we highly recommend it for the reasons mentioned above. For Flink versions 1.9 and below, it is mandatory to remove Flink classes that a running Flink cluster already provides.
Change the flink-runner dependency in your pom.xml file to exclude Flink cluster dependencies and change the scope of API dependencies to provided:
<profile>
  <id>flink-runner</id>
  <dependencies>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>${flink.artifact.name}</artifactId>
      <version>${beam.version}</version>
      <scope>runtime</scope>
      <exclusions>
        <exclusion>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_2.11</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-core</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-metrics-core</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-runtime_2.11</artifactId> <!-- for Flink >= 1.14: flink-runtime -->
        </exclusion>
        <exclusion>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-runtime-web_2.11</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-metrics-core</artifactId>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime_2.11</artifactId>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-annotations</artifactId>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-optimizer_2.11</artifactId>
        <scope>provided</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
</profile>
These dependencies may change in the future and are based on the definitions in the Apache Beam repository. You can double-check your fat jar for any unexpected Flink dependencies.
Re-create the jar file:
mvn clean package -P flink-runner
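After rebuilding, one way to double-check is to list the jar's contents and look for Flink classes; a sketch, assuming the archetype's default bundled-jar name:
jar tf target/word-count-beam-bundled-0.1.jar | grep "^org/apache/flink"  # should print nothing if the exclusions took effect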
Ververica Platform's artifact management can currently only upload files up to 50MB. You can, however, upload your jar file directly to your configured storage provider (see Artifact Management for details) or increase the upload limit in Ververica Platform.
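For example, if your universal blob storage is backed by S3, uploading the jar with the AWS CLI could look like this (bucket name and path are placeholders):
aws s3 cp target/word-count-beam-bundled-0.1.jar s3://my-bucket/artifacts/word-count-beam-bundled-0.1.jar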
Create a new deployment, point it to your jar file and entry point class, e.g. org.apache.beam.examples.WordCount, and be sure to pass --runner=FlinkRunner --output=/tmp/counts as the main arguments for your Apache Beam pipeline's main function. Also make sure to select the correct Flink version matching your project in the Advanced tab of the deployment editor.
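If you prefer configuring the deployment via its YAML specification, a minimal sketch might look as follows (the name and jarUri are placeholders; pick the flinkVersion matching your project):
apiVersion: v1
kind: Deployment
metadata:
  name: beam-wordcount
spec:
  template:
    spec:
      artifact:
        kind: JAR
        jarUri: s3://my-bucket/artifacts/word-count-beam-bundled-0.1.jar
        entryClass: org.apache.beam.examples.WordCount
        mainArgs: --runner=FlinkRunner --output=/tmp/counts
        flinkVersion: "1.17"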
Warning: Make sure you allocate some managed memory in your taskmanagers, e.g., taskmanager.memory.managed.fraction: '0.2'. Otherwise, the deployment will fail with "Too little memory provided to sorter to perform task. Required are at least 12 pages. Current page size is 32768 bytes."
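In a deployment specification, this setting goes under flinkConfiguration; a sketch of the relevant excerpt:
spec:
  template:
    spec:
      flinkConfiguration:
        taskmanager.memory.managed.fraction: '0.2'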
Click start and the deployment will run. Since the WordCount example is a simple batch job, it will eventually end up in the FINISHED state. Streaming jobs should transition to RUNNING.
Enjoy!