How to Integrate Apache Hive with Ververica Platform: External Tables with Avro Schema on S3

Question

How can I use Apache Flink's Hive Catalog and Flink SQL on Ververica Platform to easily read/write data from/to Apache Hive's External Tables that are using Avro Schema stored on S3?

Answer

Note: This section applies to Apache Flink 1.18.1 and Apache Hive 3.1.3 with Ververica Platform 2.12.1 or later.

Ververica Platform supports integrating additional catalogs such as Hive Catalog both by uploading them via UI and by building a custom docker image for the Gateway container. Ververica Platform's UI for uploading catalogs supports uploading of a single jar file only. Since external tables with Avro schema requires additional dependencies, specifically when Apache Hive is using S3 to store both the schema and the data, it is not possible to use Ververica Platform's UI. Therefore, this article will follow the approach of building a custom docker image for the Ververica Platform's gateway container with the required dependencies.

1) Apache Hive

Throughout the article, we will use a database on Apache Hive that uses S3 as the location. We will be creating an events database with an external table named login with two fields which has the Avro schema that is stored on S3. The initial step is to create an S3 bucket to store both the data and the schema of the table we will be using, followed by uploading the schema to that bucket. We will also store the scripts to run on Apache Hive CLI on S3, so that we can set up our environment on Apache Hive easily.

We will use the AWS CLI's s3api, specifically with create-bucket[1] and put-object[2] commands, to perform the creation of the bucket and uploading the schema file - schema.avsc and the scripts - create_database.hql and create_table.hql. Finally, we will run from Apache Hive CLI[3], which have access to your Apache Hive cluster, to create the database and the external table.

Following are the scripts to run,

AWS CLI Commands

## Create Bucket
aws s3api create-bucket <BUCKET_NAME>

## Upload Schema
aws s3api put-object --bucket <BUCKET_NAME> --key schemas/login_event.avsc --body <LOCAL_PATH_TO_LOGIN_EVENT.AVSC>

## Upload Create Database Script
aws s3api put-object --bucket <BUCKET_NAME> --key scripts/create_database.hql --body <LOCAL_PATH_TO_CREATE_DATABASE.HQL>

## Upload Create Table Script
aws s3api put-object --bucket <BUCKET_NAME> --key scripts/create_table.hql --body <LOCAL_PATH_TO_CREATE_TABLE.HQL>

Apache Hive CLI Commands

## Create Database
hive -f 's3a://<BUCKET_NAME>/scripts/create_database.hql'

## Create Table
hive -f 's3a://<BUCKET_NAME>/scripts/create_table.hql'

followed by the actual contents of the file in the preceding section.

login_event.avsc

{
    "type": "record",
    "name": "LoginEvent",
    "fields": [
        {
            "name": "user_name",
            "type": "string"
        },
        {
            "name": "login_time",
            "type": "long"
        }
    ]
}

create_database.hql

CREATE DATABASE events LOCATION 's3a://<BUCKET_NAME>/warehouse';

create_table.hql

CREATE EXTERNAL TABLE events.login (user_name STRING, login_time BIGINT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
WITH SERDEPROPERTIES ('avro.schema.url' = 's3a://<BUCKET_NAME>/schemas/login_event.avsc')
STORED AS AVRO;

2) Ververica Platform

The next step is to build a custom image for the gateway[4] container with the required dependencies, create a configmap on Kubernetes for storing hive-site.xml which has the connectivity information to Apache Hive for the Ververica Platform. Finally, we will deploy Ververica Platform with the configmap attached to the gateway container which uses the custom image with additional parameters in the values.yaml file using Helm[5].

Note: When using a different version of Ververica Platform, please check the latest[6] supported Apache Flink version for the specific Ververica Platform version and upgrade the Apache Flink version in the following scripts accordingly.

Following are the scripts to run,

Kubernetes Commands

## Create the configmap
kubectl apply -f configmap.yaml

Docker Commands

