How to Run an Apache Beam Pipeline on Ververica Platform

Written by Jun Qin | 23 May 2024

Question

Can I run an Apache Beam pipeline with Ververica Platform? What limitations exist? How can I set this up?

Answer

Note: This article applies to Flink 1.9 - 1.17 (Flink 1.18 not yet supported by Beam) with Ververica Platform 2.1 or later.

Yes, you can run Apache Beam pipelines. However, only the Classic (Java) FlinkRunner is supported, not the Portable runner.

TL;DR

In principle, this is what you need to do to submit your Apache Beam pipeline:

  1. Create a fat-jar of your job and all Apache Beam dependencies
    • Make sure you do not bundle any org.apache.flink dependencies that are provided by the Flink cluster (optional but recommended for Flink 1.10 or later, mandatory for Flink 1.9 and below)
  2. Create a Ververica Platform Deployment pointing to your jar file, your entrypoint class, and with these main arguments: --runner=FlinkRunner --output=/tmp/counts
    You don't need to specify flinkMaster as its default value [auto] is the right one here.
  3. Start the deployment

Full Example

Let us walk through a full example using the well-known WordCount from Apache Beam.

Setup

Follow the instructions in Apache Beam's WordCount example to get the code:

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.beam \
    -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
    -DarchetypeVersion=2.41.0 \
    -DgroupId=org.example \
    -DartifactId=word-count-beam \
    -Dversion="0.1" \
    -Dpackage=org.apache.beam.examples \
    -DinteractiveMode=false

and package it with

cd word-count-beam
mvn clean package -P flink-runner

Remove Flink Classes from Fat Jar

The default fat jar built from this example contains many classes that the Flink cluster already provides. They are not needed for this setup, and removing them is safer than dealing with potential class loading issues caused by multiple versions of the same class being available.

Note: This step may not be needed for Flink 1.10 or later. However, we highly recommend it for the reasons mentioned above. For Flink 1.9 and below, it is mandatory to remove Flink classes that are already provided by a running Flink cluster.

Change the flink-runner dependency in your pom.xml file to exclude Flink cluster dependencies and change the scope of API dependencies to provided:

    <profile>
      <id>flink-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>${flink.artifact.name}</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
          <exclusions>
            <exclusion>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-clients_2.11</artifactId>
            </exclusion>
            <exclusion>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-core</artifactId>
            </exclusion>
            <exclusion>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-metrics-core</artifactId>
            </exclusion>
            <exclusion>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-runtime_2.11</artifactId> <!-- for Flink >= 1.14: flink-runtime -->
            </exclusion>
            <exclusion>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-runtime-web_2.11</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
      </dependencies>
      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-metrics-core</artifactId>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime_2.11</artifactId>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-annotations</artifactId>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-optimizer_2.11</artifactId>
            <scope>provided</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    </profile>

These dependencies may change in the future and are based on the definitions in the Apache Beam repository. You can double-check your fat jar for any unexpected Flink dependencies.

Re-create the jar file:

mvn clean package -P flink-runner
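
Once the jar is rebuilt, one way to double-check it for leftover Flink classes is to list its entries and filter for Flink's own packages. The following is a minimal sketch, not a definitive check: the jar path target/word-count-beam-bundled-0.1.jar is an assumption based on the example project's default build output, and the helper name flink_entries is made up for illustration. Any packages it reports are worth reviewing against the exclusions above.

```shell
#!/usr/bin/env bash
# Assumption: jar name as produced by the example project's build;
# pass a different path as the first argument if yours differs.
JAR="${1:-target/word-count-beam-bundled-0.1.jar}"

# Keep only entries from Flink's own packages. Beam's runner classes live
# under org/apache/beam/runners/flink/ and are expected to stay in the jar.
flink_entries() {
  grep '^org/apache/flink/' || true
}

if [ -f "$JAR" ]; then
  # Summarize remaining Flink entries per top-level package, with counts.
  jar tf "$JAR" | flink_entries | cut -d/ -f1-4 | sort | uniq -c
else
  echo "jar not found: $JAR" >&2
fi
```

The filter is anchored at org/apache/flink/ on purpose, so that Beam's Flink runner classes, which must remain in the fat jar, are not reported.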

Upload Jar File

Ververica Platform's artifact management can currently only upload files up to 50MB. You can, however, upload your jar file directly to your configured storage provider (see Artifact Management for details) or increase the upload limit in Ververica Platform.
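
To decide up front which upload route applies, you can compare the jar size against the limit. This is a small sketch under stated assumptions: the jar path is the example project's presumed default output, the 50MB limit is interpreted as 50 * 1024 * 1024 bytes, and exceeds_limit is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Succeeds (exit 0) when the file is larger than the given limit in bytes.
exceeds_limit() {
  local file="$1" limit_bytes="$2"
  local size
  # Arithmetic expansion also strips any padding that wc may emit.
  size=$(( $(wc -c < "$file") ))
  [ "$size" -gt "$limit_bytes" ]
}

# Assumptions: the jar path, and 50MB read as 50 * 1024 * 1024 bytes.
JAR="${1:-target/word-count-beam-bundled-0.1.jar}"
LIMIT=$((50 * 1024 * 1024))

if [ -f "$JAR" ]; then
  if exceeds_limit "$JAR" "$LIMIT"; then
    echo "$JAR is over the upload limit; upload it to the storage provider directly"
  else
    echo "$JAR fits within the upload limit"
  fi
fi
```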

Create Deployment

Create a new deployment and point it to your jar file and entrypoint class, e.g. org.apache.beam.examples.WordCount. Pass --runner=FlinkRunner --output=/tmp/counts as the main arguments for your Apache Beam pipeline's main function. Also make sure to select the Flink version matching your project in the Advanced tab of the deployment editor.

Warning: Make sure you allocate some managed memory in your taskmanager, e.g., taskmanager.memory.managed.fraction: '0.2'. Otherwise, the deployment fails with "Too little memory provided to sorter to perform task. Required are at least 12 pages. Current page size is 32768 bytes."
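
The settings described above can also be captured as a Deployment specification. The sketch below writes one to a local file named deployment.yaml. It is a hedged illustration, not a verified spec: the field layout follows the Deployment resource as we understand it and should be checked against the documentation of your Ververica Platform version, and metadata.name, jarUri, and flinkVersion are hypothetical placeholder values. A deployment target and other environment-specific settings are intentionally left out.

```shell
#!/usr/bin/env bash
# Writes a sketch of a Deployment spec to deployment.yaml.
# Hypothetical values: metadata.name, jarUri, and flinkVersion.
cat > deployment.yaml <<'EOF'
kind: Deployment
apiVersion: v1
metadata:
  name: beam-wordcount
spec:
  template:
    spec:
      artifact:
        kind: JAR
        jarUri: s3://my-bucket/artifacts/word-count-beam-bundled-0.1.jar
        entryClass: org.apache.beam.examples.WordCount
        mainArgs: '--runner=FlinkRunner --output=/tmp/counts'
        flinkVersion: '1.15'
      flinkConfiguration:
        # Reserve managed memory so the batch sorter has pages to work with.
        taskmanager.memory.managed.fraction: '0.2'
EOF
```

The flinkVersion value must match the Flink version your Beam runner artifact was built for, and jarUri must point to wherever you uploaded the jar in the previous step.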

Start the Deployment

Click Start and the deployment will run. Since the WordCount example is a simple batch job, it will eventually end up in the FINISHED state. Streaming jobs should transition to RUNNING.

Enjoy!
