Skip to content

Flink SQL Recipe:
Window Top-N and Continuous Top-N


by

Flink SQL has emerged as the standard for low-code streaming analytics and managed to unify batch and stream processing while simultaneously staying true to the SQL standard. In addition, it provides a rich set of advanced features for real-time data analysis. In a nutshell, Flink SQL is the best of both worlds: it gives you the ability to process streaming data using SQL, but it also supports batch processing.

Ververica Platform makes Flink SQL even more accessible and efficiently scalable across teams. The platform comes with additional tools for developing SQL scripts, managing user-defined functions (UDFs), catalogs and connectors, as well as operating resulting long-running queries.

We have seen many use cases for Flink SQL, and we are excited to show you what you can build with it. In this series of blog posts, we will explore how to use Flink SQL to process data in a variety of ways. This post, in particular, will focus on two queries: Window Top-N and Continuous Top-N.

Tip: Visit our Case Study to explore how others are using Apache Flink.

What are Window Top-N and Continuous Top-N queries?

Window Top-N and Continuous Top-N are two similar but slightly different ways of processing data. In both cases, we want to find the top N items in a stream of data but there are some key differences:

  • In Window Top-N, we process data in fixed-size windows. For example, we might want to find the top 10 items every minute.
  • In Continuous Top-N, we process data continuously. We don't use windows, but instead process data as it arrives.

Continuous Top-N is more difficult to implement than Window Top-N, but it has some advantages. For example, it can give us results more quickly, since we don't have to wait for a window to close before we can see the results.

Common use cases for Window Top-N and Continuous Top-N queries

Window Top-N and Continuous Top-N queries are both useful for a variety of tasks. For example, they can be used for:

  • Fraud detection: In a stream of financial transactions, we might want to find the top 10 transactions by amount every minute. What can it help us with? - identify suspicious activity.
  • Recommendations: In a stream of user interactions, we might want to find the top 10 items that are being viewed or purchased. What can it help us with? - make recommendations to users.
  • Anomaly detection: In a stream of sensor readings, we might want to find the top 10 sensors with the highest readings. What can it help us with? - identify sensors that are malfunctionin
  • Monitoring: In a stream of log messages, we might want to find the top 10 log messages by volume. What can it help us with? - identify system issues.

How to use Flink SQL to write Window Top-N queries

Let's start by looking at how to use Flink SQL to write Window Top-N queries. We'll show you how to calculate the Top 3 suppliers who have the highest sales for every tumbling 5 minutes window.


CREATE TABLE orders (
  bidtime TIMESTAMP(3),
  price DOUBLE,
  item STRING,
  supplier STRING,
  WATERMARK FOR bidtime AS bidtime - INTERVAL '5' SECONDS
) WITH (
 'connector' = 'faker',
 'fields.bidtime.expression' = '#{date.past ''30'',''SECONDS''}',
 'fields.price.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
 'fields.item.expression' = '#{Commerce.productName}',
 'fields.supplier.expression' = '#{regexify ''(Alice|Bob|Carol|Alex|Joe|James|Jane|Jack)''}',
 'rows-per-second' = '100'
);

SELECT *
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
    FROM (
      SELECT window_start, window_end, supplier, SUM(price) as price, COUNT(*) as cnt
      FROM TABLE(
        TUMBLE(TABLE orders, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY window_start, window_end, supplier
    )
  ) WHERE rownum <= 3;

The source table (orders) is backed by the faker connector, which continuously generates rows in memory based on Java Faker expressions.

 

Note : This example leverages the Window Top-N feature to display the top 3 suppliers with the highest sales every 5 minutes.

schema explorer

How to use Flink SQL to write Continuous Top-N queries

Writing Continuous Top-N queries is more difficult than writing Window Top-N queries. The reason for this is that, in Continuous Top-N, we process data as it arrives instead of using windows.

.This example will take us into the realm of magic as stream processing is often considered to be by the uninitiated. However, it is really just a set of instructions to be executed on a stream of data. We will show how to continuously calculate the "Top-N" rows based on a given attribute, using an OVER window and the ROW_NUMBER() function.

The source table (spells_cast) is backed by the faker connector, which continuously generates rows in memory based on Java Faker expressions.

The Ministry of Magic tracks every spell a wizard casts throughout Great Britain and wants to know every wizard's Top 2 all-time favorite spells.

Flink SQL can be used to calculate continuous aggregations, so if we know each spell a wizard has cast, we can maintain a continuous total of how many times they have cast that spell.


SELECT wizard, spell, COUNT(*) AS times_cast
FROM spells_cast
GROUP BY wizard, spell;

This result can be used in an OVER window to calculate a Top-N. The rows are partitioned using the wizard column, and are then ordered based on the count of spell casts (times_cast DESC). The built-in function ROW_NUMBER() assigns a unique, sequential number to each row, starting from one, according to the rows' ordering within the partition. Finally, the results are filtered for only those rows with a row_num <= 2 to find each wizard's top 2 favorite spells.

Where Flink is most potent in this query is its ability to issue retractions. As wizards cast more spells, their top 2 will change. When this occurs, Flink will issue a retraction, modifying its output, so the result is always correct and up to date.


CREATE TABLE spells_cast (
  wizard STRING,
  spell STRING
) WITH (
 'connector' = 'faker',
 'fields.wizard.expression' = '#{harry_potter.characters}',
 'fields.spell.expression' = '#{harry_potter.spells}'
);

SELECT wizard, spell, times_cast
FROM (
  SELECT *,
  ROW_NUMBER() OVER (PARTITION BY wizard ORDER BY times_cast DESC) AS row_num
  FROM (SELECT wizard, spell, COUNT(*) AS times_cast FROM spells_cast GROUP BY wizard, spell)
)
WHERE row_num <= 2; 

schema explorer

Conclusion

In this article, you've learned about Window Top-N and Continuous Top-N. You've also seen how to use Flink SQL to write queries for both of these types of problems.

We strongly encourage you to run these examples in Ververica Platform, just follow these simple steps to install the platform.

If you're interested in learning more about Flink SQL, we recommend the following resources:

Don’t have an existing Ververica Platform environment? You can still follow along by downloading our free community edition of Ververica Platform HERE.

Flink forward Seattle 2023

Article by:

Ververica

Comments

Our Latest Blogs

Flink SQL: Detecting patterns with MATCH_RECOGNIZE featured image
by Ververica January 31, 2023

Flink SQL: Detecting patterns with MATCH_RECOGNIZE

Flink SQL has emerged as the de facto standard for low-code data analytics. It has managed to unify batch and stream processing while simultaneously staying true to the SQL standard. In addition, it...
Read More
Flink SQL: Queries, Windows, and Time - Part 2 featured image
by Ververica January 24, 2023

Flink SQL: Queries, Windows, and Time - Part 2

In the previous article, we covered some aspects of time windows and time attributes that you should consider when planning your data collection strategy. This article will provide a more in-depth...
Read More
Flink SQL: Queries, Windows, and Time - Part 1 featured image
by Ververica January 17, 2023

Flink SQL: Queries, Windows, and Time - Part 1

Time is a critical element in stream processing since data is processed as it arrives and must be processed quickly to avoid delays. The ubiquity of time in stream processing means that data...
Read More