How to integrate AWS DynamoDB with Ververica Platform for FlinkSQL?

Answer

Note: This section applies to Flink 1.19 with Ververica Platform 2.13.3.

Ververica Platform (VVP) does not ship with the Flink DynamoDB SQL connector. However, the open source DynamoDB SQL connector can be added to VVP directly in the VVP UI by using the "Create Connector" function. This article walks through the required steps. By the end, you will be able to write to DynamoDB tables from your FlinkSQL deployments.

Integration with FlinkSQL

1) Find the JAR file of the FlinkSQL DynamoDB connector that is compatible with the Flink version your SQL deployments will use.

Important: Be sure to use the SQL connector. Flink also provides a DynamoDB connector for the DataStream API, but that connector is out of scope for this article.

In VVP, new FlinkSQL deployments created via the integrated SQL editor must use the latest Flink image version offered. For VVP 2.13.3, the version in scope for this KB article, the latest Flink image version offered is 1.19. We therefore use the latest available connector for Flink 1.19, which is "flink-sql-connector-dynamodb-4.3.0-1.19.jar".

You can download the connector JAR from this link: https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-dynamodb/4.3.0-1.19/flink-sql-connector-dynamodb-4.3.0-1.19.jar

2) Open the Connectors menu in VVP and import the connector JAR via the "Create Connector" button:

[Screenshot: Ververica Platform UI create SQL connector]

[Screenshot: Ververica Platform UI create connector]

Upload the JAR either from its public URL, if your system has access to it, or directly in the VVP UI by dragging and dropping the file or selecting it for upload:

[Screenshot: Ververica Platform UI Upload JAR to create connector]

After uploading the JAR file and proceeding with the dynamodb connector, you reach the third step, "Configure Connector". In this menu, you can modify or delete properties to suit your use case. For this article, we keep the default settings, as shown below:

[Screenshot: Ververica Platform finish creating connector]

Finally, click "Finish" to add the connector to VVP.

3) Test the DynamoDB connector by opening the VVP SQL editor and executing the sample SQL statements below.

The first table generates the dummy data that will be inserted into the DynamoDB table. It produces one row per second:

Important: In the VVP SQL editor, the catalog (vvp), database (default), and table name (dynamoorders) must each be enclosed in backticks, for example `vvp`.`default`.`dynamoorders`. Check this before running any SQL statement copied from this article, since default is a reserved keyword in Flink SQL.

CREATE TABLE `vvp`.`default`.`dynamoorders` (
string_column STRING,
numeric_column BIGINT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
);
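If you want the generated test data to be easier to recognize in DynamoDB, the datagen source can be constrained per column. The following is a minimal sketch, not part of the original setup: the table name dynamoorders_bounded and the chosen lengths and ranges are hypothetical, while the option names come from the open source Flink DataGen connector.

```sql
-- Hypothetical variant of the dummy-data table with bounded values.
CREATE TABLE `vvp`.`default`.`dynamoorders_bounded` (
  string_column STRING,
  numeric_column BIGINT
) WITH (
  'connector' = 'datagen',
  -- Emit one row per second, as in the table above.
  'rows-per-second' = '1',
  -- Random strings of exactly 8 characters (illustrative value).
  'fields.string_column.length' = '8',
  -- Random numbers between 1 and 1000 inclusive (illustrative values).
  'fields.numeric_column.min' = '1',
  'fields.numeric_column.max' = '1000'
);
```

Short strings and a small numeric range make it simpler to spot the generated items when you later inspect the DynamoDB table.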

The second table is the FlinkSQL definition that represents the real table in DynamoDB:

CREATE TABLE `vvp`.`default`.`flinkdynamo` (
string_column STRING,
numeric_column BIGINT
)
WITH (
'connector' = 'dynamodb',
'table-name' = 'flinkdynamo',
'aws.region' = 'us-east-2',
'aws.endpoint' = 'http://dynamodb-dynamodb-local:8000'
);
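
The options used in this definition are:
  • connector: The exact name of the connector created in step 2 above, which is "dynamodb".
  • table-name: The name of the real table, which must already exist in DynamoDB.
  • aws.region: The AWS region of the DynamoDB table.
  • aws.endpoint: An endpoint override for the DynamoDB service. Set it only if you are not using the default AWS endpoint for the region.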
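Depending on your environment, the sink may need explicit credentials or batching settings. The sketch below shows how such options could be added. It is an illustration under assumptions, not the configuration used in this article: the table name flinkdynamo_tuned and the dummy credential values are hypothetical, and the option names are taken from the open source Flink DynamoDB SQL connector documentation, so verify them against connector version 4.3.0 before relying on them.

```sql
-- Hypothetical variant of the sink table with explicit credentials and batching.
CREATE TABLE `vvp`.`default`.`flinkdynamo_tuned` (
  string_column STRING,
  numeric_column BIGINT
)
-- Within one batch, keep only the latest record per string_column value.
PARTITIONED BY (string_column)
WITH (
  'connector' = 'dynamodb',
  'table-name' = 'flinkdynamo',
  'aws.region' = 'us-east-2',
  'aws.endpoint' = 'http://dynamodb-dynamodb-local:8000',
  -- Static credentials; placeholder values for illustration only.
  'aws.credentials.provider' = 'BASIC',
  'aws.credentials.basic.accesskeyid' = 'dummy',
  'aws.credentials.basic.secretkey' = 'dummy',
  -- Maximum number of items per batch write request.
  'sink.batch.max-size' = '25',
  -- Flush buffered items after this many milliseconds.
  'sink.flush-buffer.timeout' = '5000'
);
```

The PARTITIONED BY clause matters when several rows in the same batch share a key, because DynamoDB rejects a batch write that contains duplicate keys. For real AWS tables, prefer a credentials provider that does not embed secrets in the table definition.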

Finally, insert the dummy data into the DynamoDB table:

INSERT INTO flinkdynamo SELECT * from dynamoorders;
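The statement above relies on the editor's current catalog and database being vvp and default. If that is not the case in your session, a fully qualified form with an explicit column list can be used instead. This is a sketch that assumes the two tables were created exactly as defined in this article:

```sql
-- Fully qualified variant of the INSERT, with the column mapping spelled out.
INSERT INTO `vvp`.`default`.`flinkdynamo`
SELECT
  string_column,
  numeric_column
FROM `vvp`.`default`.`dynamoorders`;
```

Listing the columns explicitly keeps the mapping stable if either table later gains additional columns.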

Running this INSERT statement creates a FlinkSQL deployment that sinks the data to DynamoDB:

[Screenshot: Ververica Platform job pipeline with DynamoDB custom connector]

Here we can see that the data is being written into the table in DynamoDB:

[Screenshot: Data written into the table in DynamoDB]

Related Information