
Bootstrap Data Pipeline via Flink HybridSource


A common requirement in data engineering is to process existing historical data first, before continuously processing live data. Processing the existing data first is also referred to as bootstrapping the system.

How can this be done easily with Apache Flink? In this blog post we look at Flink's HybridSource, which is designed specifically for this task. If you want to clone the repository with the code from this blog post, use

git clone


As an example, we will implement the following data pipeline as a Flink job.


Fig. 1 - Flink job graph that consumes data from CSV files and then switches to a Kafka topic.

To process the data sources sequentially and switch from one to the next as shown in the job graph, we first add a FileSource that reads the CSV data files and then a KafkaSource, which provides an unbounded stream of data.
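The switching behaviour described above can be pictured with plain Scala iterators. The sketch below is only a conceptual model, not Flink code: the hybrid function, the csvFiles data and the kafkaTopic iterator are hypothetical stand-ins for the FileSource and the KafkaSource.

```scala
// Conceptual model (not the Flink API): drain each bounded source in turn,
// then continue with the unbounded one
def hybrid[A](bounded: Seq[Iterable[A]], unbounded: => Iterator[A]): Iterator[A] =
  bounded.iterator.flatMap(_.iterator) ++ unbounded

// Hypothetical stand-ins: two small "CSV files" and an endless "Kafka topic"
val csvFiles = Seq(Seq(1, 2, 3), Seq(4, 5))
def kafkaTopic: Iterator[Int] = Iterator.from(10001)

// Only the first 8 records are taken, since the unbounded part never ends
val firstRecords = hybrid(csvFiles, kafkaTopic).take(8).toList
```

In Flink, the splits of a bounded source are read in parallel, so this model only reflects the order between sources, not the order of individual records within one source.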

Data Preparation

In order to prepare data files in CSV format, we will write a special one-time Flink Job.

For all code examples we will use Scala and the scala-cli tool. If you want to reproduce the code examples yourself, install the latest version of scala-cli in your environment.

Create a new Scala script with the following code:

//> using dep "org.flinkextended::flink-scala-api:1.17.1_1.0.0"
//> using dep "org.apache.flink:flink-clients:1.17.1"
//> using dep "org.apache.flink:flink-csv:1.17.1"
//> using dep "org.apache.flink:flink-connector-files:1.17.1"
//> using dep "org.apache.flink:flink-table-runtime:1.17.1"
//> using dep "org.apache.flink:flink-table-planner-loader:1.17.1"

import org.apache.flink.table.api._
import org.apache.flink.connector.datagen.table.DataGenConnectorOptions
import org.apache.flink.api._
import org.apache.flink.api.serializers._

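// _root_ guards against the org.apache.flink.api.java package, which the wildcard import above brings into scope as "java"
import _root_.java.lang.{Long => JLong}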

val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.inStreamingMode()
val table = TableEnvironment.create(settings)

Then append the following block of code, which defines the record schema:

val schema = Schema.newBuilder
  .column("id", DataTypes.INT())
  .column("bid_price", DataTypes.DOUBLE())
  .column("order_time", DataTypes.TIMESTAMP(2))
  .build()

As you can see, our record contains three columns: id, bid_price and order_time.

Then append one more block of code, which uses the Table API and the datagen connector to generate records and store them in CSV format under a specific file system path:

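table.createTemporaryTable(
  "SourceTable",
  TableDescriptor
    .forConnector("datagen")
    .schema(schema)
    .option(DataGenConnectorOptions.NUMBER_OF_ROWS, JLong.valueOf(1000))
    .option("fields.id.kind", "sequence") // "id" is a sequence from 1 to 10000
    .option("fields.id.start", "1")
    .option("fields.id.end", "10000")
    .build())

val currentDirectory = new _root_.java.io.File(".").getCanonicalPath

table.createTemporaryTable(
  "SinkTable",
  TableDescriptor
    .forConnector("filesystem")
    .schema(schema)
    .option("format", "csv")
    .option("sink.rolling-policy.file-size", "124 kb") // roll over to a new file at ~124 KB
    .option("path", s"file://$currentDirectory/sink-table")
    .build())

table.executeSql("insert into SinkTable select * from SourceTable").print()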

Assuming the file is in your current shell directory, run it with scala-cli using the following command:


Once the script has run, the sink-table directory in the current shell directory contains the following set of CSV files:

Below are the data records from one of the CSV files:

Each file holds part of the generated data set. The number of files can be controlled by changing the values of sink.rolling-policy.file-size and DataGenConnectorOptions.NUMBER_OF_ROWS in the code.

Now let’s generate similar data for the second source, the Kafka topic. For this and the remaining steps of this blog post you need access to a Kafka cluster. All code examples in this blog post were tested with Kafka 2.7, and a single Kafka broker is enough for testing.

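In the same Scala file that we used for CSV data generation, replace the SinkTable definition with the following. The Kafka table connector is a separate artifact, so also add the directive //> using dep "org.apache.flink:flink-connector-kafka:1.17.1" to the top of the script. To keep the two data sets distinguishable, shift the id sequence in SourceTable to start at 10001 and end at 20000, which is the id range of the Kafka records shown below.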

val brokers = "<put-your-kafka-host-here>:9092"
table.createTemporaryTable(
  "SinkTable",
  TableDescriptor
    .forConnector("kafka")
    .schema(schema)
    .option("properties.bootstrap.servers", brokers)
    .option("topic", "bids")
    .option("value.format", "csv")
    .build())

Set brokers to the address of your Kafka broker; for several brokers, use a comma-separated list of host:port pairs. The broker addresses must be reachable from the machine where the Scala script is executed. After running the updated script, the Kafka topic “bids” contains the generated data. Below is an example of the data stored in the topic:

# kafka-console-consumer --bootstrap-server $BOOTSTRAP_SERVER --topic bids --from-beginning
10002,1.312920916299031E308,"2023-08-15 14:37:35.411"
10012,2.495268527292221E307,"2023-08-15 14:37:35.412"
10022,3.9478571741612126E307,"2023-08-15 14:37:35.412"
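If you have several brokers, the brokers value can be assembled from a list. A minimal sketch; the host names below are hypothetical placeholders.

```scala
// Hypothetical broker addresses: replace with your own host:port pairs
val brokerHosts = Seq("broker-1.example.com:9092", "broker-2.example.com:9092")

// properties.bootstrap.servers takes the brokers as one comma-separated string
val brokers = brokerHosts.mkString(",")
```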

Flink Job Implementation

Create one more Scala script and paste the following content into it:

//> using dep "org.flinkextended::flink-scala-api:1.17.1_1.0.0"
//> using dep "org.apache.flink:flink-clients:1.17.1"
//> using dep "org.apache.flink:flink-csv:1.17.1"
//> using dep "org.apache.flink:flink-connector-files:1.17.1"
//> using dep "org.apache.flink:flink-connector-kafka:1.17.1"

import org.apache.flink.api._
import org.apache.flink.api.serializers._
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.connector.file.src.reader.TextLineInputFormat
import org.apache.flink.connector.file.src.impl.StreamFormatAdapter
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.base.source.hybrid.HybridSource
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.core.fs.Path

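val currentDirectory = new _root_.java.io.File(".").getCanonicalPath

// Bounded source: the CSV files generated earlier, read line by line
val fileSource = FileSource
  .forRecordStreamFormat(new TextLineInputFormat(), new Path(s"file://$currentDirectory/sink-table"))
  .build()

val switchTimestamp = -1L
val brokers = "<put-your-kafka-host-here>:9092"

// Unbounded source: starts at the first record with timestamp >= switchTimestamp + 1
val kafkaSource = KafkaSource
  .builder[String]()
  .setBootstrapServers(brokers)
  .setTopics("bids")
  .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .build()

// Read the files to the end, then switch to Kafka
val hybridSource = HybridSource
  .builder(fileSource)
  .addSource(kafkaSource)
  .build()

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "combined")
  .print()

env.execute()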


Let’s review this code to understand what it does:

  1. First, we define two Flink sources: one for the files and one for the Kafka topic. The Kafka source starts reading each partition at the first record whose timestamp is greater than or equal to switchTimestamp + 1, which evaluates to 0 here, so the whole topic is consumed. A production pipeline could instead pass a user-defined timestamp to skip some historical data when it starts consuming the Kafka topic.
  2. Then we combine both sources with the HybridSource builder. You can chain as many sources as needed; every source except the last one must be bounded, and the last one is typically unbounded. With this composition, Flink switches to the next source once the current bounded source has been fully consumed.
  3. Finally, we use the hybrid source to create a DataStream via the “fromSource” method.
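The starting position chosen by OffsetsInitializer.timestamp can be illustrated with a plain-Scala model of a single partition. This is a sketch, not Kafka or Flink code: startingOffset and the partition timestamps are hypothetical, and the model assumes that a partition with no record at or after the requested timestamp starts at its end.

```scala
// Plain-Scala model of one partition (not Kafka or Flink code): the start offset is
// the first record whose timestamp is >= fromTimestamp. Assumption of this model:
// if no record qualifies, reading starts at the end of the partition.
def startingOffset(recordTimestamps: IndexedSeq[Long], fromTimestamp: Long): Int = {
  val idx = recordTimestamps.indexWhere(_ >= fromTimestamp)
  if (idx >= 0) idx else recordTimestamps.length
}

// Hypothetical record timestamps (epoch milliseconds) of a three-record partition
val partition = Vector(1692110255411L, 1692110255412L, 1692110255414L)

val switchTimestamp = -1L
val fromStart = startingOffset(partition, switchTimestamp + 1) // 0: whole partition is read
val skipFirst = startingOffset(partition, 1692110255412L)      // 1: first record is skipped
```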

An important point is that the HybridSource API requires all combined sources to produce the same record type T; in the above example it is String. If your sources have different record types, you can still use String as the common input type and add a “map” operator right after the hybrid source that converts each string into a typed value, such as a Scala case class or a Java class.
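The map-based conversion described above could look like the sketch below. Bid and parseBid are hypothetical names; the parser assumes the exact CSV layout shown in this post (id, bid_price, quoted order_time) and uses only plain Scala, so it can be tried outside a Flink job.

```scala
import java.time.LocalDateTime

// Typed record mirroring the id, bid_price and order_time columns
final case class Bid(id: Int, bidPrice: Double, orderTime: LocalDateTime)

// Parses one CSV line, e.g. 10002,1.312920916299031E308,"2023-08-15 14:37:35.411"
def parseBid(line: String): Bid =
  line.split(",", 3) match {
    case Array(id, price, time) =>
      // "2023-08-15 14:37:35.411" -> ISO-8601 "2023-08-15T14:37:35.411"
      val iso = time.trim.stripPrefix("\"").stripSuffix("\"").replace(' ', 'T')
      Bid(id.trim.toInt, price.trim.toDouble, LocalDateTime.parse(iso))
    case _ =>
      throw new IllegalArgumentException(s"Unexpected record: $line")
  }

// In the job above this would become a map right after the hybrid source, e.g.
//   env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "combined").map(parseBid)
// provided flink-scala-api can supply a TypeInformation[Bid] (see its docs on case-class derivation).
```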

Replace the value of the “brokers” variable with your real Kafka broker address. Now let’s run this Flink job with the following command:

> scala-cli

This results in the following console output:

2> 935,1.3486879258636198E308,"2023-08-15 13:58:38.741"
2> 945,1.2010058239019397E308,"2023-08-15 13:58:38.741"
2> 955,4.153541843468437E307,"2023-08-15 13:58:38.741"
9> 10970,2.2001341438733312E307,"2023-08-15 14:37:35.414"
9> 10980,1.129258179257586E308,"2023-08-15 14:37:35.414"
9> 10990,1.3994484424740486E308,"2023-08-15 14:37:35.414"
9> 11000,1.6970222740482843E308,"2023-08-15 14:37:35.414"

We can see that at the beginning of the output the “id” column (the first column from the left) lies within the range 1 … 10000, meaning these records come from the CSV files. Closer to the end of the output, the “id” column lies in the range 10001 … 20000, meaning these records come from the Kafka topic.
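Because the two data sets use disjoint id ranges, captured console output can be checked for the switch. A small sketch in plain Scala; idOf and switchedOnce are hypothetical helpers, and since several subtasks print concurrently, the check is a sanity test of one run rather than a guarantee.

```scala
// Matches the "<subtask>> " prefix that print() adds when the job runs with parallelism > 1
val SubtaskPrefix = """^\d+>\s*""".r

// Extracts the id (first CSV column) from one printed output line
def idOf(outputLine: String): Int =
  SubtaskPrefix.replaceFirstIn(outputLine.trim, "").takeWhile(_ != ',').toInt

// True if no id <= boundary (file data) shows up after the first id > boundary (Kafka data)
def switchedOnce(ids: Seq[Int], boundary: Int): Boolean =
  ids.dropWhile(_ <= boundary).forall(_ > boundary)

// A few lines taken from the console output above
val output = Seq(
  "2> 935,1.3486879258636198E308,\"2023-08-15 13:58:38.741\"",
  "2> 955,4.153541843468437E307,\"2023-08-15 13:58:38.741\"",
  "9> 10970,2.2001341438733312E307,\"2023-08-15 14:37:35.414\"",
  "9> 11000,1.6970222740482843E308,\"2023-08-15 14:37:35.414\""
)
val ids = output.map(idOf)
val filesFirst = switchedOnce(ids, boundary = 10000)
```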


Using HybridSource, we can easily bootstrap Flink job state from different data sources before switching to the main data source. HybridSource was introduced in Flink 1.14. Before that, you had to implement the source switch in user code, for example by writing a tricky custom SourceFunction, which increased the overall complexity of the Flink job.
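As a sketch of bootstrapping from more than one historical data set, the builder can chain several bounded sources before the unbounded one. The directory paths, the broker address and the csvSource helper are hypothetical; the block only builds the source (it does not run a job), and the dependency versions mirror the ones used in this post.

```scala
//> using dep "org.apache.flink:flink-clients:1.17.1"
//> using dep "org.apache.flink:flink-connector-files:1.17.1"
//> using dep "org.apache.flink:flink-connector-kafka:1.17.1"

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.connector.source.Boundedness
import org.apache.flink.connector.base.source.hybrid.HybridSource
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.connector.file.src.reader.TextLineInputFormat
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.core.fs.Path

// Hypothetical helper: a bounded source reading text lines from one directory
def csvSource(dir: String): FileSource[String] =
  FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(dir)).build()

// Unbounded source; "localhost:9092" is a placeholder broker address
val kafkaSource = KafkaSource
  .builder[String]()
  .setBootstrapServers("localhost:9092")
  .setTopics("bids")
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .build()

// Sources are read in the order they are added: archive, recent files, then Kafka
val hybridSource = HybridSource
  .builder(csvSource("file:///data/archive"))  // hypothetical path
  .addSource(csvSource("file:///data/recent")) // hypothetical path
  .addSource(kafkaSource)
  .build()

// The chain ends with an unbounded Kafka source, so the hybrid source is unbounded too
val isUnbounded = hybridSource.getBoundedness == Boundedness.CONTINUOUS_UNBOUNDED
```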

For more information on HybridSource, see the Flink Improvement Proposal FLIP-150:


Article by: Alexey Novakov