## Build the image
docker build --tag <CONTAINER_REGISTRY>/v2.12/vvp-gateway:2.12.1 --file Dockerfile .

## Push the image
docker push <CONTAINER_REGISTRY>/v2.12/vvp-gateway:2.12.1

Helm Commands

## Deploy Ververica Platform
helm install vvp ververica-platform \
    --repo https://charts.ververica.com \
    --namespace <VVP_NAMESPACE> \
    --version 5.8.1 \
    --values values.yaml

followed by the actual contents of the file in the preceding section.

configmap.yaml

---
apiVersion: v1
kind: ConfigMap
metadata:
    name: hive-metastore
    namespace: <VVP_NAMESPACE>
data:
    hive-site.xml: |
        <?xml version="1.0"?>
        <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
        <configuration>
            <property>
                <name>hive.metastore.uris</name>
                <value>thrift://<HIVE_METASTORE_IP>:9083</value>
            </property>
            <property>
                <name>fs.s3a.aws.credentials.provider</name>
                <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
            </property>
            <property>
                <name>fs.s3a.impl</name>
                <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
            </property>
            <property>
                <name>fs.s3a.access.key</name>
                <value><S3_ACCESS_KEY></value>
            </property>
            <property>
                <name>fs.s3a.secret.key</name>
                <value><S3_SECRET_KEY></value>
            </property>
        </configuration>

catalog-meta.yaml

catalog:
    type: hive
    packaged: true
    readOnly: true
        properties:
            - key: type
              required: true
            - key: default-database
              required: false
            - key: property-version
              required: false
            - key: hive-conf-dir
              required: false
            - key: hive-version
              required: false
            - key: hadoop-conf-dir
              required: false

Dockerfile

FROM alpine/curl:3.14 AS dependency-downloader

RUN curl -o /tmp/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.18.1/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar
RUN curl -o /tmp/flink-connector-files-1.18.1.jar https://repo1.maven.org/maven2/org/apache/flink/flink-connector-files/1.18.1/flink-connector-files-1.18.1.jar
RUN curl -o /tmp/flink-s3-fs-hadoop-1.18.1.jar https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.18.1/flink-s3-fs-hadoop-1.18.1.jar

RUN curl -o /tmp/hadoop-mapreduce-client-core-3.3.6.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-mapreduce-client-core/3.3.6/hadoop-mapreduce-client-core-3.3.6.jar
RUN curl -o /tmp/hadoop-aws-3.3.6.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.6/hadoop-aws-3.3.6.jar
RUN curl -o /tmp/aws-java-sdk-bundle-1.12.664.jar https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.664/aws-java-sdk-bundle-1.12.664.jar

# ----

FROM registry.ververica.com/v2.12/vvp-gateway:2.12.1

COPY catalog-meta.yaml /vvp/sql/opt/catalogs/hive/catalog-meta.yaml

COPY --from=dependency-downloader /tmp/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar /vvp/sql/opt/catalogs/hive/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar
COPY --from=dependency-downloader /tmp/flink-connector-files-1.18.1.jar /vvp/sql/opt/catalogs/hive/flink-connector-files-1.18.1.jar
COPY --from=dependency-downloader /tmp/flink-s3-fs-hadoop-1.18.1.jar /vvp/sql/opt/catalogs/hive/flink-s3-fs-hadoop-1.18.1.jar

COPY --from=dependency-downloader /tmp/hadoop-mapreduce-client-core-3.3.6.jar /vvp/sql/opt/catalogs/hive/hadoop-mapreduce-client-core-3.3.6.jar
COPY --from=dependency-downloader /tmp/hadoop-aws-3.3.6.jar /vvp/sql/opt/catalogs/hive/hadoop-aws-3.3.6.jar
COPY --from=dependency-downloader /tmp/aws-java-sdk-bundle-1.12.664.jar /vvp/sql/opt/catalogs/hive/aws-java-sdk-bundle-1.12.664.jar

values.yaml

