How to Integrate Ververica Platform with Apache Pulsar

Written by Ali Zeybek | 18 February 2024

Question

I want to use Ververica Platform's Flink SQL development functionality [1] with Apache Pulsar. How can I integrate Apache Pulsar metadata with Ververica Platform so that I can directly run Flink SQL queries that use Apache Pulsar as a source/sink without having to register each topic manually?

Answer

Note: This section applies to Flink 1.15 with Ververica Platform 2.8.

Overview

Apache Pulsar [2] is a distributed, open-source pub-sub messaging and streaming platform for real-time workloads. The Apache Pulsar DataStream API connector [3] is maintained by the Apache Flink community. However, the Apache Pulsar Table API connector and the Apache Pulsar Catalog are still released as part of StreamNative's fork [4] of Apache Flink.

Currently, the Apache Pulsar Catalog implements two separate mechanisms. Native tables automatically infer the metadata of the tenants, namespaces and topics that already exist in Apache Pulsar. Explicit tables persist, in an Apache Pulsar topic, the metadata of tables registered via Flink SQL or the Table API.

For the purpose of this KB article, we only need the native tables part of the Apache Pulsar Catalog, since Ververica Platform already has its built-in VVP catalog [5] for persisting the metadata of tables registered via Flink SQL. Because the Apache Pulsar Catalog does not yet offer this separation, we use a custom implementation of it, which is essentially a read-only version limited to native tables. The current implementation is available in a dedicated fork of Apache Flink [6] based on the 1.15.2 release.

Setup

After cloning [6] locally, build the project by running

./mvnw clean package -DskipTests

in the root folder (flink) of the project. Once the build completes, the artifact that contains both the Pulsar SQL Connector and the Pulsar Catalog (flink-sql-connector-pulsar-1.15.2.jar) is available at the following path in the project:

flink/flink-connectors/flink-sql-connector-pulsar/target/flink-sql-connector-pulsar-1.15.2.jar

The next step is to add the connector and catalog definitions to Ververica Platform's Gateway container by building a custom Docker image from the following resources:

catalog-meta.yaml

catalog:
  type: pulsar
  packaged: true
  readOnly: true
  properties:
    - key: type
      required: true
    - key: catalog-admin-url
      required: true
    - key: catalog-service-url
      required: true

connector-meta.yaml

connector:
  type: pulsar
  packaged: true
  source: true
  sink: true
  lookup: false
  supportedFormats:
    - json

Dockerfile

FROM registry.ververica.com/v2.8/vvp-gateway:2.8.1

USER root:root

RUN mkdir -p /vvp/sql/opt/catalogs/pulsar
COPY catalog-meta.yaml /vvp/sql/opt/catalogs/pulsar/catalog-meta.yaml
COPY flink-sql-connector-pulsar-1.15.2.jar /vvp/sql/opt/catalogs/pulsar/flink-sql-connector-pulsar-1.15.2.jar

RUN mkdir -p /vvp/sql/opt/connectors/pulsar
COPY connector-meta.yaml /vvp/sql/opt/connectors/pulsar/connector-meta.yaml
COPY flink-sql-connector-pulsar-1.15.2.jar /vvp/sql/opt/connectors/pulsar/flink-sql-connector-pulsar-1.15.2.jar

USER 999:999
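
The Dockerfile above expects the two YAML files and the connector JAR to be present in its build context. The following is a minimal sketch of staging the JAR, building the image and pushing it. It assumes that it runs from the directory that holds Dockerfile, catalog-meta.yaml and connector-meta.yaml, with the cloned flink folder next to them. The image name registry.example.com/vvp-gateway:2.8.1-pulsar and the DOCKER override are hypothetical and only serve as an illustration.

```shell
#!/bin/sh
# Sketch only: the image name and directory layout are assumptions.
JAR_NAME=flink-sql-connector-pulsar-1.15.2.jar
JAR_PATH="flink/flink-connectors/flink-sql-connector-pulsar/target/$JAR_NAME"
IMAGE="${IMAGE:-registry.example.com/vvp-gateway:2.8.1-pulsar}"  # hypothetical tag
DOCKER="${DOCKER:-docker}"  # set DOCKER=echo for a dry run

build_gateway_image() {
  # Fail early if the Maven build has not produced the connector JAR.
  if [ ! -f "$JAR_PATH" ]; then
    echo "missing $JAR_PATH, run the Maven build first" >&2
    return 1
  fi
  # Copy the JAR next to the Dockerfile so its COPY instructions can find it.
  cp "$JAR_PATH" "./$JAR_NAME" || return 1
  "$DOCKER" build -t "$IMAGE" . || return 1
  "$DOCKER" push "$IMAGE"
}
```

Calling build_gateway_image runs the three steps in order. Afterwards, the Ververica Platform installation has to be pointed at the new Gateway image in place of the stock vvp-gateway:2.8.1 image; how this is configured depends on how the platform was installed.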

Once you have Ververica Platform up and running with the packaged Pulsar SQL Connector and Pulsar Catalog, you can create a catalog, run queries in the SQL Editor and create SQL Deployments that can use Apache Pulsar topics as a source/sink without having to register them manually.

Bear in mind that, at the moment, the ReadOnlyPulsarCatalog implementation only supports the JSON format. Also, the current implementations of both ReadOnlyPulsarCatalog and PulsarCatalog expect the schema information to be available for the given topic in the Apache Pulsar cluster [7].
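
Since the catalog relies on the topic schema, it can help to confirm that a schema is registered before querying a topic. The following is a minimal sketch that uses the pulsar-admin CLI shipped with Apache Pulsar. The location of the CLI (bin/pulsar-admin), the PULSAR_ADMIN override and the topic name used below are assumptions for illustration.

```shell
#!/bin/sh
# Sketch only: the CLI path is an assumption, adjust it to your installation.
PULSAR_ADMIN="${PULSAR_ADMIN:-bin/pulsar-admin}"

# Print the schema registered for the given topic. The CLI should report
# an error if no schema can be found for it.
show_topic_schema() {
  "$PULSAR_ADMIN" schemas get "$1"
}
```

For example, show_topic_schema persistent://public/default/orders prints the schema of a hypothetical orders topic in the public/default namespace.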

Example

CREATE CATALOG

CREATE CATALOG pulsar
WITH (
'type' = 'pulsar',
'catalog-admin-url' = 'http://<PULSAR_HOST>:<ADMIN_PORT>',
'catalog-service-url' = 'pulsar://<PULSAR_HOST>:<DATA_PORT>'
);

SELECT FROM

SELECT *
FROM pulsar.`<PULSAR_TENANT>/<PULSAR_NAMESPACE>`.`<PULSAR_TOPIC>`;

Related Information