SQL Query Optimization with Ververica Platform 2.4

May 11, 2021 | by Nico Kruber

In my last blog post, Simplifying Ververica Platform SQL Analytics with UDFs, I showed how easy it is to get started with SQL analytics on Ververica Platform and leverage the power of user-defined functions without having to worry about the details. However, when developing the examples, we skipped an entire category of queries because they could not be displayed in the SQL editor of Ververica Platform 2.3: queries that produce changelog streams, e.g. non-windowed aggregations, Top-N queries, and more. I would thus like to take the opportunity of the recent release of Ververica Platform 2.4 to develop a few SQL queries that derive further insights from the Apache Flink community, similarly to how a data scientist would develop a real-time streaming analytics application: by iteratively exploring the input data and tuning the query until it fits. Along the way, I will present changelog streams and what you can do with them.


Preparation

If you want to follow along and eventually run your own analytics, please refer to our previous blog post to get started. In one of the following articles, we will try to simplify the setup and move the data import into the SQL world. All statements below assume that you have successfully imported mailing list data into the following three Kafka-backed tables as defined in our README file: flink_ml_dev, flink_ml_user, and flink_ml_user_zh.

Note: The tables used throughout this blog post can be imported quickly without setting up authentication (as opposed to the GitHub-based data streams, which are throttled quickly without it). The import job for com.ververica.platform.FlinkMailingListToKafka fetches the mailing list archives from the public mirrors and writes their content to Kafka.


As before, the code for importing the data, any user-defined functions (UDFs), and more is available on GitHub as version 2.1. Since we added a few more UDFs to our mix, you need to update the SQL functions' jar artifact (freshly built or fetched from GitHub's packages) in Ververica Platform (SQL → Functions → Artifacts → Update), as shown in the screenshot below. After uploading the new version, you will be asked to register any new functions. Select all new functions and register them to use them in the following queries.

(Screenshot: registering the updated UDF artifact in Ververica Platform)

All statements below are based on Ververica Platform 2.4.2.

 

Introduction

Last time, we briefly looked at the number of distinct authors on the mailing list with a simple query like this:


SELECT
  TUMBLE_END(`date`, INTERVAL '365' DAY(3)) as windowEnd,
  COUNT(DISTINCT fromEmail) AS numUsers
FROM flink_ml_user
GROUP BY TUMBLE(`date`, INTERVAL '365' DAY(3));

As you can see, we have been grouping the result by windows of 365 days. This may be a useful statistic to track over time and look for patterns, especially when altering the query and creating results for different periods of time. You could, for example, switch to days of the week, or to weeks throughout the year, which should let you identify typical days or seasons for a vacation; you could even correlate these with geographical regions, e.g. by comparing the Chinese vs. the international user mailing list as the simplest option. However, you may also want to execute this query without grouping by time, i.e. with a non-windowed aggregation. In that case, the query will not create an append-only stream but a changelog stream, which we will explore below with the help of Ververica Platform 2.4.
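As a sketch, grouping by day of the week instead of a tumbling window could look like the following (this assumes the built-in DAYOFWEEK function; since it is a non-windowed aggregation, it produces a changelog stream):

```sql
-- Distinct authors per day of the week.
-- Non-windowed aggregation: the result is a changelog stream.
SELECT
  DAYOFWEEK(`date`) AS dayOfWeek,
  COUNT(DISTINCT fromEmail) AS numUsers
FROM flink_ml_user
GROUP BY DAYOFWEEK(`date`);
```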

 

Non-Windowed Aggregations

Possibly the simplest queries to showcase non-windowed aggregations are counting the number of (distinct) rows like in the following example:


SELECT
  COUNT(DISTINCT fromEmail) AS numUsers
FROM flink_ml_user;


Since in streaming mode we are processing a potentially infinite stream of events (or "rows" in SQL terminology) and see only one event at a time, there is no such thing as a "complete result". Therefore, any SQL result table can only deliver a (temporary) view of the query's input data up to the current row(s), followed by further result updates. If this stream of result updates not only adds new rows but also needs to correct or remove previous results, we call it a changelog stream. This is nothing unusual; changelog streams are used in day-to-day business to, for example, materialize (probably more complex) aggregations into external systems and serve real-time dashboards from the most up-to-date and continuously updated results.

Before putting such dashboards into production, however, you need to design your aggregations, and typically you start from something simple, explore the results, and take it from there. In Ververica Platform 2.4, you can run any query and observe its results in the preview pane. Changelog streams like the one from the query above will keep updating the result rows as defined by the query and as long as it is running (you can also pause the execution to look at a specific snapshot in time).


(Screenshot: changelog query results in the SQL editor's preview pane)

 

Tip: If you enrich your statement with a maxDate field as shown, you will also know where you (or your browser) are in our input data. This can sometimes be helpful if you see unexpected results.
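The enriched statement could look like this (a minimal sketch of the query shown in the screenshot above):

```sql
SELECT
  COUNT(DISTINCT fromEmail) AS numUsers,
  MAX(`date`) AS maxDate
FROM flink_ml_user;
```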


Behind the scenes, the platform is actually collecting the changes as they appear: the first row inserts "1" (and the date) into the result (+I). As soon as the second distinct user or a new maxDate is seen, the previous row becomes outdated and needs to be retracted (-U), immediately followed by the row's actual update (+U). These updates continue for as long as new data is changing the result and can be observed in detail on the Changelog tab:

(Screenshot: the Changelog tab showing +I, -U, and +U messages)

Note: If you look through the Flink documentation on Dynamic Tables, you will also find a more efficient changelog stream that only contains upsert and delete messages. This optimized form is used for (output) tables that define a (possibly composite) unique key. For connectors such as upsert-kafka, Flink's SQL engine will then update the target system's row identified by that key. The SQL editor's preview cannot (yet) use these because it lacks a unique key constraint. For this reason, you may also experience result rows flickering, e.g. when the preview has processed a result's "-U" row but the corresponding "+U" has not been received yet.
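As a sketch of such an output table, a sink with a unique key for the upsert-kafka connector could be defined like this (the table name, topic, and broker address are placeholders):

```sql
CREATE TABLE user_email_counts (
  `user` STRING,
  numEmails BIGINT,
  PRIMARY KEY (`user`) NOT ENFORCED  -- unique key: enables upsert/delete messages
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user_email_counts',                     -- placeholder
  'properties.bootstrap.servers' = 'localhost:9092', -- placeholder
  'key.format' = 'json',
  'value.format' = 'json'
);
```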


We will iterate these kinds of queries a bit and derive a few more statistics on mailing list use.

 

Number of Emails per Mailing List User

Instead of counting the total number of emails, how about finding out how many emails each user sent and when they sent the last one? We can do this with a simple extension to our first query:


SELECT
  fromEmail AS `user`,
  COUNT(*) AS numEmails,
  MAX(`date`) AS maxDate
FROM flink_ml_user
GROUP BY fromEmail;

While this query is running, you will see new result rows popping up in the preview and existing rows changing. Just as above, you can also inspect the individual changes, which flow in slowly enough that you may still be able to follow them. If you want to take a look behind the scenes and observe the changes live, click the "View in Flink UI" button in the editor, which will direct you to the Flink job running in the SQL preview's session cluster.

(Screenshot: the "View in Flink UI" button in the SQL editor)


Now, if you click through the tasks of the query you posted, you will eventually find that the source task is backpressured by the sink task, which does the aggregation and holds the results for Ververica Platform. We designed this sink to backpressure the Flink job whenever the UI is not fetching results fast enough - something that feels natural in the streaming world.


(Screenshot: the Flink job graph for the SQL preview query)


Attention: Because we designed the SQL preview not to overwhelm the browser (or the user) with thousands of changes per second, it is not a high-throughput sink and thus not meant for performance benchmarks. Expect to wait a while for the final results of queries that produce a lot of changes.


We could now wait until the UI has processed all results and presented them on screen, but that would not be too useful since it is too much data to look at anyway. Actually, we do not need the full result set for this query, since we typically use the preview for rapid prototyping of SQL queries to see whether they work as expected. In that spirit, we will expand our query a bit further in the next sections of this post.


Note: Once you have your query in shape to produce the expected results, you can promote it to a full streaming SQL deployment that continuously produces results whenever new data arrives. Typically, these results are written into external systems via one of the provided connectors, e.g. to Elasticsearch for serving real-time dashboards:

INSERT INTO output_table SELECT …

When you run such a query, Ververica Platform will ask you to create a deployment that you can then manage independently and outside of the SQL editor. You can configure this deployment to your needs, and you can change the parallelism, state backends, checkpointing, and more. In addition to cluster settings like these, there are also a couple of SQL Tuning options you can find in our documentation.
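As a sketch, writing the per-user email counts to Elasticsearch could look like the following (the table name, host, and index are placeholders for your setup):

```sql
CREATE TABLE user_counts_es (
  `user` STRING,
  numEmails BIGINT,
  PRIMARY KEY (`user`) NOT ENFORCED  -- used as the Elasticsearch document id
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',  -- placeholder
  'index' = 'flink_ml_user_counts'    -- placeholder
);

INSERT INTO user_counts_es
SELECT fromEmail AS `user`, COUNT(*) AS numEmails
FROM flink_ml_user
GROUP BY fromEmail;
```

Running the INSERT INTO statement is what triggers the deployment creation described above.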

 

Email Aliases and (potential) Company Association for Users on the Mailing List

Let’s try to be a little fancier and find out which email addresses people use on the mailing lists and whether we can attribute these to companies. Especially the company attribution is not an easy task in general. For this section, we will use the information we have from previous emails sent from that user and look at the used email domains. This is not perfect, but good enough for a start.

Let’s first look at the email domains that are present on the list:


SELECT
  DISTINCT SUBSTRING(fromEmail FROM POSITION('@' IN fromEmail))
    AS domain
FROM flink_ml_user;


That results in over 700 rows (all inserts, no retractions), and if you scroll through the entries, you will see a couple of email providers, which we should filter out from company associations. Let's make this list a bit more digestible by looking at how many unique users each of these domains has:


SELECT
  SUBSTRING(fromEmail FROM POSITION('@' IN fromEmail)) AS domain,
  COUNT(*) AS uniqueUsers
FROM (
  SELECT DISTINCT fromEmail
  FROM flink_ml_user
)
GROUP BY SUBSTRING(fromEmail FROM POSITION('@' IN fromEmail))
HAVING COUNT(*) > 5;

By showing only domains with more than 5 unique users, we receive a couple of dozen email domains to check manually for being an email provider.


Note: This query creates a changelog stream again because the number of users keeps changing. Since Ververica Platform 2.4, however, we no longer need to worry about this.

 

The company association should now work the following way:

  • a user is identified by the name in the FROM field, which is usually composed of NAME <EMAIL> (for the scope of this article, we will ignore the fact that names are actually not a unique identifier for a person)

  • for each user, we collect all email aliases seen so far

  • the most recent non-email-provider domain will be this user’s associated company


This felt like something that can be done more easily in a user-defined function, which we implemented as GetEmailAliasesAndCompany (code). We will discuss creating various types of user-defined functions and handling data types in a separate blog post. For this article, it suffices to know that the logic above is implemented in a user-defined aggregate function that receives an email address and the date it was used, and creates a ROW-type result with three fields: aliases contains the collected email aliases, company the most recent company association, and companySince the date of the most recent switch to that company (the latter two in the order of processing the rows).

With this function, we define a new temporary view, flink_ml_user_with_name_and_aliases, that we can leverage for further analysis:


CREATE TEMPORARY VIEW flink_ml_user_with_name_and_aliases AS
SELECT *,
  REPLACE(fromRaw, ' <' || fromEmail || '>', '') AS fromName,
  GetEmailAliasesAndCompany(fromEmail, `date`) OVER (
    PARTITION BY REPLACE(fromRaw, ' <' || fromEmail || '>', '')
    ORDER BY `date`
  ) AS fromAliasesAndCompany
FROM flink_ml_user;

Note: Don’t worry about using REPLACE(fromRaw, ' <' || fromEmail || '>', '') twice in the query above. As long as the called function is deterministic, any number of calls with the same parameters will be optimized into a single actual function call when the query is run. This holds for deterministic built-in functions just as well as for deterministic user-defined functions. The view is essentially the same as the following:


CREATE TEMPORARY VIEW flink_ml_user_with_name_and_aliases AS
SELECT *,
  GetEmailAliasesAndCompany(fromEmail, `date`) OVER (
    PARTITION BY fromName
    ORDER BY `date`
  ) AS fromAliasesAndCompany
FROM (
  SELECT *,
    REPLACE(fromRaw, ' <' || fromEmail || '>', '') AS fromName
  FROM flink_ml_user
);


We can now go ahead and explore this view a bit more. A simple SELECT * FROM flink_ml_user_with_name_and_aliases, for example, presents each email with the information we collected. To gain confidence that we actually implemented it correctly, let’s look at how individual users' results change over time. For that, we need something more practical than this statement: an aggregation for each user, showing the collected aliases and the company association we derived:


SELECT
  fromName,
  LISTAGG(DISTINCT fromEmail) AS fromAliases,
  LEAD(fromAliasesAndCompany.company, 0) AS fromCompany
FROM flink_ml_user_with_name_and_aliases
GROUP BY fromName
HAVING
  LEAD(fromAliasesAndCompany.company, 0) IS NOT NULL AND
  COUNT(DISTINCT fromEmail) >= 2;

(Screenshot: aggregated aliases and company association per user)


The result looks fine: email domains like apache.org are not used for company names but are collected as aliases. You can further explore the flink_ml_user_with_name_and_aliases view by, for example, filtering it on a specific fromName to follow individual people.
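For example, following a single person could look like this (the name is a hypothetical placeholder; pick one from your own results):

```sql
SELECT
  fromEmail,
  `date`,
  fromAliasesAndCompany.company AS company
FROM flink_ml_user_with_name_and_aliases
WHERE fromName = 'Jane Doe';  -- hypothetical name
```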

Note: We actually didn’t use fromAliasesAndCompany.aliases in the query above but did our own aggregation with LISTAGG. The reason is two-fold: (1) this field is not being grouped on (which would not make much sense here) and thus cannot be used directly, and (2) we cannot simply use LEAD(fromAliasesAndCompany.aliases, 0) as we did for the company field because LEAD is not defined for array types (yet). If you want to verify that the alias accumulation from GetEmailAliasesAndCompany is also correct, you can either look at the non-aggregated results or use the LargestStringArrayAggFunction user-defined aggregate function (code) that we implemented for the ARRAY<STRING> type.


Finally, we can look at how much activity on Flink’s user mailing list comes from people associated with specific companies, here separately for each time window of 30 days:


SELECT
  COALESCE(fromAliasesAndCompany.company, 'unknown') AS fromCompany,
  TUMBLE_END(`date`, INTERVAL '30' DAY) as windowEnd,
  COUNT(*) as numContributions
FROM flink_ml_user_with_name_and_aliases
GROUP BY
  COALESCE(fromAliasesAndCompany.company, 'unknown'),
  TUMBLE(`date`, INTERVAL '30' DAY)
HAVING COUNT(*) > 10;

(Screenshot: per-company contributions in 30-day windows)

 

Top-N Aggregations

Another type of query that is often used for analytics and creates changelog streams is a Top-N aggregation that returns the N largest (or smallest) values as determined by a set of columns. The Flink SQL optimizer recognizes Top-N queries if they follow a specific pattern described in detail in our Top-N documentation.


Note: Even if you want something like a Top-N for each window of 30 days, the query will produce update and delete messages because Flink 1.12 does not yet integrate Top-N aggregations with windows (Flink 1.13 supports this via table-valued functions). If you want (or need) windowed Top-N results that create append-only streams in Flink 1.12, you can create your own user-defined table aggregate function and group by the desired window. We will take a look at how to do this, and at writing all sorts of user-defined functions in general, in a future blog post of this series.
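For reference, a windowed Top-N based on the Flink 1.13 window table-valued functions could be sketched as follows (syntax as described in the Flink 1.13 documentation; this will not run on the Flink 1.12 setup used in this post):

```sql
-- Flink 1.13 sketch: Top-10 most active users per 30-day tumbling window
SELECT * FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY window_start, window_end
      ORDER BY numEmails DESC) AS rownum
  FROM (
    SELECT window_start, window_end, fromEmail, COUNT(*) AS numEmails
    FROM TABLE(
      TUMBLE(TABLE flink_ml_user, DESCRIPTOR(`date`), INTERVAL '30' DAYS))
    GROUP BY window_start, window_end, fromEmail
  )
) WHERE rownum <= 10;
```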


We will use Top-N aggregations to bring some order to the results of the queries shown above and make them easy to interpret. With this, we can also widen the scope of our statistics.

The queries above already created the statistics for the number of emails per mailing list user on user@flink.apache.org. Let’s combine these with the emails on the other Apache Flink community lists that we imported, i.e. user-zh and dev, and find out who was most active per year. We can build the query in two steps for better maintainability:

  1. create a (temporary) view of the combined email lists and aggregate the number of emails per user:

    
    CREATE TEMPORARY VIEW flink_ml_user_count_per_user_1y AS
    SELECT
      fromEmail AS `user`,
      COUNT(*) AS numEmails,
      TUMBLE_END(`date`, INTERVAL '365' DAY(3)) AS windowEnd
    FROM (
      SELECT fromEmail, `date` FROM flink_ml_user
      UNION ALL
      SELECT fromEmail, `date` FROM flink_ml_user_zh
      UNION ALL
      SELECT fromEmail, `date` FROM flink_ml_dev
      WHERE NOT IsJiraTicket(fromRaw)
    )
    GROUP BY fromEmail, TUMBLE(`date`, INTERVAL '365' DAY(3));
    

     

  2. apply the Top-N pattern, e.g. for the 10 most active users:
    
    SELECT * FROM (
      SELECT
        *,
        ROW_NUMBER() OVER (
          PARTITION BY windowEnd
          ORDER BY numEmails DESC, `user`) AS rownum
      FROM flink_ml_user_count_per_user_1y
    ) WHERE rownum <= 10;
    


Alternatively, you can inline the view definition into the Top-N query, e.g. as a sub-query. However, the proposed separation supports you better in looking at the individual tables, e.g. for introspection or debugging. It also allows you to re-use the view many times, e.g. if you later write a full SQL deployment that emits output from different aggregations to multiple Kafka tables via statement sets.
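Such a statement set could be sketched like this (the output tables are placeholders that would need CREATE TABLE definitions with a Kafka connector):

```sql
-- Re-using the view in a statement set (output tables are placeholders)
BEGIN STATEMENT SET;

INSERT INTO top_users_per_year        -- placeholder Kafka table
SELECT * FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY windowEnd
      ORDER BY numEmails DESC, `user`) AS rownum
  FROM flink_ml_user_count_per_user_1y
) WHERE rownum <= 10;

INSERT INTO user_counts_per_year      -- placeholder Kafka table
SELECT * FROM flink_ml_user_count_per_user_1y;

END;
```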

When executing these statements, results will build up, and you can see the changes being applied as they come. Eventually, you will see a final result like this:

(Screenshot: final Top-10 most active users per year)

The query above assumes that users are uniquely identified by their email addresses. Similarly, we could use the name as the unique attribute (just like in the alias collection and company association further up). Because we separated the original query from the Top-N, we can switch this definition by simply changing the view to account for the new meaning. Here, we can also add a list of all collected email addresses for double-checking:


CREATE TEMPORARY VIEW flink_ml_user_count_per_user_1y AS
SELECT
  REPLACE(fromRaw, ' <' || fromEmail || '>', '') AS `user`,
  LISTAGG(DISTINCT fromEmail) AS aliases,
  COUNT(*) AS numEmails,
  TUMBLE_END(`date`, INTERVAL '365' DAY(3)) AS windowEnd
FROM (
  SELECT fromRaw, fromEmail, `date` FROM flink_ml_user
  UNION ALL
  SELECT fromRaw, fromEmail, `date` FROM flink_ml_user_zh
  UNION ALL
  SELECT fromRaw, fromEmail, `date` FROM flink_ml_dev
  WHERE NOT IsJiraTicket(fromRaw)
)
GROUP BY
  REPLACE(fromRaw, ' <' || fromEmail || '>', ''),
  TUMBLE(`date`, INTERVAL '365' DAY(3));

With the examples above, we have all the building blocks to see how active companies - or the people we associate with them - were across all mailing lists. Let me list the steps here nonetheless so you can easily adapt them and derive mailing-list-dependent results:


CREATE TEMPORARY VIEW flink_ml_users AS
SELECT *,
  REPLACE(fromRaw, ' <' || fromEmail || '>', '') AS fromName,
  GetEmailAliasesAndCompany(fromEmail, `date`) OVER (
    PARTITION BY REPLACE(fromRaw, ' <' || fromEmail || '>', '')
    ORDER BY `date`
  ) AS fromAliasesAndCompany
FROM (
  SELECT fromRaw, fromEmail, `date` FROM flink_ml_user
  UNION ALL
  SELECT fromRaw, fromEmail, `date` FROM flink_ml_user_zh
  UNION ALL
  SELECT fromRaw, fromEmail, `date` FROM flink_ml_dev
  WHERE NOT IsJiraTicket(fromRaw)
);

CREATE TEMPORARY VIEW flink_ml_users_1y AS
SELECT
  COALESCE(fromAliasesAndCompany.company, 'unknown') AS company,
  TUMBLE_END(`date`, INTERVAL '365' DAY(3)) as windowEnd,
  COUNT(*) as numEmails,
  COUNT(DISTINCT fromEmail) as numUsers
FROM flink_ml_users
GROUP BY
  COALESCE(fromAliasesAndCompany.company, 'unknown'),
  TUMBLE(`date`, INTERVAL '365' DAY(3));

SELECT * FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY windowEnd
      ORDER BY numEmails DESC, `company`) AS rownum
  FROM flink_ml_users_1y
) WHERE rownum <= 10;

(Screenshot: Top-10 most active companies per year)

Conclusion

In this article, we have looked at changelog streams in general and have worked with them by using non-windowed aggregations and Top-N queries. We described how to leverage the SQL preview of Ververica Platform 2.4 to iteratively build these (streaming) queries and eventually derive community statistics like the most active mailing list users and companies. Once you are satisfied with the queries and the results that you see in the preview, you can easily put them into production and create your own stream processing application with the power and simplicity of Flink SQL and Ververica Platform.

Topics: Ververica Platform, Flink SQL
