How to integrate AWS DynamoDB with Ververica Platform for FlinkSQL?
Answer
Note: This section applies to Flink 1.19 with Ververica Platform 2.13.3.
Ververica Platform (VVP) does not support the Flink DynamoDB SQL connector out of the box. However, the open-source DynamoDB SQL connector can be added to VVP directly from the VVP UI by using the Create Connector function. This article walks through the required steps. By the end, you will be able to write to DynamoDB tables from your FlinkSQL deployments.
Integration with FlinkSQL
1) Find the JAR file of the FlinkSQL DynamoDB connector that is compatible with the Flink version your SQL deployments will use.
Important: Be sure to use the SQL connector. Flink also provides a DynamoDB connector for the DataStream API, but that connector is out of scope for this article.
On VVP, new FlinkSQL deployments created via the integrated SQL editor must use the latest Flink image version offered. For VVP 2.13.3, the version covered by this KB article, the latest Flink image version offered is 1.19. Therefore, we use the latest available connector for Flink 1.19, which is "flink-sql-connector-dynamodb-4.3.0-1.19.jar".
You can download the connector JAR from this link: https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-dynamodb/4.3.0-1.19/flink-sql-connector-dynamodb-4.3.0-1.19.jar
2) Open the Connectors menu on VVP and import the connector JAR via the "Create Connector" button:
Upload the JAR either from its public URL, if your system has access to it, or directly in the VVP UI by dragging and dropping the file or selecting it for upload:
After you upload the JAR file and proceed with the dynamodb connector, you reach the third step, "Configure Connector". In this menu, you can modify or delete properties to suit your use case. For this example, we keep the default settings as shown below:
Finally, click "Finish" to add the connector to VVP.
3) Open the SQL editor and execute the following sample SQL statements:
The following table generates the dummy data that will be inserted into the DynamoDB table. It produces one row per second:
Important: Before you paste any SQL statement from this article into your VVP SQL editor, make sure that the catalog (vvp), the database (default), and the table name (for example, dynamoorders) are each enclosed in backticks (`). The quoting is required because default is a reserved keyword in Flink SQL.
CREATE TABLE `vvp`.`default`.`dynamoorders` (
    string_column STRING,
    numeric_column BIGINT
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);
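If the default random values are hard to read when you inspect the target table, the generator can be constrained. The following is a minimal sketch, assuming the standard datagen options `fields.<column>.length`, `fields.<column>.min`, and `fields.<column>.max`; the table name dynamoorders_bounded and the chosen bounds are hypothetical and not part of the original setup.

```sql
-- Hypothetical variant of the generator table; name and bounds are illustrative only.
CREATE TABLE `vvp`.`default`.`dynamoorders_bounded` (
    string_column STRING,
    numeric_column BIGINT
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    -- generate short random strings instead of the much longer default
    'fields.string_column.length' = '8',
    -- keep the numeric values in a small, readable range
    'fields.numeric_column.min' = '1',
    'fields.numeric_column.max' = '1000'
);
```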
Below is the FlinkSQL table definition that represents the real table in DynamoDB:
CREATE TABLE `vvp`.`default`.`flinkdynamo` (
    string_column STRING,
    numeric_column BIGINT
)
WITH (
    'connector' = 'dynamodb',
    'table-name' = 'flinkdynamo',
    'aws.region' = 'us-east-2',
    'aws.endpoint' = 'http://dynamodb-dynamodb-local:8000'
);
- connector: The exact name of the connector created in step 2 above, which is "dynamodb".
- table-name: The name of the actual table that exists in DynamoDB.
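Beyond the two options described above, the open-source connector exposes further sink settings. The sketch below is illustrative only: it assumes that string_column is the partition key of the DynamoDB table (the article does not state the key schema), and it uses the option names `sink.batch.max-size` and `sink.fail-on-error` as documented for the Flink DynamoDB SQL connector, which should be verified against version 4.3.0 before use. The table name flinkdynamo_tuned is hypothetical.

```sql
-- Hypothetical variant of the sink table; verify option names against the connector version in use.
CREATE TABLE `vvp`.`default`.`flinkdynamo_tuned` (
    string_column STRING,
    numeric_column BIGINT
)
-- Assumption: string_column is the DynamoDB partition key.
-- PARTITIONED BY lets the sink deduplicate records by primary key within a batch.
PARTITIONED BY (string_column)
WITH (
    'connector' = 'dynamodb',
    'table-name' = 'flinkdynamo',
    'aws.region' = 'us-east-2',
    'aws.endpoint' = 'http://dynamodb-dynamodb-local:8000',
    -- 25 is the per-request item limit of DynamoDB BatchWriteItem
    'sink.batch.max-size' = '25',
    -- fail the job on a write error instead of retrying
    'sink.fail-on-error' = 'true'
);
```

Client-side deduplication matters because DynamoDB rejects a batch write request that contains the same primary key more than once.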
Finally, we insert the dummy table data into the DynamoDB table:
INSERT INTO flinkdynamo SELECT * from dynamoorders;
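An equivalent form of the statement, sketched here with fully qualified, backtick-quoted names and an explicit column list, does not depend on the current catalog and database of the SQL editor or on the column order of the source table:

```sql
-- Same insert with qualified table names and explicit columns.
INSERT INTO `vvp`.`default`.`flinkdynamo`
SELECT string_column, numeric_column
FROM `vvp`.`default`.`dynamoorders`;
```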
The INSERT statement above starts a FlinkSQL deployment that sinks data to DynamoDB:
Here we can see that the data is being written to the table in DynamoDB: