How to integrate Apache Iceberg with Ververica Platform?
Question
How to integrate Iceberg connector for Flink with Ververica Platform (VVP)?
Answer
Note: This section applies to Flink 1.18.0 (or higher) with Ververica Platform 2.12.0.
Ververica Platform does not support the Flink Iceberg connector out of the box, but the connector can be integrated with VVP by using custom Docker images. This article shows the steps required to achieve that. At the end of this article, you will be able to read from and write to Iceberg tables.
This tutorial consists of two parts:
- VVP Flink SQL deployment integration
- VVP JAR-based deployment integration
Since VVP includes an advanced SQL editing, validation and debugging environment, several steps are needed to integrate Iceberg into Flink SQL deployments, while the integration for JAR-based deployments is relatively simple and straightforward.
Note: This article is for Iceberg catalog type 'hadoop'. Catalog type 'hive' is out of scope of this article.
Integration with Flink SQL
1) Add Iceberg as a custom connector and catalog to the VVP gateway by building your own Gateway Docker image.
Before doing that, download the following JAR files from the Maven Central registry or your internal package registry and store them in a local folder named jars (a download script sketch follows the note below):
- https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.17/1.4.3/iceberg-flink-runtime-1.17-1.4.3.jar
- https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar
- https://repo1.maven.org/maven2/joda-time/joda-time/2.12.7/joda-time-2.12.7.jar
- https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.12.262/aws-java-sdk-core-1.12.262.jar
- https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.12.262/aws-java-sdk-s3-1.12.262.jar
- https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.12.262/aws-java-sdk-dynamodb-1.12.262.jar
- https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.18.0/flink-s3-fs-hadoop-1.18.0.jar
- https://repo1.maven.org/maven2/org/apache/flink/flink-hadoop-fs/1.18.0/flink-hadoop-fs-1.18.0.jar
Note: we use iceberg-flink-runtime 1.17 because no newer runtime version was available at the time of writing this article. However, this Iceberg runtime has been verified to work with a Flink 1.18 cluster as well.
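If you prefer to script the downloads, the following minimal shell sketch (assuming curl is available) fetches all of the JARs listed above into the jars folder:
mkdir -p jars
cd jars
for url in \
  https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.17/1.4.3/iceberg-flink-runtime-1.17-1.4.3.jar \
  https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar \
  https://repo1.maven.org/maven2/joda-time/joda-time/2.12.7/joda-time-2.12.7.jar \
  https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.12.262/aws-java-sdk-core-1.12.262.jar \
  https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.12.262/aws-java-sdk-s3-1.12.262.jar \
  https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.12.262/aws-java-sdk-dynamodb-1.12.262.jar \
  https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.18.0/flink-s3-fs-hadoop-1.18.0.jar \
  https://repo1.maven.org/maven2/org/apache/flink/flink-hadoop-fs/1.18.0/flink-hadoop-fs-1.18.0.jar
do
  # -f: fail on HTTP errors, -L: follow redirects, -O: keep the original file name
  curl -fLO "$url"
done
cd ..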
Additionally, download the Hadoop libraries that are needed for the Iceberg 'hadoop' catalog type. Using the shell script below, download the Hadoop distribution and unpack it into the jars folder as well:
APACHE_HADOOP_URL=https://archive.apache.org/dist/hadoop
HADOOP_VERSION=3.3.4
curl -fL ${APACHE_HADOOP_URL}/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz -o hadoop-${HADOOP_VERSION}.tar.gz
# Unpack the distribution into the jars folder, where the Dockerfiles below expect it
tar xzvf hadoop-${HADOOP_VERSION}.tar.gz -C jars
Create a YAML file catalog-meta.yaml for the Iceberg SQL catalog with the following content:
# catalog-meta.yaml
catalog:
  type: iceberg
  packaged: true
  properties:
    - key: type
      required: true
      description: Must be set to 'iceberg' to configure this catalog.
    - key: catalog-type
      required: false
      description: hive, hadoop or rest for built-in catalogs, or left unset for custom catalog implementations using catalog-impl
    - key: catalog-database
      required: false
      description: default database name
    - key: catalog-impl
      required: false
      description: The fully-qualified class name of a custom catalog implementation. Must be set if catalog-type is unset
    - key: property-version
      required: false
      description: Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is 1
    - key: cache-enable
      required: false
      description: Whether to enable catalog cache, default value is true
    - key: cache.expiration-interval-ms
      required: false
      description: How long catalog entries are locally cached, in milliseconds; negative values like -1 will disable expiration, value 0 is not allowed to set. default value is -1
    - key: warehouse
      required: false
      description: The Hive warehouse location, users should specify this path if neither set the hive-conf-dir to specify a location containing a hive-site.xml configuration file nor add a correct hive-site.xml to classpath
Create another YAML file connector-meta.yaml for the Iceberg connector with the following content:
# connector-meta.yaml
connector:
  type: iceberg
  packaged: true
  source: true
  sink: true
  lookup: false
  properties:
    - key: connector
      required: true
      description: Must be set to 'iceberg' to configure this connector.
      defaultValue: iceberg
    - key: catalog-database
      required: false
      description: default database name
      defaultValue: default
    - key: catalog-name
      required: true
      description: User-specified catalog name. It's required because the connector doesn't have any default value.
    - key: catalog-table
      required: false
      description: The Iceberg table name in the backend catalog. Defaults to the table name in the Flink CREATE TABLE statement.
    - key: catalog-type
      required: false
      description: hive, hadoop or rest for built-in catalogs, or left unset for custom catalog implementations using catalog-impl
Create a Dockerfile for customising the VVP Gateway container with the following content:
# Put your desired version of VVP and Flink in the following commands
FROM registry.ververica.com/v2.12/vvp-gateway:2.12.0
# Iceberg Catalog
COPY catalog-meta.yaml /vvp/sql/opt/catalogs/iceberg/catalog-meta.yaml
COPY jars/iceberg-flink-runtime-1.17-1.4.3.jar /vvp/sql/opt/catalogs/iceberg/
COPY jars/hadoop-aws-3.3.4.jar /vvp/sql/opt/catalogs/iceberg/
COPY jars/flink-s3-fs-hadoop-1.18.0.jar /vvp/sql/opt/catalogs/iceberg/
# Iceberg Connector
COPY connector-meta.yaml /vvp/sql/opt/connectors/iceberg/connector-meta.yaml
COPY jars/iceberg-flink-runtime-1.17-1.4.3.jar /vvp/sql/opt/connectors/iceberg/
COPY jars/hadoop-aws-3.3.4.jar /vvp/sql/opt/connectors/iceberg/
COPY jars/flink-s3-fs-hadoop-1.18.0.jar /vvp/sql/opt/connectors/iceberg/
# VVP SQL Editor libraries for Iceberg
COPY jars/flink-hadoop-fs-1.18.0.jar /vvp/app/lib/
Ensure that the two YAML files are stored in the working directory, together with the jars folder containing all of the files mentioned above:
├── catalog-meta.yaml
├── connector-meta.yaml
└── jars/
├── aws-java-sdk-core-1.12.262.jar
├── aws-java-sdk-dynamodb-1.12.262.jar
├── aws-java-sdk-s3-1.12.262.jar
├── flink-hadoop-fs-1.18.0.jar
├── flink-s3-fs-hadoop-1.18.0.jar
├── hadoop-3.3.4
├── hadoop-aws-3.3.4.jar
├── iceberg-flink-runtime-1.17-1.4.3.jar
└── joda-time-2.12.7.jar
Now build and push your Gateway image:
docker build -t <put your registry here>/vvp-gateway:2.12.0-iceberg -f Dockerfile .
docker push <put your registry here>/vvp-gateway:2.12.0-iceberg
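Optionally, sanity-check the new Gateway image before deploying it. The command below is only a sketch and assumes the image provides a standard ls binary; it lists the Iceberg catalog and connector directories that the Dockerfile populated:
# List the catalog and connector folders baked into the custom Gateway image
docker run --rm --entrypoint ls <put your registry here>/vvp-gateway:2.12.0-iceberg \
  /vvp/sql/opt/catalogs/iceberg /vvp/sql/opt/connectors/iceberg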
2) Build a custom Flink image for SQL Session Clusters.
Create a file named Dockerfile-Flink with the following content:
FROM registry.ververica.com/v2.12/flink:1.18.0-stream1-scala_2.12-java11
# For Flink SQL and VVP SQL Editor support
COPY jars/hadoop-3.3.4/share/hadoop/common/lib/* ../opt/hadoop/
COPY jars/hadoop-3.3.4/share/hadoop/common/hadoop-common-3.3.4.jar ../opt/hadoop/
COPY jars/hadoop-3.3.4/share/hadoop/hdfs/hadoop-hdfs-client-3.3.4.jar ../opt/hadoop/
COPY jars/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.4.jar ../opt/hadoop/
# See more at https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/advanced/#hadoop-dependencies
ENV HADOOP_CLASSPATH="/flink/opt/hadoop/*"
# For JAR-based Deployments support
COPY jars/hadoop-aws-3.3.4.jar ../lib/
COPY jars/aws-java-sdk-core-1.12.262.jar ../lib/
COPY jars/aws-java-sdk-s3-1.12.262.jar ../lib/
COPY jars/aws-java-sdk-dynamodb-1.12.262.jar ../lib/
COPY jars/joda-time-2.12.7.jar ../lib/
Then build and push the Flink image by running the following commands:
docker build -t <put your registry here>/flink:1.18.0-iceberg -f Dockerfile-Flink .
docker push <put your registry here>/flink:1.18.0-iceberg
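As a similar optional check (assuming a shell is available in the image), verify that the Hadoop libraries ended up under the path referenced by HADOOP_CLASSPATH:
# Print HADOOP_CLASSPATH and list the first few Hadoop JARs inside the custom Flink image
docker run --rm --entrypoint sh <put your registry here>/flink:1.18.0-iceberg \
  -c 'echo "HADOOP_CLASSPATH=$HADOOP_CLASSPATH"; ls /flink/opt/hadoop | head'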
3) Use the newly built Gateway image in your VVP cluster. For that, set the repository and tag in the Helm values.yaml file and also configure the other values shown below.
# values.yaml
gateway:
  image:
    repository: <your registry>/<your repository>
    tag: 2.12.0-iceberg

vvp:
  flinkVersionMetadata:
    - flinkImageRegistry: <put your registry here>
      flinkImageRepository: flink
      flinkVersion: 1.18
      imageTag: 1.18.0-iceberg
      useForSqlDeployments: true
      defaultFor:
        - 1.18

  blobStorage:
    baseUri: s3://<set bucket name here ...>
    s3:
      endpoint: <optionally set S3 endpoint address (it is needed for MinIO)>

  globalSessionClusterDefaults: |
    spec:
      kubernetes:
        jobManagerPodTemplate:
          spec:
            containers:
              - name: flink-jobmanager
                env:
                  - name: HADOOP_CONF_DIR
                    value: /etc/hadoop/conf
                volumeMounts:
                  - name: hadoop-s3-creds
                    mountPath: /etc/hadoop/conf
            volumes:
              - name: hadoop-s3-creds
                secret:
                  secretName: s3-creds-secret
        taskManagerPodTemplate:
          spec:
            containers:
              - name: flink-taskmanager
                env:
                  - name: HADOOP_CONF_DIR
                    value: /etc/hadoop/conf
                volumeMounts:
                  - name: hadoop-s3-creds
                    mountPath: /etc/hadoop/conf
            volumes:
              - name: hadoop-s3-creds
                secret:
                  secretName: s3-creds-secret

blobStorageCredentials:
  s3:
    accessKeyId: <set value ...>
    secretAccessKey: <set value ...>

env:
  - name: HADOOP_CONF_DIR
    value: /etc/hadoop/conf

volumes:
  - name: hadoop-s3-creds
    secret:
      secretName: s3-creds-secret

volumeMounts:
  - name: hadoop-s3-creds
    mountPath: /etc/hadoop/conf
In summary, we set the following values in this file:
- Flink Image which includes Iceberg dependencies
- Gateway Image which includes Iceberg dependencies
- For S3: bucket name, endpoint (optionally), credentials
- globalSessionClusterDefaults: an environment variable to propagate the S3 credentials to the hadoop-aws library
- a mounted secret: the Hadoop XML file with the S3 access configuration
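Before installing, you can optionally render the chart locally to catch YAML mistakes in values.yaml. This is only a sketch and assumes the ververica Helm repository is already added (as it is for the install command in step 5):
# Render the chart with your values; any YAML error will surface here instead of during the upgrade
helm template vvp ververica/ververica-platform \
  --namespace vvp \
  -f values.yaml > /dev/null && echo "values.yaml renders cleanly"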
4) Create a Kubernetes secret in your VVP namespace with the following content:
Important: Put your credentials and other values into the XML configuration below.
apiVersion: v1
kind: Secret
metadata:
  name: s3-creds-secret
type: Opaque
stringData:
  core-site.xml: |-
    <configuration>
      <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
      </property>
      <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
      </property>
      <property>
        <name>fs.s3a.access.key</name>
        <value> put right value here !!! </value>
      </property>
      <property>
        <name>fs.s3a.secret.key</name>
        <value> put right value here !!! </value>
      </property>
      <property>
        <name>fs.s3a.connection.ssl.enabled</name>
        <value>false</value>
        <description>Enables or disables SSL connections to S3.</description>
      </property>
      <!--
        Remove below property if you use AWS S3.
        Below property is set for MinIO service running in the vvp namespace
      -->
      <property>
        <name>fs.s3a.endpoint</name>
        <value>http://minio.vvp.svc:9000</value>
      </property>
      <property>
        <name>fs.s3a.path.style.access</name>
        <value>true</value>
      </property>
      <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
      </property>
    </configuration>
Apply the secret to the vvp namespace and to your jobs namespace, e.g. vvp-jobs:
kubectl apply -f <your secret file name>.yaml -n vvp
kubectl apply -f <your secret file name>.yaml -n vvp-jobs
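To verify that the secret exists in both namespaces and contains the expected core-site.xml, you can run, for example:
kubectl get secret s3-creds-secret -n vvp
kubectl get secret s3-creds-secret -n vvp-jobs
# Optionally inspect the rendered core-site.xml
kubectl get secret s3-creds-secret -n vvp -o jsonpath='{.data.core-site\.xml}' | base64 -d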
5) Install/upgrade your VVP instance with the configured values via Helm.
helm upgrade --install vvp ververica/ververica-platform \
--namespace vvp \
-f values.yaml
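After the upgrade finishes, a quick check (release and pod names may differ in your setup) confirms that the gateway pod is running the custom image:
helm status vvp -n vvp
# Print each pod name together with the image of its first container
kubectl get pods -n vvp \
  -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.spec.containers[0].image}{"\n"}{end}'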
6) Now test the Iceberg catalog integration by creating a catalog, a database and a table in the VVP SQL Editor. Use the script below for the test:
CREATE CATALOG icebergcatalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'catalog-database'='default',
'property-version'='1',
'warehouse'='s3a://<your bucket from the values.yaml>/icebergcatalog'
);
USE CATALOG icebergcatalog;
CREATE DATABASE `default`;
USE CATALOG icebergcatalog;
CREATE TABLE sample (
id BIGINT COMMENT 'unique id',
data STRING
);
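Since the 'hadoop' catalog type stores its metadata directly in the warehouse path, you can also verify the result on the object store. The sketch below assumes the AWS CLI, a placeholder bucket name and, for MinIO, a port-forward to the service referenced in the secret above; drop the --endpoint-url option when using AWS S3:
# Forward the MinIO service locally (skip for AWS S3)
kubectl -n vvp port-forward svc/minio 9000:9000 &
# Replace my-vvp-bucket with the bucket configured in values.yaml; Iceberg metadata files should appear
aws s3 ls --recursive s3://my-vvp-bucket/icebergcatalog/ --endpoint-url http://localhost:9000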
7) Insert data into the Iceberg table through a new VVP Deployment:
USE CATALOG icebergcatalog;
INSERT INTO sample VALUES (1, 'a');
8) Read the table data in the SQL Editor using a Flink Session Cluster:
USE CATALOG icebergcatalog;
SELECT * FROM sample;
Integration with Flink DataStream API
Since we have installed the Iceberg connector dependencies into the Flink Docker image and set the Deployment Defaults in Ververica Platform, only the following steps are needed:
1. First, build your job JAR file and use it in your VVP Deployment (see the build sketch below).
2. Before starting your VVP Deployment, add the iceberg-flink-runtime-1.17-1.4.3.jar dependency to the list of Additional Dependencies of your VVP Deployment.
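For step 1, a minimal build sketch (assuming a Maven-based job project; the project directory name is a placeholder) could look like this. Since iceberg-flink-runtime-1.17-1.4.3.jar is supplied at runtime as an Additional Dependency, it can be declared with 'provided' scope in the job POM:
# Build the job JAR locally (hypothetical project directory name)
cd my-iceberg-job
mvn clean package
# Upload the resulting JAR from target/ as the Deployment's job artifact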