vvp:
...Ververica Platform configuration...

gateway:
    image:
        repository: <CONTAINER_REGISTRY>/v2.12/vvp-gateway

env:
    - name: JAVA_TOOL_OPTIONS
      value: "--add-opens=java.base/java.net=ALL-UNNAMED"

volumeMounts:
    - name: hive-site
      mountPath: /etc/hive

volumes:
    - name: hive-site
      configMap:
          name: hive-metastore

3) Flink SQL

Finally, we will register Hive Catalog using Flink SQL, define a pipeline that populates data and define an extract-transform-load pipeline which aggregates the data to Elasticsearch for visualisation purposes.

Create Catalog

CREATE CATALOG hive
WITH (
    'type' = 'hive',
    'hive-version' = '3.1.3',
    'hive-conf-dir' = '/etc/hive'
);

Populate Data

INSERT INTO hive.events.login
VALUES ('john_doe', 1708723294000), ('jane_doe', 1708723294000);

Create Sink

CREATE DATABASE vvp.events_analytics;

CREATE VIEW vvp.events_analytics.login_events_year_month AS (
    SELECT
        user_name,
        CONCAT(
            CAST(YEAR(TO_TIMESTAMP_LTZ(login_time, 3)) AS STRING),
            '_',
            CAST(MONTH(TO_TIMESTAMP_LTZ(login_time, 3)) AS STRING)
        ) AS yyyy_mm
    FROM hive.events.login
);

CREATE TABLE vvp.events_analytics.login_events_count_per_month (
    user_name STRING,
    yyyy_mm STRING,
    login_count BIGINT,
    PRIMARY KEY (user_name, yyyy_mm) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://<ELASTICSEARCH_HOST>:<ELASTICSEARCH_PORT>',
    'index' = 'login_events_count_per_month'
);

Extract Transform Load

INSERT INTO vvp.events_analytics.login_events_count_per_month
SELECT
    user_name,
    yyyy_mm,
    COUNT(*) AS login_count
FROM vvp.events_analytics.login_events_year_month
GROUP BY user_name, yyyy_mm;

4) Conclusion

Ververica Platform supports the integration of additional catalogs in a flexible way[7] for different use case scenarios. When Apache Hive's external tables are using Avro Schema stored on S3, an S3 access is needed for Apache Flink's Catalog API to fetch the schema from S3 in order to fully parse the metadata information of the table, otherwise returns with an error schema.

Therefore, we have used both hadoop-aws[8] and aws-java-sdk-bundle in our gateway image where the catalog access operations are executed. Since external tables' actual data resides in filesystem and not in Apache Hive's warehouse, we have also used Apache Flink's Filesystem connector.

We have also used AWS's SimpleAWSCredentialsProvider with an access key and a secret key for the demo purposes. However, in a production environment, IAM role based permission policy is suggested for accessing both the schema and the data residing on S3.

If you would like to see all the setup working in place, feel free to check the sandbox project which deploys Apache Hive on EMR and integrates with Ververica Platform using Terraform[9].

3) References

[1]: https://docs.aws.amazon.com/cli/latest/reference/s3api/create-bucket.html

[2]: https://docs.aws.amazon.com/cli/latest/reference/s3api/put-object.html

[3]: https://cwiki.apache.org/confluence/display/Hive/GettingStarted#GettingStarted-RunningHiveCLI

[4]: https://ververica.zendesk.com/hc/en-us/articles/360013488139

[5]: https://docs.ververica.com/vvp/installation-and-upgrades/install-with-helm

[6]: https://docs.ververica.com/vvp/installation-and-upgrades/vvp-docker-images

[7]: https://docs.ververica.com/vvp/user-guide/sql-development/catalogs-databases

[8]: https://hadoop.apache.org/docs/r3.3.6/hadoop-aws/tools/hadoop-aws/index.html

[9]: https://github.com/alibahadirzeybek/sandbox/tree/vvp-hive-catalog