
How-to guide: Build Streaming ETL for MySQL and Postgres based on Flink CDC


by Ververica
This tutorial will show how to quickly build streaming ETL for MySQL and Postgres based on Flink CDC. All the examples in this article are done in the Flink SQL CLI, requiring only SQL, no Java/Scala code, and no IDE installation.

Let’s assume that you are running an e-commerce business. The data of products and orders is stored in MySQL, and the shipments corresponding to those orders are stored in Postgres. To make analysis of the orders table easier, you need to enrich it with the related product and shipment data, creating a new table that is written to Elasticsearch in real time.

The overall architecture of the system: Flink CDC captures changes from MySQL and Postgres, Flink joins the tables, and the enriched result is written to Elasticsearch, where it can be visualized in Kibana.

Prerequisites

This guide assumes that Docker and Docker Compose are installed on your machine.

Step 1: Create a docker-compose.yml file

Copy the following content into your docker-compose.yml file:

version: '2.1'
services:
  postgres:
    image: debezium/example-postgres:1.1
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  elasticsearch:
    image: elastic/elasticsearch:7.6.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"

The containers in this docker-compose file include:

  • MySQL: the products and orders tables will be stored in this database
  • Postgres: the shipments table will be stored in this database
  • Elasticsearch: the final enriched_orders table will be written to Elasticsearch
  • Kibana: will be used to visualize the Elasticsearch data

Create a directory for this project, e.g. a <my-project> directory, and move the docker-compose.yml file into it.

Execute the following command in the same directory to install the components needed for this how-to guide:

docker-compose up -d

This command automatically starts all the containers defined in the Docker Compose configuration in detached mode. You can run docker ps to check whether the containers are running, or visit http://localhost:5601/ to make sure Kibana is up.
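For example, the following commands list the running containers and confirm that Elasticsearch responds on the port mapped in the compose file:

docker ps
curl http://localhost:9200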

Step 2: Download Flink and the required dependencies

Download Flink 1.16.0 and extract it in your <my-project> directory. The extracted directory will be called flink-1.16.0.

tar -xvf flink-1.16.0-bin-scala_2.12.tgz

Download the dependencies listed below:

  • flink-sql-connector-elasticsearch7-1.16.0.jar
  • flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar
  • flink-sql-connector-postgres-cdc-2.4-SNAPSHOT.jar

You can also build the snapshot jars locally: clone the Flink CDC repository and follow its build instructions. Remember that the snapshot dependencies must be built from the 2.4 version of Flink CDC.

Place these dependencies in

flink-1.16.0/lib/ 
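A quick way to verify the placement is to list the connector jars; the exact file names depend on the versions you downloaded:

ls flink-1.16.0/lib/ | grep -E "cdc|elasticsearch"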

Step 3: Check MySQL server timezone

Make sure that the MySQL server has a timezone offset that matches the configured time zone on your machine.

Enter the MySQL container.

sudo docker-compose exec mysql bash 

Check the MySQL server timezone by running one of the commands below:

mysql -e "SELECT @@global.time_zone;" -p123456

or

mysql -e "SELECT NOW();" -p123456

If there is a time discrepancy, set the timezone to match your local machine. Remember to adjust the UTC offset in the command below accordingly.

mysql -e "SET GLOBAL time_zone = '+1:00';" -p123456

Make sure that the timezone has been set.

mysql -e "SELECT @@global.time_zone;" -p123456

Leave the container.

exit

Step 4: Prepare data in the MySQL database

Enter the MySQL container.

docker-compose exec mysql mysql -uroot -p123456 

Create a database, products table, orders table, and insert data.


-- MySQL
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");

CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
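Optionally, run a quick sanity check before leaving the container:

-- Optional sanity check
SELECT COUNT(*) FROM products;   -- expect 9 rows (ids 101-109)
SELECT * FROM orders;            -- expect 3 rows (order_id 10001-10003)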

Leave the container.

exit

Step 5: Prepare data in the Postgres database

Enter the Postgres container.

docker-compose exec postgres psql -h localhost -U postgres

Create a shipments table and insert data. Setting REPLICA IDENTITY FULL ensures that update and delete events carry the complete previous row, which the Postgres CDC connector needs to produce correct change records.


-- PG
CREATE TABLE shipments (
  shipment_id SERIAL NOT NULL PRIMARY KEY,
  order_id SERIAL NOT NULL,
  origin VARCHAR(255) NOT NULL,
  destination VARCHAR(255) NOT NULL,
  is_arrived BOOLEAN NOT NULL
);
ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
ALTER TABLE public.shipments REPLICA IDENTITY FULL;
INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),
       (default,10002,'Hangzhou','Shanghai',false),
       (default,10003,'Shanghai','Hangzhou',false);
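Optionally, verify the inserted rows before leaving the container:

-- Optional sanity check
SELECT * FROM shipments;   -- expect 3 rows (shipment_id 1001-1003)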

Leave the container.

exit

Step 6: Start Flink cluster and Flink SQL CLI

Use the following command to change to the Flink directory.

cd flink-1.16.0

Start the Flink cluster with the following command:

./bin/start-cluster.sh

If the start-up is successful, you can access the Flink Web UI at http://localhost:8081/.

Start the Flink SQL CLI with the following command.

./bin/sql-client.sh

After the start-up is successful, the SQL CLI welcome screen appears.

Step 7: Create tables using Flink DDL in Flink SQL CLI

First, enable checkpointing with an interval of 3 seconds. Checkpointing is required here: the CDC sources commit their reading progress and the Elasticsearch sink flushes its writes on checkpoints.

SET execution.checkpointing.interval = 3s;

Using the Flink SQL CLI, create tables that correspond to the products, orders, and shipments tables in the underlying databases, so that their data is synchronized into Flink.

CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'products'
  );

CREATE TABLE orders (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
   'connector' = 'mysql-cdc',
   'hostname' = 'localhost',
   'port' = '3306',
   'username' = 'root',
   'password' = '123456',
   'database-name' = 'mydb',
   'table-name' = 'orders'
 );

CREATE TABLE shipments (
   shipment_id INT,
   order_id INT,
   origin STRING,
   destination STRING,
   is_arrived BOOLEAN,
   PRIMARY KEY (shipment_id) NOT ENFORCED
 ) WITH (
   'connector' = 'postgres-cdc',
   'hostname' = 'localhost',
   'port' = '5432',
   'username' = 'postgres',
   'password' = 'postgres',
   'database-name' = 'postgres',
   'schema-name' = 'public',
   'table-name' = 'shipments'
 );

Finally, create an enriched_orders table to write the associated orders data to Elasticsearch.

CREATE TABLE enriched_orders (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   product_name STRING,
   product_description STRING,
   shipment_id INT,
   origin STRING,
   destination STRING,
   is_arrived BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
     'connector' = 'elasticsearch-7',
     'hosts' = 'http://localhost:9200',
     'index' = 'enriched_orders'
 );

Step 8: Join the orders data and write it to Elasticsearch

Use Flink SQL to join the orders table with the products and shipments tables, and write the resulting enriched_orders table into Elasticsearch.

INSERT INTO enriched_orders
 SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
 FROM orders AS o
 LEFT JOIN products AS p ON o.product_id = p.id
 LEFT JOIN shipments AS s ON o.order_id = s.order_id;
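This INSERT statement submits a continuous streaming job, which you can observe in the Flink Web UI. To inspect the join result interactively first, you can also run the same join as a plain SELECT in the SQL CLI:

SELECT o.order_id, o.customer_name, p.name, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;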

Visit Kibana on your local machine (http://localhost:5601/app/kibana#/management/kibana/index_pattern) to create an index pattern enriched_orders.    

You can see the written data at http://localhost:5601/app/kibana#/discover.  

Next, modify the data in the MySQL and Postgres tables; the orders data displayed in Kibana will be updated in real time.

Insert a record into the MySQL orders table

INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);

Update the status of an order in the MySQL orders table

UPDATE orders SET order_status = true WHERE order_id = 10004;

Insert a piece of data into the Postgres shipments table

INSERT INTO shipments
VALUES (default,10004,'Shanghai','Beijing',false);

Update the status of a shipment in the Postgres shipments table

UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;

Delete a record from the MySQL orders table

DELETE FROM orders WHERE order_id = 10004;

Refresh Kibana to see the updated data.
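You can also query Elasticsearch directly instead of going through Kibana, using the index name defined in the enriched_orders DDL:

curl "http://localhost:9200/enriched_orders/_search?pretty"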

Step 9: Clean the environment

After performing all the steps, run the following command in the <my-project> directory to stop all the containers.

docker-compose down

Change directory to flink-1.16.0 and execute the following command to stop the Flink cluster.

./bin/stop-cluster.sh
