Recently the Apache Flink community announced the release of Flink 1.16, which continues to push the vision of stream and batch unification in Flink SQL to a new level. At this point, Flink SQL is one of the most sophisticated and powerful tools available for data analytics. It is ANSI SQL compliant, stream and batch unified, and supports hundreds of thousands of mission-critical stream and batch applications in renowned companies across different industries globally. Alibaba, for example, has over 30,000 Flink jobs running on 2 million+ CPU cores across the organization.
Flink SQL has come a long way to where it is today via tremendous efforts and collaborations across the entire Flink community over the years. Thus, it would be valuable to have a retrospective of the journey of Flink SQL. This post will try to summarize the important milestones of Flink SQL in the past years, show the critical issues and challenges that have arisen, understand where it is today, and demonstrate the path Flink SQL has been through and where it might head in the future, based on personal observation and opinions.
Disclaimer: this blog post is not meant to be an official chronology of Flink SQL nor is it an official roadmap of Flink SQL.
The diagram above shows selected milestones of Apache Flink and Flink Table Store. Among many memorable things in the history of Flink SQL, the merge of Blink (the Alibaba internal fork of Apache Flink which was later contributed back to Apache Flink in 2019), is probably the most important one that has profoundly impacted the development of Flink SQL. But let’s first go back a bit further and take a look at Flink SQL in the early days prior to the merge of Blink.
Flink SQL in its early days
The idea to have another more expressive API in Flink in addition to the DataStream API and the DataSet API (now sunsetted) dates back to mid 2014. The idea finally landed about one year later in Flink 0.9 in which the Table API was first introduced. The Table API is a declarative API built on top of the DataStream API and the DataSet API for streaming and batch processing and allows users to write streaming programs in a SQL-like manner. Over the next several releases, more functionalities were added to build this declarative and relational API for stream processing.
Out of numerous features built and decisions made as a part of this effort, I’d like to highlight a few that have not only become the architectural cornerstones of Flink SQL today but have also inspired how streaming SQL should be done.
Introducing SQL as a DSL on top of the Table API
It was not long after the Table API was introduced that the community realized building an actual SQL API on top of it looks like a reasonable step from a technical perspective. While it looks quite natural today to use SQL for stream processing, it was not as obvious how far this idea can go at that time. Although the idea of having a fluent SQL-like API has been put in practice by a few projects by then, having a serious full function SQL for stream processing still sounded ambitious and faced a lot of challenges. In fact, the Flink Table API itself was considered as a proof-of-concept for quite a while and only supported stateless operations. Nevertheless, in Flink release 1.1, preliminary SQL support on top of the Table API was announced and the ball started rolling.
Introducing Apache Calcite for Query Optimization
In Flink release 1.1, along with the release of the Flink SQL, Apache Calcite was introduced to perform relational plan optimization for Flink SQL queries. Thanks to the excellent pluggability of Calcite, Flink was able to integrate it into Flink SQL with a reasonable amount of effort, which significantly accelerated the development of Flink SQL.
Retraction capabilities
This is probably one of the most important differentiators of Flink SQL compared with other solutions. Two fundamental characteristics of streaming data are unboundedness and out-of-order-ness. These two characteristics demand stream computing to be able to modify (or “retract”) the results it has already produced upon the arrival of a new record in order to make sure the results are correct. Even though Flink uses watermarks to help reduce the chance of such modifications, it is still a must for stream processing. This demand is unique to stream processing since, in batch processing, the emitted results are always final.
Typically, such data retractions are computing logic specific and need to be done by the application authors. However, while it is easy to provide APIs in programming interfaces like the DataStream API and let users do retraction by themselves, when it comes to SQL, the computing engine itself has to take care of all the retractions for the users, because SQL does not have “retraction” in its standard semantic. This is non-trivial work considering most of the stateful operations will need to have retraction implemented, including ranking, joining, aggregation, etc. It took Apache Flink a couple of years before retraction was production ready. The integration of Blink further extended retraction to support CDC formats. As a result, Flink can execute the vast majority of SQL queries and produce correct results (e.g., arbitrary combination of aggregations and joins). To some extent, retraction is one of the features that distinguishes Flink SQL from others in this realm.
The Blink project and its contributions back to Apache Flink
Despite all the cool stuff available in its early days, Flink SQL has always been an exploratory or experimental effort. It was not even shipped as a part of the Flink distribution. When Blink came into the picture, things changed.
Blink was created as an internal fork of Flink at Alibaba in 2015. The project grew rapidly inside the company and quickly became the backbone streaming engine of the entire company, processing billions of records per second, generating some mission critical financial dashboard and reports.
In late 2018, Alibaba made the decision to open source Blink and contribute it back to Apache Flink. The plan of merging Blink back to Apache Flink was announced in the beginning of 2019 and the work lasted for about one and a half years. The critical features in Blink were put into mailing list discussions and merged one by one back into Apache Flink. As a rough estimation, about 1 million lines of code changes were committed and the majority of them are SQL related.
The release of Flink 1.10 in early 2020 marks the completion of the Blink merge.
The commit history during the Blink merge
The Blink project has a profound impact on Flink SQL in many ways. It not only boosted the development of Flink SQL, made it 10x faster than it used to be, but also helped shape the Flink SQL to become the way it is today.
First of all, Blink was designed and developed based on SQL-first principles. A lot of its features address the immediate requirements of streaming SQL and make it production-ready. These include the entire execution planner, operator performance and stability improvements, various retraction support, solutions to data skew, complete support of ANSI SQL standard semantics on top of streaming data, etc.
Secondly, Blink pushed the exploration of stream and batch unification to a new level. By the time Blink was open sourced, it already had a full TPC-DS coverage. Furthermore, all the streaming and batch SQL execution are performed in streaming operators instead of split into DataSet and DataStream operators. This architectural change has also led to what Flink SQL is today, as well as the deprecation of the DataSet API.
Architectural Evolution of Table API / Flink SQL
Another contribution of Blink that should not be underestimated is in the change in mindset. Being the first battle-tested large-scale streaming SQL practice in the industry, Blink proved that streaming SQL is not just an experiment for simple processing use cases, but that it can also perform sophisticated computations.
Flink SQL Today
Flink SQL has continued to develop since the merge of Blink. Many useful features are now available in Flink SQL which helps it to cover a wide range of use cases. To name some:
- Analytics. Analytics is still the most prominent use case for Flink SQL. The project has built a complete infrastructure to run both streaming and batch queries with the same query statement. In addition, Flink SQL has been improving compatibility with Apache Hive. According to the recent release of Flink 1.16, ~94% Hive SQL statements can also run smoothly on Flink. In addition, the newly added SQL Gateway now also supports HiveServer2 protocols. That means the Hive ecosystem tools (such as Hive Beeline, Apache Zeppelin, and Apache Superset) can connect to Flink SQL Gateway and submit Hive SQL seamlessly.
- Data Integration. With the full Change Log Format support, as well as ecosystem projects such as CDC Connectors for Apache Flink (a popular project with 3200+ stars now), Flink is rising as one of the most popular engines used for data integration. Users are leveraging its rich catalog and connector ecosystem to move around and transform data stored in dozens of different types of external storage systems.
- Pattern Matching. Flink SQL supports the MATCH_RECOGNIZE statement, which allows users to perform pattern matching on streaming data. This is especially useful for cases like risk control.
- Machine Learning. Nowadays, machine learning is becoming more and more real-time. Flink SQL has been used by many users to perform real-time feature generation and sample assembly when prompt result is critical to the machine learning system, for instance, real-time recommendation systems.
It would be difficult to give an exhaustive summary of the current status of Flink SQL with just this post, but I think the following topics would be worth diving into: Flink SQL adoption, stream & batch unification, and the remaining challenges.
Flink SQL Adoption
One interesting thing I noticed is that while Flink has established itself as one of the top streaming engines in the industry and widely used globally, the Flink SQL adoption varies a lot in different regions. In China, Flink SQL is quite popular. It is very common to see Flink SQL account for 80% or higher of the streaming jobs in a company. On the other hand, in the rest of the regions such as the U.S. and Europe, DataStream API is still dominant for streaming jobs.
Regardless of the reason behind this difference, from the past experience with Flink SQL, using SQL for streaming application has demonstrated a few important benefits:
- Easy to use. SQL is the de facto language for analytics. It has clear semantics and is expressive. It does not require much programming skills. This significantly helped democratize stream processing because those who know how to query relational databases can easily analyze data streams without much additional effort.
- More efficient in development. Even for those who have the expertise of writing Java programs, switching to Flink SQL still helps improve productivity in most cases. This is because when writing Flink SQL, one can focus on the business logic instead of worrying about a lot of implementation details, especially when all the logic can be fulfilled by built-in functions and operations.
- Maintenance and migration friendly. SQL is a standard declarative DSL. This allows the computing engine to continuously evolve transparently to the end users. For example, when the computing engine changes to provide better query plans, faster operators, more performant query executions, the existing Flink SQL jobs can immediately benefit from such improvement by restarting the job with a new Flink version, without making any changes.
Stream and Batch Unification
When we talk about stream and batch unification, there are usually three related things that need to be unified: API, computing engine, and storage. Unifying stream and batch processing helps to significantly reduce the complexity of the data systems, improve data quality, and contribute to lower total costs. Flink SQL addresses these problems by providing a unified SQL layer on top of Apache Flink.
The basic idea is that batch is just a special case for streaming. This is not a new idea and is also quite an intuitive one - the results of batch processing are equivalent to the results of stream processing on the bounded streams containing the same records in those batches of data.
While conceptually speaking this is correct, it is not as simple as it looks like in practice. The equation may no longer stand if we take things like resource management, scheduling, and execution efficiency into consideration. For example, a batch processing job can run on any amount of physical resources because the whole job is divided into stages and tasks can run on the same physical resource one after another in a time-shared manner. In contrast, a streaming job typically needs to bring up all the tasks at the same time to run properly. There is a long list of differences between the best way to process stream and batch data. The operator behavior might be different depending on the boundedness of the data, the failure recovery strategy and shuffle strategy may be different, some operators may be prohibitively expensive for unbounded data, and so on.
Flink SQL as well as the Flink runtime has come a long way to address most of the above issues so it can run both streaming and batch jobs in the appropriate way.
The remaining challenges
Even though Flink SQL has become one of the most sophisticated tools for data analytics, there are still a few remaining challenges yet to be tackled.
- The accuracy of stream SQL. Despite the long term efforts such as retraction support, there are still a few corner cases in which achieving strict accuracy is challenging. For example, when a dynamically changing dimensional table is involved, in case of failure recovery, inaccurate results may be emitted and not retractable. Another example is chained streaming SQL jobs: if the intermediate storage is unable to generate full change logs, the accuracy of downstream SQL jobs may not be guaranteed.
- The cost of stream-to-stream joins. At this moment, joining streams together is still quite expensive, especially when multiple streams are joined together. Addressing this case would be valuable since joining streams together is a very common usage pattern.
- The complexity of streaming SQL semantics. One thing that we commonly heard from Flink SQL users is that it is sometimes difficult to understand the output of a Flink SQL job. Although streaming SQL semantics is complex by nature, Flink SQL could probably improve in making the semantics clearer and more intuitive.
What’s next in Flink SQL
Putting together all the work that the Flink community has done in Flink SQL, I think we can anticipate the following for Flink SQL.
Further enhancements to stream and batch unification
On the computing engine side, people have recognized that stream and batch unification is technically sound, valuable, and also achievable. Flink SQL is currently at the forefront of this endeavor and will continue to push the boundary of stream and batch unification, including addressing the remaining challenges in streaming SQL and further improving its batch capability.
From Flink SQL to Streaming Data Warehouse
The primary use case for Flink SQL is undoubtedly analytics on both streaming and batch data. However, even though Flink SQL has significantly lowered the bar of analyzing streaming data, it is still not so easy for people to work with streaming data compared with what they can do with batch data.
Although Data Warehouse is a popular and well-established architecture designed for analytics purposes, it could become a little clumsy today if one wants to handle streaming data. A typical setup can be illustrated below.
A typical Data Warehouse setup
In the diagram above, the original data comes from two sources: the application logs (e.g. user behavior tracking logs) and the transactional databases. The data is streamed into the system from these two sources. There are several streaming jobs chained with a message queue like Apache Kafka sitting in between as the storage for streaming data. The pipeline composed of Flink and Kafka in this case can deliver low latency streaming analytics capability.
In addition to the streaming pipeline, a common use case is to archive the data in Kafka to an offline system which provides better efficiency for better computing. In this case, a common solution is to send a copy of the data from Kafka to Hive and have a separate batch computing pipeline, just like a typical data warehouse.
Users may also want to query the data in Kafka in an ad-hoc manner for purposes like data exploration or debugging. Unfortunately Kafka is not queryable by itself, so users would usually send another copy of data from Kafka to a system that one can directly query, such as ClickHouse or MySQL.
While this architecture seems intuitive and handles both streaming and batch data, it has a few caveats.
- There are multiple storage systems likely with different data guarantees and the data consistency and data quality could be challenging.
- There are multiple computing engines with different APIs, dialects, and semantics. This may result in higher cost in code development and maintenance, as well as more discrepancies in the logic.
- Due to the complexity of the architecture, the operational cost could also be high, and it might be hard to ensure system stability.
At the beginning of this year, a new subproject of Apache Flink called Flink Table Store was launched. The goal of this project is to provide a stream and batch unified storage. It aims to serve all the analytical use cases well, including stream computing, batch computing, and ad-hoc queries. In order to achieve this, Flink Table Store needs to accept and generate full change logs from end to end, which is critical for stream processing to produce reliably correct results.
Combining Flink SQL and Flink Table Store, we can come up with an elegant architecture to unify stream and batch analytics.
We call this architecture Streaming Data Warehouse. The idea behind this is treating every dataset as a Dynamic Table backed by Flink Table Store. Each dynamic table can be read in three different ways:
- a message queue which emits full change logs
- a scannable with proper indexes for range scan, based on file systems or objects stores, for example.
- a K-V store for lookup based on keys
The interaction between Flink SQL and the dynamic tables are through different SQL statements:
- DDL which helps define the dynamic tables and how Flink SQL should perform IO on it.
- DML which manipulates the dynamic tables, such as altering schema, updating partial data, etc.
- DQL which performs the queries on the dynamic tables.
Streaming Data Warehouse helps address the shortcomings for the existing architecture. It is an architecture with complete stream and batch unification, equipped with a unified API, a unified computing engine, and a unified storage.
Although born from Flink SQL, Flink Table Store can also be used in other computing engines, such as Apache Spark or Trino, for read/write.
Summary
Flink SQL has been through a long journey. This blog post only captures a glimpse of the work that has been done, the brilliant ideas thought of, the lessons learned, and milestones achieved.
Stay tuned for more deep dives into Flink SQL. We plan to explore the experiences and best practices from Flink veterans who not only contributed to Flink SQL features, but are also dealing with production Flink jobs from a wide range of companies on a daily basis.
Acknowledgements
This blog post is impossible without the outstanding engineers who have made Flink SQL the way it is today. I’d like to thank the Flink Community and the Alibaba Blink team who have been relentlessly pushing Flink SQL forward.