Flink SQL: Joins Series 3 (Lateral Joins, LAG aggregate function)


Flink SQL has emerged as the de facto standard for low-code data analytics. It unifies batch and stream processing while staying true to the SQL standard, and it provides a rich set of advanced features for real-time use cases. In short, you can use the same SQL to process both streaming and batch data.

Ververica Platform makes Flink SQL even more accessible and efficiently scalable across teams. The platform comes with additional tools for developing SQL scripts, managing user-defined functions (UDFs), catalogs and connectors, as well as operating the resulting long-running queries.

We have seen that there are many use cases for Flink SQL, and we are excited to see what you will build with it. In this three-part series of blog posts, we show you different types of joins in Flink SQL and how you can use them to process data in a variety of ways. This third part covers lateral joins and the LAG aggregate function.

What is a Lateral Join?

A lateral join is a type of SQL join in which a subquery in the FROM clause can reference columns from tables that appear earlier in the same FROM clause. Conceptually, the subquery is evaluated once for each row of the outer query. Depending on the query, lateral joins can also improve performance by reducing the number of table scans.

In other words, you can think of a lateral join as a foreach loop in SQL that iterates through a collection, applies a transformation on each iteration, and produces an output. Lateral joins are very useful for processing data that is stored in a hierarchical or nested format.
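To make the foreach analogy concrete for nested data, here is a minimal sketch that uses CROSS JOIN UNNEST, a closely related construct in Flink SQL that expands an array column once per outer row. The inline orders data and the names order_id, item_ids, and item_id are hypothetical and exist only for illustration.

```sql
-- Hypothetical inline data: each order carries an array of item ids.
SELECT
    o.order_id,
    t.item_id
FROM (
    VALUES
        (1, ARRAY[101, 102]),
        (2, ARRAY[103])
) AS o (order_id, item_ids)
-- For each order row, emit one output row per element of its array.
CROSS JOIN UNNEST(o.item_ids) AS t (item_id);
```

Each order row plays the role of one loop iteration, and the array elements are what the iteration emits, so order 1 should contribute two rows and order 2 one row.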

How to perform a lateral table join

This example will show how you can correlate events using a LATERAL join.

Given a table with people's addresses, you need to find the two most populous cities for each state and continuously update those rankings as people move. The input table People contains an id for each person, their address (city and state), and the time they moved there.

The first step is to calculate each city's population using a continuous aggregation. While this is simple enough, the real power of Flink SQL shows when people move. Because the query deduplicates on each person's latest address, Flink automatically issues a retraction for a person's old city when they move. So if John moves from New York to Los Angeles, the population of New York automatically goes down by 1. This gives us the power of Change-Data-Capture without having to invest in the infrastructure to set it up!
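Taken on its own, the deduplication step could look like the sketch below. It assumes the People table defined in this example, with arrival_time as its event-time attribute, and keeps only the most recent row for each id.

```sql
-- Keep only the latest address per person (assumes the People table from this example).
SELECT id, city, state
FROM (
    SELECT
        *,
        -- Rank each person's rows from newest to oldest arrival.
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY arrival_time DESC) AS rownum
    FROM People
)
WHERE rownum = 1;
```

Running this query by itself is a simple way to watch the changelog: when a newer row arrives for an id, the previous row for that id is retracted and replaced.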

With this dynamic population table at hand, you are ready to solve the original problem using a LATERAL table join. Unlike a normal join, a lateral join allows the subquery to correlate with columns from other arguments in the FROM clause. And unlike a regular subquery, the lateral subquery participates as a join and can therefore return multiple rows. You can now have a subquery that is correlated with every individual state, ranks that state's cities by population, and returns the top 2.

CREATE TABLE People (
    id           INT,
    city         STRING,
    state        STRING,
    arrival_time TIMESTAMP(3),
    WATERMARK FOR arrival_time AS arrival_time - INTERVAL '1' MINUTE
) WITH (
    'connector' = 'faker',
    'fields.id.expression'    = '#{number.numberBetween ''1'',''100''}',
    'fields.city.expression'  = '#{regexify ''(Newmouth|Newburgh|Portport|Southfort|Springfield){1}''}',
    'fields.state.expression' = '#{regexify ''(New York|Illinois|California|Washington){1}''}',
    'fields.arrival_time.expression' = '#{date.past ''15'',''SECONDS''}',
    'rows-per-second'          = '10'
);

-- Keep only each person's latest address, then count residents per city.
CREATE TEMPORARY VIEW CurrentPopulation AS
SELECT
    city,
    state,
    COUNT(*) as population
FROM (
    SELECT
        city,
        state,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY arrival_time DESC) AS rownum
    FROM People
)
WHERE rownum = 1
GROUP BY city, state;

-- For every state, return its two most populous cities.
SELECT
    state,
    city,
    population
FROM
    (SELECT DISTINCT state FROM CurrentPopulation) States,
    LATERAL (
        SELECT city, population
        FROM CurrentPopulation
        WHERE state = States.state
        ORDER BY population DESC
        LIMIT 2
    );
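Because the faker source produces random data, it can help to try the same lateral pattern on a small fixed data set first. The sketch below is only an illustration: the CityPopulation name and the population figures are made up, and the query mirrors the top-2-per-state logic rather than reproducing the streaming pipeline.

```sql
-- Hypothetical, fixed population figures used only to illustrate the pattern.
WITH CityPopulation AS (
    SELECT *
    FROM (
        VALUES
            ('California', 'Springfield', 30),
            ('California', 'Newburgh',    20),
            ('California', 'Portport',    10),
            ('Illinois',   'Southfort',   15),
            ('Illinois',   'Newmouth',     5)
    ) AS t (state, city, population)
)
SELECT
    state,
    city,
    population
FROM
    (SELECT DISTINCT state FROM CityPopulation) States,
    -- Evaluated once per state: the two largest cities of that state.
    LATERAL (
        SELECT city, population
        FROM CityPopulation
        WHERE state = States.state
        ORDER BY population DESC
        LIMIT 2
    );
```

With this input, the lateral subquery should keep Springfield and Newburgh for California and Southfort and Newmouth for Illinois, while Portport is dropped because it ranks third in its state.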

What is a LAG() function?

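LAG(column_name, offset) is a function that returns the value of a column from a previous row in the same result set. Which row counts as "previous" is determined by the ordering (and optional partitioning) of the OVER clause the function is used with.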

This function is useful for comparisons where you want to compare values in the current row with values in a previous row. For example, you might want to find out how much a given stock price has increased or decreased over time.

LAG() is easy to use. Simply specify the column you want to access data from and the number of rows back you want to go. For example, LAG(sales, 1) would return the sales data from the previous row.
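As a minimal sketch of LAG(sales, 1), the query below runs over a few hypothetical inline rows (the daily_sales name and its values are invented for illustration). It assumes batch execution mode, because in streaming mode Flink expects the OVER window to be ordered by a time attribute.

```sql
-- Assumption: run in batch mode so the window can be ordered by a plain column.
SET 'execution.runtime-mode' = 'batch';

SELECT
    sale_day,
    sales,
    -- Sales value from the row one position earlier in sale_day order.
    LAG(sales, 1) OVER (ORDER BY sale_day) AS previous_sales
FROM (
    VALUES (1, 100), (2, 120), (3, 90)
) AS daily_sales (sale_day, sales);
```

The first row has no predecessor, so its previous_sales is NULL; every later row carries the sales value of the day before it.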

How to retrieve a previous row value with the LAG() function

This example will demonstrate how to retrieve the previous value and compute trends for a specific data partition.

The source table (fake_stocks) is backed by the faker connector, which continuously generates fake stock quotations in memory based on Java Faker expressions.

In this example, you are going to create a table that contains stock ticker updates and determine whether each new stock price has gone up or down compared to its previous value.

First, create the table, then use a SELECT statement that includes the LAG function to retrieve the previous stock value. Finally, use a CASE expression in the final SELECT to compare the current stock price against the previous value and determine the trend.

CREATE TABLE fake_stocks (
    stock_name STRING,
    stock_value DOUBLE,
    log_time AS PROCTIME()
) WITH (
  'connector' = 'faker',
  'fields.stock_name.expression' = '#{regexify ''(Deja\ Brew|Jurassic\ Pork|Lawn\ \&\ Order|Pita\ Pan|Bread\ Pitt|Indiana\ Jeans|Thai\ Tanic){1}''}',
  'fields.stock_value.expression' =  '#{number.randomDouble ''2'',''10'',''20''}',
  'fields.log_time.expression' =  '#{date.past ''15'',''5'',''SECONDS''}',
  'rows-per-second' = '10'
);

-- Pair every quote with the previous quote of the same stock.
WITH current_and_previous as (
    select
        stock_name,
        log_time,
        stock_value,
        lag(stock_value, 1) over (partition by stock_name order by log_time) previous_value
    from fake_stocks
)
-- Compare the current and previous values to derive the trend.
select *,
    case
        when stock_value > previous_value then '▲'
        when stock_value < previous_value then '▼'
        else '='
    end as trend
from current_and_previous;

SQL Query result
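If you are interested in how much a price moved rather than only its direction, the same window can be used to compute a difference. This is a sketch that assumes the fake_stocks table from this example; the price_change alias is a hypothetical name.

```sql
-- Assumes the fake_stocks table defined in this example.
SELECT
    stock_name,
    log_time,
    stock_value,
    -- Difference to the previous quote of the same stock; NULL for the first quote.
    stock_value
        - LAG(stock_value, 1) OVER (PARTITION BY stock_name ORDER BY log_time) AS price_change
FROM fake_stocks;
```

A positive price_change corresponds to the ▲ trend and a negative one to ▼.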


In this article, you learned about a lateral table join and a LAG() function. You've also seen how to use Flink SQL to write queries for both of these types of scenarios.

We encourage you to run these examples on Ververica Platform. You can follow these simple steps to install the platform.


Interested in trying out Ververica Platform? Contact us for more information.
