Ververica Platform 2.3: Getting Started with Flink SQL on Ververica Platform

November 09, 2020 | by Konstantin Knauf

Flink SQL is one of the three main programming interfaces of Apache Flink. It is declarative and, because the community kept the syntax in line with the SQL standard, anyone who knows traditional SQL will feel at home within minutes. Flink SQL also unifies batch and stream processing: you can run the same query on a bounded table (a data set) as a batch job or on an unbounded table (a stream of data) as a long-running stream processing application, and, given the same input, the resulting table will be identical.

Two weeks ago, we released Ververica Platform 2.3, which brought development, operations, and management capabilities for Flink SQL to Ververica Platform. In this blog post, I would like to walk you through a simple analytics use case of Flink SQL on Ververica Platform. We will use Flink SQL to read temperature measurements from an Apache Kafka topic, continuously derive statistics, and write these statistics back to a different Kafka topic.

Reading From Kafka

Flink SQL operates on tables that can be read from and written to. In contrast to a traditional database or data warehouse, Apache Flink does not store the tables itself. Tables are always backed by an external system. So, to read temperature measurements from Kafka we need to define a table over our input topic.

As illustrated in Figure 1, we create a new table from a template via + in the Schema Explorer and select “Kafka” with the “Confluent Avro” format, because that is where our measurement data resides.

Figure 1: Selecting a CREATE TABLE template for “kafka” with “confluent-avro” format

Then we need to fill in the template. Besides the name temperature_measurements, we first supply the schema of the table. The schema supplied here is the one used to deserialize the Avro records read from Kafka. As such, it needs to be compatible with the schema that these records were serialized with, which is fetched from the schema registry when reading the table. Second, we use the table options to configure the external system that this table is stored in. Since we are using Kafka with Confluent’s Avro format, this entails configuring the Kafka bootstrap servers, the Kafka topic, and the connection to the schema registry.

Finally, we provide a watermark for the measurement_time column. Defining a watermark for a column informs Apache Flink about the expected delay and out-of-orderness with which rows are added to this table. Concretely, WATERMARK FOR measurement_time AS measurement_time - INTERVAL '15' SECOND tells the framework that measurements might arrive up to 15 seconds out of order with respect to measurement_time. Without a WATERMARK definition, a TIMESTAMP column cannot be used for time-based operations like event time windowing or sorting an unbounded table by time.

Figure 2: Executing the CREATE TABLE statement for temperature_measurements
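For readers who cannot see the screenshot, a statement of roughly the following shape would match the description above. This is only a sketch under assumptions: the sensor_id column, the topic name, the broker and schema registry addresses, and the consumer group are hypothetical, and the option keys follow the open-source Flink Kafka connector and `avro-confluent` format (Flink 1.12 naming), so the keys generated by the platform's template may differ.

```sql
-- Sketch only: the column list and all connection values are assumptions.
CREATE TABLE temperature_measurements (
  sensor_id        BIGINT,        -- hypothetical column, not taken from the article
  temperature      STRING,        -- measurement including its unit, e.g. '23°C'
  measurement_time TIMESTAMP(3),  -- event time of the measurement
  -- tolerate rows that arrive up to 15 seconds out of order
  WATERMARK FOR measurement_time AS measurement_time - INTERVAL '15' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'temperature-measurements',                           -- hypothetical topic
  'properties.bootstrap.servers' = 'kafka.example.internal:9092', -- hypothetical address
  'properties.group.id' = 'temperature-reader',                   -- hypothetical group
  'scan.startup.mode' = 'latest-offset',
  'format' = 'avro-confluent',                                    -- assumed format identifier
  'avro-confluent.schema-registry.url' = 'http://schema-registry.example.internal:8081'
);
```

The WATERMARK clause is the part that matters for the rest of the walkthrough: it turns measurement_time into an event time attribute that the window aggregation later relies on.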


After executing the CREATE TABLE statement, the definition of the temperature_measurements table is durably stored in the vvp catalog and is available to other users of that platform namespace going forward.

Now, we can run queries on that table. To start with, let us simply take a look at the rows of that Kafka-backed table by running SELECT * FROM temperature_measurements. This also confirms that the table has been properly configured before we build more complicated queries.

Figure 3: Executing a SELECT query over a Kafka-backed table from the editor

Being able to query any Flink table from the browser like this is already pretty cool, particularly because it provides a common interface to query data distributed over various systems like Kafka, Elasticsearch, Hive, MySQL, and more.


Developing our Query

Next, we would like to continuously derive statistics about the measurement records as they come in. Specifically, we would like to derive the average temperature per minute based on the measurement_time.

The temperature column is currently a STRING column and includes the unit °C. In order to take the average, we need to strip the unit and cast the value to a FLOAT. We do this with two built-in functions: REGEXP_EXTRACT and CAST.

Figure 4: Transforming the temperature column from STRING to FLOAT
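As a text version of this step, the transformation could be written as the query below. It is a minimal sketch: the output column names raw_temperature and temperature_celsius are chosen here for illustration and are not taken from the screenshot.

```sql
-- Strip the unit with a regular expression, then cast the digits to FLOAT.
SELECT
  measurement_time,
  temperature AS raw_temperature,  -- original STRING value including the unit
  CAST(REGEXP_EXTRACT(temperature, '[0-9]+', 0) AS FLOAT) AS temperature_celsius
FROM temperature_measurements;
```

Group index 0 returns the whole match, and the pattern '[0-9]+' matches only the first run of digits. If the data contained signs or decimal fractions (an assumption about the data, not something stated here), a pattern such as '-?[0-9]+([.][0-9]+)?' would be one way to keep them.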

To compute the average temperature over one-minute windows, we use a so-called GROUP BY window aggregation with a tumbling window. Tumbling windows have a fixed length and do not overlap. We use TUMBLE to group the rows, TUMBLE_ROWTIME to extract the timestamp of the window for the final results, and AVG to compute the actual average. The final statement looks like this:

SELECT
 TUMBLE_ROWTIME(measurement_time, INTERVAL '1' MINUTE) AS window_time,
 AVG(CAST(REGEXP_EXTRACT(temperature, '[0-9]+', 0) AS FLOAT)) AS avg_temperature
FROM temperature_measurements
GROUP BY
 TUMBLE(measurement_time, INTERVAL '1' MINUTE)

When you run this statement (Figure 5) you’ll see one result row about every minute, because the result rows are only written out once the window end time has passed and the window is closed.

Figure 5: Running our final query from the editor


Writing Back to Kafka

Now that we have established that our query does what it should, we need to write the results into another table. Only then can the results be consumed by downstream applications. You can also think of it as creating a materialized view based on the temperature_measurements table that is continuously maintained by Apache Flink. We will call the result table average_temperatures and again store it in a Kafka topic with Confluent’s Avro format. Figure 6 shows the corresponding CREATE TABLE statement. Fortunately, the platform has automatically templated the schema and the table name based on the INSERT INTO statement below it, so we only need to provide the missing table options like the Kafka topic.

Figure 6: Executing (“Run Selection”) the CREATE TABLE statement for the result table average_temperatures
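A result table of this kind might be declared as in the sketch below. The two columns mirror the aliases of the aggregation query (window_time and avg_temperature). The topic name, addresses, and schema registry subject are hypothetical, and the option keys again follow the open-source Flink Kafka connector and `avro-confluent` format (Flink 1.12 naming), which may differ from what the platform templates.

```sql
-- Sketch only: all connection values below are assumptions.
CREATE TABLE average_temperatures (
  window_time     TIMESTAMP(3),  -- timestamp of the one-minute window
  avg_temperature FLOAT          -- average temperature within that window
) WITH (
  'connector' = 'kafka',
  'topic' = 'average-temperatures',                               -- hypothetical topic
  'properties.bootstrap.servers' = 'kafka.example.internal:9092', -- hypothetical address
  'format' = 'avro-confluent',                                    -- assumed format identifier
  'avro-confluent.schema-registry.url' = 'http://schema-registry.example.internal:8081',
  'avro-confluent.schema-registry.subject' = 'average-temperatures-value'
);
```

No watermark is declared here because this table is only written to in this walkthrough. A downstream query that needs event time operations on window_time would have to define one.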

Now, we can “Run” the final INSERT INTO statement. As outlined above, an INSERT INTO statement based on an unbounded table is a continuous query, similar to the definition of a materialized view. Therefore, the platform now asks us whether we would like to create a Deployment for this query. A Deployment is Ververica Platform’s abstraction for a long-running service or application.
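Put together, the statement submitted at this point is the tumbling window aggregation from the previous section with an INSERT INTO clause in front of it. The sketch below assumes that average_temperatures has the two columns window_time and avg_temperature, in that order.

```sql
-- Continuously write one row per one-minute window to the result table.
INSERT INTO average_temperatures
SELECT
  TUMBLE_ROWTIME(measurement_time, INTERVAL '1' MINUTE) AS window_time,
  AVG(CAST(REGEXP_EXTRACT(temperature, '[0-9]+', 0) AS FLOAT)) AS avg_temperature
FROM temperature_measurements
GROUP BY
  TUMBLE(measurement_time, INTERVAL '1' MINUTE);
```

Because each window emits its result once, when the window closes, the query only appends rows, which suits an append-only sink such as a Kafka topic.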

Figure 7: Validating the final query: a tumbling window aggregation on temperature_measurements

We confirm and finalize the Deployment creation form (Figure 8) by giving the Deployment a name and hitting CREATE SQL Deployment.

Figure 8: Submitting the query as a long-running Deployment

Afterward, we can return to the editor and — once the Deployment is running — query the average_temperatures table directly. Because we only need to query the existing table instead of executing the underlying query, data is now available within seconds (Figure 10).

Figure 10: Querying the result table average_temperatures

And that’s all for now. This is all you need to build and deploy a highly scalable, flexible data pipeline with Flink SQL and Ververica Platform. Of course, there is still a lot to cover in the upcoming weeks: user-defined functions, different connectors and catalogs, pattern matching, query management, and more. If you are interested in learning more, I recommend diving in hands-on with our self-contained getting started guide.



Topics: Ververica Platform, Flink SQL
