Skip to content

Ververica Cloud, a fully-managed cloud service for stream processing!

Learn more

How-to guide: Synchronize MySQL sub-database and sub-table using Flink CDC


by

In the Online Transaction Processing (OLTP) system, to solve the problem of a large amount of data in a single table, the method of sub-database and table is usually used to split a single large table to improve the throughput of the system. However, to facilitate data analysis, it is generally necessary to merge the table splits from the sub-databases and sub-tables into a large table when synchronizing to the data warehouse or data lake.

This tutorial will show you how to use Flink CDC to build a real-time data lake for the above-presented scenario. The examples in this article will all be based on Docker with the use of Flink SQL. There is no need for a line of Java/Scala code or installation of an IDE. The entire content of this guide contains the docker-compose file.

The whole process will be shown by synchronizing data from MySQL to Iceberg, as shown in the diagram below.

Prerequisites

Step 1: Create a docker-compose.yml file

Create a Docker Compose file (docker-compose.yml) with the following content:

version: '2.1'
services:
  sql-client:
    user: flink:flink
    image: yuxialuo/flink-sql-client:1.13.2.v1 
    depends_on:
      - jobmanager
      - mysql
    environment:
      FLINK_JOBMANAGER_HOST: jobmanager
      MYSQL_HOST: mysql
    volumes:
      - shared-tmpfs:/tmp/iceberg
  jobmanager:
    user: flink:flink
    image: flink:1.13.2-scala_2.11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
    volumes:
      - shared-tmpfs:/tmp/iceberg
  taskmanager:
    user: flink:flink
    image: flink:1.13.2-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
    volumes:
      - shared-tmpfs:/tmp/iceberg
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw

volumes:
  shared-tmpfs:
    driver: local
    driver_opts:
      type: "tmpfs"
      device: "tmpfs"

The containers in this docker-compose file include:

  • SQL-Client: Flink SQL Client, used to submit SQL queries and view SQL execution results
  • Flink Cluster: contains Flink JobManager and Flink TaskManager to execute Flink SQL
  • MySQL: as the data source of sub-database and sub-table, store the user table

Note

If you want to run this guide in your own Flink environment, you need to download the packages listed below and put them in the lib directory of the Flink directory, i.e., FLINK_HOME/lib/.

flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar

flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar

All Docker Compose-related commands used in this tutorial will need to be executed in the directory where docker-compose.yml is located.

Execute the following command where docker-compose.yml is located to start the components needed for this guide:

docker-compose up -d

This command will automatically start all containers defined in the Docker Compose file in the detached mode.

Step 2: Prepare data in the MySQL database

Enter the MySQL container

docker-compose exec mysql mysql -uroot -p123456

Create data, tables, and populate the data.

CREATE DATABASE db_1;
 USE db_1;
 CREATE TABLE user_1 (
   id INTEGER NOT NULL PRIMARY KEY,
   name VARCHAR(255) NOT NULL DEFAULT 'flink',
   address VARCHAR(1024),
   phone_number VARCHAR(512),
   email VARCHAR(255)
 );
 INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234","user_110@foo.com");

 CREATE TABLE user_2 (
   id INTEGER NOT NULL PRIMARY KEY,
   name VARCHAR(255) NOT NULL DEFAULT 'flink',
   address VARCHAR(1024),
   phone_number VARCHAR(512),
   email VARCHAR(255)
 );
INSERT INTO user_2 VALUES (120,"user_120","Shanghai","123567891234","user_120@foo.com");
CREATE DATABASE db_2;
USE db_2;
CREATE TABLE user_1 (
  id INTEGER NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone_number VARCHAR(512),
  email VARCHAR(255)
);
INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234", NULL);

CREATE TABLE user_2 (
  id INTEGER NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone_number VARCHAR(512),
  email VARCHAR(255)
);
INSERT INTO user_2 VALUES (220,"user_220","Shanghai","123567891234","user_220@foo.com");

Step 3: Create tables using Flink DDL with Flink SQL CLI

Use the following command to enter the Flink SQL CLI container:

docker-compose exec sql-client ./sql-client

You will see the following interface:

