How to integrate Apache Iceberg with Ververica Platform?

Question

How do I integrate the Iceberg connector for Flink with Ververica Platform (VVP)?

Answer

Note: This article applies to Flink 1.18.0 (or higher) with Ververica Platform 2.12.0.

Ververica Platform does not support the Flink Iceberg connector out of the box, but the connector can be integrated with VVP by using custom Docker images. This article shows the steps required to achieve this. At the end of this article, you will be able to read from and write to Iceberg tables.

This tutorial consists of two parts:

  1. VVP Flink SQL deployment integration
  2. VVP JAR-based deployment integration

Since VVP includes an advanced SQL editing, validation and debugging environment, several steps are needed to integrate Iceberg into Flink SQL deployments, while the integration for JAR-based deployments is relatively simple and straightforward.

Note: This article covers the Iceberg catalog type 'hadoop'; catalog type 'hive' is out of scope.

Integration with Flink SQL

1) Add Iceberg as a custom connector and catalog to the VVP gateway by building your own Gateway Docker image.

Before doing that, download the following JAR files from the Maven Central registry or your internal package registry and store them in a local folder named jars:

  1. https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.17/1.4.3/iceberg-flink-runtime-1.17-1.4.3.jar
  2. https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar
  3. https://repo1.maven.org/maven2/joda-time/joda-time/2.12.7/joda-time-2.12.7.jar
  4. https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.12.262/aws-java-sdk-core-1.12.262.jar
  5. https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.12.262/aws-java-sdk-s3-1.12.262.jar
  6. https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.12.262/aws-java-sdk-dynamodb-1.12.262.jar
  7. https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.18.0/flink-s3-fs-hadoop-1.18.0.jar
  8. https://repo1.maven.org/maven2/org/apache/flink/flink-hadoop-fs/1.18.0/flink-hadoop-fs-1.18.0.jar

Note: We use iceberg-flink-runtime 1.17 because no newer runtime version was available at the time of writing. However, this Iceberg runtime also works with a Flink 1.18 cluster (verified).

Additionally, download the Hadoop libraries that are needed for the Iceberg 'hadoop' catalog type. Using the shell script below, download the Hadoop distribution and unpack it into the jars folder as well:

APACHE_HADOOP_URL=https://archive.apache.org/dist/hadoop
HADOOP_VERSION=3.3.4
curl ${APACHE_HADOOP_URL}/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz -o hadoop-${HADOOP_VERSION}.tar.gz
# Unpack the Hadoop distribution into the jars folder (see the directory layout below)
tar xzvf hadoop-${HADOOP_VERSION}.tar.gz -C jars

Create a YAML file named catalog-meta.yaml for the Iceberg SQL catalog with the following content:

# catalog-meta.yaml
catalog:
  type: iceberg
  packaged: true
  properties:
    - key: type
      required: true
      description: Must be set to 'iceberg' to configure this catalog.
    - key: catalog-type
      required: false
      description: hive, hadoop or rest for built-in catalogs, or left unset for custom catalog implementations using catalog-impl
    - key: catalog-database
      required: false
      description: default database name
    - key: catalog-impl
      required: false
      description: The fully-qualified class name of a custom catalog implementation. Must be set if catalog-type is unset
    - key: property-version
      required: false
      description: Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is 1
    - key: cache-enable
      required: false
      description: Whether to enable catalog cache, default value is true
    - key: cache.expiration-interval-ms
      required: false
      description: How long catalog entries are locally cached, in milliseconds; negative values like -1 disable expiration, and the value 0 is not allowed. The default value is -1
    - key: warehouse
      required: false
      description: The warehouse location. For catalog-type 'hadoop' (as used in this article) this is the root path of the warehouse, for example an S3 location. For catalog-type 'hive', users should specify this path if they neither set hive-conf-dir to a location containing a hive-site.xml configuration file nor add a correct hive-site.xml to the classpath

Create another YAML file connector-meta.yaml for the Iceberg connector with the following content:

# connector-meta.yaml
connector:
  type: iceberg
  packaged: true
  source: true
  sink: true
  lookup: false
  properties:
  - key: connector
    required: true
    description: Must be set to 'iceberg' to configure this connector.
    defaultValue: iceberg
  - key: catalog-database
    required: false
    description: default database name
    defaultValue: default
  - key: catalog-name
    required: true
    description: User-specified catalog name. It is required because the connector does not have a default value.
  - key: catalog-table
    required: false
    description: The Iceberg table name in the backend catalog. Defaults to the table name in the Flink CREATE TABLE statement.
  - key: catalog-type
    required: false
    description: hive, hadoop or rest for built-in catalogs, or left unset for custom catalog implementations using catalog-impl

Create a Dockerfile for the VVP Gateway container customisation with the following content:

# Put your desired version of VVP and Flink in the following commands
FROM registry.ververica.com/v2.12/vvp-gateway:2.12.0

# Iceberg Catalog
COPY catalog-meta.yaml /vvp/sql/opt/catalogs/iceberg/catalog-meta.yaml
COPY jars/iceberg-flink-runtime-1.17-1.4.3.jar /vvp/sql/opt/catalogs/iceberg/
COPY jars/hadoop-aws-3.3.4.jar /vvp/sql/opt/catalogs/iceberg/
COPY jars/flink-s3-fs-hadoop-1.18.0.jar /vvp/sql/opt/catalogs/iceberg/

# Iceberg Connector
COPY connector-meta.yaml /vvp/sql/opt/connectors/iceberg/connector-meta.yaml
COPY jars/iceberg-flink-runtime-1.17-1.4.3.jar /vvp/sql/opt/connectors/iceberg/
COPY jars/hadoop-aws-3.3.4.jar /vvp/sql/opt/connectors/iceberg/
COPY jars/flink-s3-fs-hadoop-1.18.0.jar /vvp/sql/opt/connectors/iceberg/

# VVP SQL Editor libraries for Iceberg
COPY jars/flink-hadoop-fs-1.18.0.jar /vvp/app/lib/

Ensure that the two YAML files are stored in the working directory and that the jars folder contains all the files mentioned above:

├── catalog-meta.yaml
├── connector-meta.yaml
└── jars/
    ├── aws-java-sdk-core-1.12.262.jar
    ├── aws-java-sdk-dynamodb-1.12.262.jar
    ├── aws-java-sdk-s3-1.12.262.jar
    ├── flink-hadoop-fs-1.18.0.jar
    ├── flink-s3-fs-hadoop-1.18.0.jar
    ├── hadoop-3.3.4
    ├── hadoop-aws-3.3.4.jar
    ├── iceberg-flink-runtime-1.17-1.4.3.jar
    └── joda-time-2.12.7.jar

Now build and push your Gateway image:

docker build -t <put your registry here>/vvp-gateway:2.12.0-iceberg -f Dockerfile .
docker push <put your registry here>/vvp-gateway:2.12.0-iceberg

2) Build a custom Flink image for SQL Session Clusters.

Create a file named Dockerfile-Flink with the following content:

FROM registry.ververica.com/v2.12/flink:1.18.0-stream1-scala_2.12-java11

# For Flink SQL and VVP SQL Editor support
COPY jars/hadoop-3.3.4/share/hadoop/common/lib/* ../opt/hadoop/
COPY jars/hadoop-3.3.4/share/hadoop/common/hadoop-common-3.3.4.jar ../opt/hadoop/
COPY jars/hadoop-3.3.4/share/hadoop/hdfs/hadoop-hdfs-client-3.3.4.jar ../opt/hadoop/
COPY jars/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.4.jar ../opt/hadoop/

# See more at https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/advanced/#hadoop-dependencies
ENV HADOOP_CLASSPATH="/flink/opt/hadoop/*"

# For JAR-based Deployments support
COPY jars/hadoop-aws-3.3.4.jar ../lib/
COPY jars/aws-java-sdk-core-1.12.262.jar ../lib/
COPY jars/aws-java-sdk-s3-1.12.262.jar ../lib/
COPY jars/aws-java-sdk-dynamodb-1.12.262.jar ../lib/
COPY jars/joda-time-2.12.7.jar ../lib/

Then build and push the Flink image by running the following commands:

docker build -t <put your registry here>/flink:1.18.0-iceberg -f Dockerfile-Flink .
docker push <put your registry here>/flink:1.18.0-iceberg

3) Use the newly built Gateway and Flink images in your VVP installation. For that, set the repositories and tags in the Helm values.yaml file and configure the other values as shown below:

# values.yaml
gateway:
  image:
    repository: <your registry>/<your repository>
    tag: 2.12.0-iceberg
vvp:
  flinkVersionMetadata:    
    - flinkImageRegistry: <put your registry here>
      flinkImageRepository: flink
      flinkVersion: 1.18
      imageTag: 1.18.0-iceberg 
      useForSqlDeployments: true
      defaultFor:
      - 1.18
  blobStorage:
    baseUri: s3://<set bucket name here ...>
    s3: 
      endpoint: <optionally set S3 endpoint address (it is needed for MinIO)>
  globalSessionClusterDefaults: |
    spec:
      kubernetes:
        jobManagerPodTemplate: 
          spec:
            containers:
              - name: flink-jobmanager
                env:
                  - name: HADOOP_CONF_DIR
                    value: /etc/hadoop/conf
                volumeMounts:
                  - name: hadoop-s3-creds
                    mountPath: /etc/hadoop/conf 
            volumes:
              - name: hadoop-s3-creds
                secret:
                  secretName: s3-creds-secret
        taskManagerPodTemplate: 
          spec:
            containers:
              - name: flink-taskmanager
                env:
                  - name: HADOOP_CONF_DIR
                    value: /etc/hadoop/conf
                volumeMounts:
                  - name: hadoop-s3-creds
                    mountPath: /etc/hadoop/conf 
            volumes:
              - name: hadoop-s3-creds
                secret:
                  secretName: s3-creds-secret
    
blobStorageCredentials:
  s3:
    accessKeyId: <set value ...>
    secretAccessKey: <set value ...>

env:
  - name: HADOOP_CONF_DIR
    value: /etc/hadoop/conf

volumes:
  - name: hadoop-s3-creds
    secret:
      secretName: s3-creds-secret

volumeMounts:
  - name: hadoop-s3-creds
    mountPath: /etc/hadoop/conf

In summary, we set the following values in this file:

  • Flink image which includes the Iceberg dependencies
  • Gateway image which includes the Iceberg dependencies
  • For S3: bucket name, endpoint (optional), credentials
  • globalSessionClusterDefaults: an environment variable and a mounted secret that propagate the S3 credentials to the hadoop-aws library
  • mounted secret: a Hadoop core-site.xml file with the S3 access configuration

4) Create a Kubernetes secret in your VVP namespace with the following content.

Important: Put your credentials and other values into the XML configuration below.

apiVersion: v1
kind: Secret
metadata:
  name: s3-creds-secret
type: Opaque
stringData:
  core-site.xml: |-
    <configuration>
      <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
      </property>
      <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
      </property>
      <property>
        <name>fs.s3a.access.key</name>
        <value> put right value here !!! </value>
      </property>
      <property>
        <name>fs.s3a.secret.key</name>
        <value> put right value here !!! </value>
      </property>
      <property>
        <name>fs.s3a.connection.ssl.enabled</name>
        <value>false</value>
        <description>Enables or disables SSL connections to S3.</description>
      </property>
      <!-- 
        Remove below property if you use AWS S3.
        Below property is set for MinIO service running in the vvp namespace 
      -->
      <property>
        <name>fs.s3a.endpoint</name>
        <value>http://minio.vvp.svc:9000</value>
      </property>
      <property>
        <name>fs.s3a.path.style.access</name>
        <value>true</value>
      </property>
      <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
      </property>
    </configuration>

Apply the secret to the vvp namespace and to your jobs namespace (for example, vvp-jobs):

kubectl apply -f <your secret file name>.yaml -n vvp
kubectl apply -f <your secret file name>.yaml -n vvp-jobs

5) Install/upgrade your VVP instance with the configured values via Helm.

helm upgrade --install vvp ververica/ververica-platform \
  --namespace vvp \
  -f values.yaml

6) Now test the Iceberg catalog integration by creating a catalog, a database and a table in the VVP SQL Editor. Use the script below for the test:

CREATE CATALOG icebergcatalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'catalog-database'='default',
  'property-version'='1',
  'warehouse'='s3a://<your bucket from the values.yaml>/icebergcatalog'
);

USE CATALOG icebergcatalog;
-- 'default' is a reserved keyword in Flink SQL, hence the backticks
CREATE DATABASE IF NOT EXISTS `default`;

USE CATALOG icebergcatalog;
CREATE TABLE sample (
  id BIGINT COMMENT 'unique id',
  data STRING
);

7) Insert data into the Iceberg table through a new VVP Deployment:

USE CATALOG icebergcatalog;
INSERT INTO sample VALUES (1, 'a');

8) Read the table data in the SQL Editor using a Flink Session Cluster:

USE CATALOG icebergcatalog;
SELECT * FROM sample;

Integration with Flink DataStream API

Since we have installed the Iceberg connector dependencies into the Flink Docker image and set the Deployment Defaults in Ververica Platform, only the following steps are needed:

1. First, build your job JAR file and use it in your VVP Deployment (a minimal example of such a job is sketched after this list).

2. Before starting your VVP Deployment, add the iceberg-flink-runtime-1.17-1.4.3.jar dependency to the list of Additional Dependencies of your VVP Deployment.
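
Below is a minimal sketch of what such a DataStream job might look like, using the Iceberg Flink API (TableLoader, FlinkSource, FlinkSink). The class name and the s3a path are placeholders you must adapt to your own warehouse layout; treat it as a starting point, not a complete application.

// IcebergDataStreamJob.java - sketch of a job that reads the 'sample' table created
// above and appends a few new rows. The bucket name below is a placeholder.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.FlinkSource;

public class IcebergDataStreamJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // FlinkSink commits new Iceberg snapshots on checkpoints, so checkpointing must be enabled.
        env.enableCheckpointing(60_000L);

        // With the 'hadoop' catalog type a table is addressed by its path:
        // <warehouse>/<database>/<table>. The Hadoop configuration (S3 credentials) is
        // picked up from HADOOP_CONF_DIR, as configured in the values.yaml above.
        TableLoader tableLoader =
                TableLoader.fromHadoopTable("s3a://<your bucket>/icebergcatalog/default/sample");

        // Batch read: print the current content of the table.
        DataStream<RowData> existingRows = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(false)
                .build();
        existingRows.print();

        // Append two new rows matching the table schema (id BIGINT, data STRING).
        // In a real job these rows would typically come from an upstream source or connector.
        DataStream<RowData> newRows = env.fromElements(
                (RowData) GenericRowData.of(2L, StringData.fromString("b")),
                GenericRowData.of(3L, StringData.fromString("c")));

        FlinkSink.forRowData(newRows)
                .tableLoader(tableLoader)
                .append();

        env.execute("Iceberg DataStream example");
    }
}

When packaging the job JAR, the Flink dependencies are typically marked as provided; the Iceberg runtime can either be bundled into the JAR or supplied as the additional dependency described in step 2.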