Turn on the checkpoint and do a checkpoint every 3 seconds. The checkpoint is not enabled by default, and we need to enable the checkpoint to allow Iceberg to submit the transactions. Moreover, MySQL-CDC must wait for a complete checkpoint before the binlog reading phase starts to avoid out-of-order binlog records.

SET execution.checkpointing.interval = 3s;

Create a source table user_source to capture the data of all databases and tables in MySQL and use regular expressions to match these databases and tables used in the configuration items of the table. Moreover, the table also defines a metadata column to distinguish which database and table the data comes from.

CREATE TABLE user_source (
    database_name STRING METADATA VIRTUAL,
    table_name STRING METADATA VIRTUAL,
    `id` DECIMAL(20, 0) NOT NULL,
    name STRING,
    address STRING,
    phone_number STRING,
    email STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'mysql',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'db_[0-9]+',
    'table-name' = 'user_[0-9]+'
  );

Create a sink table all_users_sink to load data into Iceberg. In this sink table, we define a composite primary key (database_name, table_name, id) where the values of id fields may be the same.

CREATE TABLE all_users_sink (
    database_name STRING,
    table_name    STRING,
    `id`          DECIMAL(20, 0) NOT NULL,
    name          STRING,
    address       STRING,
    phone_number  STRING,
    email         STRING,
    PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
  ) WITH (
    'connector'='iceberg',
    'catalog-name'='iceberg_catalog',
    'catalog-type'='hadoop',  
    'warehouse'='file:///tmp/iceberg/warehouse',
    'format-version'='2'
  );

Step 4: Stream to Iceberg

Use the following Flink SQL statement to write data from MySQL to Iceberg.

-- Flink SQL
INSERT INTO all_users_sink select * from user_source;

The command above will start a streaming job to continuously synchronize the full and incremental data in the MySQL database to Iceberg. You can see this running job in the Flink UI:

Use the following command to see the written files in Iceberg.

docker-compose exec sql-client tree /tmp/iceberg/warehouse/default_database/

Note

In your environment, the actual files may differ from the screenshot above, but the overall directory structure should be similar.

Use the following Flink SQL statement to query the data in all_users_sink.

SELECT * FROM all_users_sink;

We can see the following query results in the Flink SQL CLI.

Insert a new row into the db_1.user_1 table.

INSERT INTO db_1.user_1 VALUES (111,"user_111","Shanghai","123567891234","user_111@foo.com");

Update db_1.user_2 table data.

UPDATE db_1.user_2 SET address='Beijing' WHERE id=120;

Delete a row in the db_2.user_2 table.

DELETE FROM db_2.user_2 WHERE id=220;

The final query results are as follows.

From the latest results of Iceberg, you can see that the new record has been added, the address has been updated, and the old record has been deleted, which is exactly the same as the data update we made in MySQL.

Step 5: Clean the environment

You can clean the environment by executing the following command in the directory where the docker-compose.yml file is located to stop all containers.

docker-compose down

Ververica Academy

Article by:

Ververica

Comments

Our Latest Blogs

Q&A with Erik de Nooij: Insights into Apache Flink and the Future of Streaming Data featured image
by Kaye Lincoln 09 April 2024

Q&A with Erik de Nooij: Insights into Apache Flink and the Future of Streaming Data

Ververica is proud to host the Flink Forward conferences, uniting Apache Flink® and streaming data communities. Each year we nominate a Program Chair to select a broad range of Program Committee...
Read More
Ververica donates Flink CDC - Empowering Real-Time Data Integration for the Community featured image
by Ververica 03 April 2024

Ververica donates Flink CDC - Empowering Real-Time Data Integration for the Community

Ververica has officially donated Flink Change Data Capture (CDC) to the Apache Software Foundation. In this blog, we’ll explore the significance of this milestone, and how it positions Flink CDC as a...
Read More
Announcing the Release of Apache Flink 1.19 featured image
by Lincoln Lee 18 March 2024

Announcing the Release of Apache Flink 1.19

The Apache Flink PMC is pleased to announce the release of Apache Flink 1.19.0. As usual, we are looking at a packed release with a wide variety of improvements and new features. Overall, 162 people...
Read More