Real-Time Performance Monitoring with Flink SQL: AdTech Use Case

November 27, 2020 | by Alexander Fedulov

Background

Advertising Technologies (Ad Tech) is a collective name that describes systems and tools for managing and analyzing programmatic advertising campaigns. The goal of digital advertising is to reach the largest number of relevant audience members possible. Therefore, ad tech is intrinsically related to processing large volumes of data.

In this blog post, we'll look into how to correlate two streams of events, ad servings (so-called impressions) and clicks, and calculate an important ad tech metric: the click-through rate (CTR). Our calculations will be performed on in-flight data using Apache Flink’s horizontally scalable execution engine. We will focus on getting the results without writing any code in Java or Scala, relying completely on SQL instead.

In a typical scenario, the placement of an ad is performed via a mechanism called Real-Time Bidding. In essence, Real-Time Bidding is an auction where a multitude of participants compete to display a banner or a video (collectively called a creative) to a specific end user. During this process, demand-side platforms (DSPs) receive offers to show advertisements to users, who are identified by their device IDs, and reply with their bids.


Figure 1: Real-Time Bidding


Tracking which impressions were shown and which of them were clicked on is one of the key tasks in digital advertising technologies.

Figure 2: Impression and click events

 

Although the process of placing advertisements is largely automated, there is usually still a significant degree of manual control exercised by advertisement campaign managers and business analysts. Frequently, a campaign's definition, its audience selectors (such as demographics and country of origin) and its performance criteria are specified manually. Closely monitoring the performance of a campaign and adjusting certain parameters might be necessary, especially during the early post-launch phase, when assumptions are being validated.

Why Stream Processing?

The task of getting insights into large volumes of data was traditionally addressed by batch processing. This approach conflicts with the highly dynamic nature of the digital advertising business. It is critical to get insights in real time: waiting an hour or more for a periodic batch job to finish processing raw data, while the budget is being depleted because of wrong initial campaign parameters, is highly undesirable. Moreover, for any metric that relies on correlating two subsequent events, batch processing will not deliver correct results for pairs of events that fall on opposite sides of the batch “cut-off” and hence get processed by two different batch jobs.


Why Flink SQL?

The task of monitoring a campaign is typically performed by a data analyst or a business analyst. Due to the dynamic nature of the business, ad-hoc integrations with new data feeds, the addition of new dimensions to existing data streams and other similar adjustments can be expected. In this scenario, it is desirable to remove the dependency of data analysts on data engineers in performing their day-to-day tasks. To achieve that, a flexible toolset with a low barrier to adoption is required. SQL is the lingua franca of data analysis and knowledge of it is widespread. Running SQL statements in Flink allows you to utilize the power of Flink’s horizontally scalable stream processing engine without having to be a Java or Scala developer. It makes it possible to easily tap into large volumes of raw in-flight data and to create interactive custom dashboards in a self-service manner.

Hands-on

In our example we are going to work with two streams of data. First, those streams are registered as tables by defining their schema and table options.

The first stream is the stream of impressions. Each of these events indicates a win in the Real-Time Bidding auction and the successful display of a creative to the user. It contains details such as the creative's dimensions, a country code, and the ID of the advertising campaign.


CREATE TEMPORARY TABLE `impressions` (
 bid_id VARCHAR NOT NULL,
 `timestamp` VARCHAR,
 serve_time AS 
    TO_TIMESTAMP(`timestamp`, 'EEE MMM dd HH:mm:ss zzz yyyy'),
 campaign_id INT,
 creative_dimensions VARCHAR,
 country_code VARCHAR(2),
 WATERMARK FOR serve_time AS serve_time - INTERVAL '5' SECOND
)
WITH (
 'connector' = 'kafka',
 'format' = 'json',
 'properties.bootstrap.servers' = 'kafka.svc:9092',
 'properties.group.id' = 'impressions',
 'scan.startup.mode' = 'latest-offset',
 'topic' = 'impressions-ingest'
);

In this example, events are consumed from Kafka in JSON format. See this blog post for various other connectors and data formats supported by Ververica Platform.

WATERMARK FOR serve_time AS serve_time - INTERVAL '5' SECOND means that we can tolerate events arriving up to 5 seconds out of order and still produce correct results.

Note: The TEMPORARY modifier means that this schema definition will not be persisted in the catalog. It is a convenient way to start working with new schemas without having to use ALTER statements for changes.


One of the most important outcomes that we can track after displaying an ad is a click on the creative.


CREATE TEMPORARY TABLE `clicks` (
 correlation_id VARCHAR NOT NULL,
 `timestamp` VARCHAR,
 click_time AS 
    TO_TIMESTAMP(`timestamp`, 'EEE MMM dd HH:mm:ss zzz yyyy'),
 tracker VARCHAR,
 WATERMARK FOR click_time AS click_time - INTERVAL '5' SECOND 
)
WITH (
 'connector' = 'kafka',
 'format' = 'json',
 'properties.bootstrap.servers' = 'kafka.svc:9092',
 'properties.group.id' = 'clicks',
 'scan.startup.mode' = 'latest-offset',
 'topic' = 'clicks-ingest'
);

The correlation_id of the clicks stream corresponds to the bid_id field of the impressions stream (Figure 2). This will be the basis for joining respective data streams and calculating the Click-Through Rate (CTR). But before we look into the CTR calculation, let’s first start with something simpler to check that all of the moving parts are in the right place.

Figure 3: Impressions and Clicks relation


The following query calculates the number of impressions within a tumbling window of 60 seconds, broken down by campaign_id and creative_dimensions:


SELECT 
 campaign_id, 
 creative_dimensions, 
 TUMBLE_ROWTIME(serve_time, INTERVAL '60' SECOND) 
   AS window_end, 
 COUNT(*) AS c
FROM impressions
GROUP BY 
  TUMBLE(serve_time, INTERVAL '60' SECOND), 
  campaign_id, 
  creative_dimensions
ORDER BY window_end, c DESC;

Executing this query in Ververica Platform will display the respective breakdown in its live results preview:

Figure 4: Impressions by campaign & dimensions (aggregation over one minute)
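
The same kind of sanity check can be applied to the clicks stream. The following is a minimal sketch, assuming the clicks table is registered exactly as defined above; grouping by tracker is only an illustrative choice of dimension:

```sql
-- Sanity check for the clicks stream: number of clicks per tracker
-- within a tumbling window of 60 seconds.
SELECT
  tracker,
  TUMBLE_ROWTIME(click_time, INTERVAL '60' SECOND) AS window_end,
  COUNT(*) AS c
FROM clicks
GROUP BY
  TUMBLE(click_time, INTERVAL '60' SECOND),
  tracker;
```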

 


Calculating Click-Through Rate (CTR)

CTR is defined as the ratio of the number of clicks to the overall number of served impressions. From this definition it becomes clear that we need to join the two streams:


CREATE TEMPORARY VIEW impressions_with_clicks_raw AS
SELECT 
  i.bid_id, 
  i.campaign_id,
  i.country_code, 
  i.creative_dimensions, 
  i.serve_time, 
  c.tracker,
  -- raw click timestamp string; NULL when no click was matched
  c.`timestamp` AS click_time,
  CASE
     WHEN c.`timestamp` IS NULL THEN FALSE
     ELSE TRUE
  END AS clicked
FROM impressions i
LEFT OUTER JOIN clicks c
  ON i.bid_id = c.correlation_id AND
  c.click_time BETWEEN i.serve_time AND 
  i.serve_time + INTERVAL '2' MINUTE;

This query will produce one row for each impression and match it with a click (if any) that was observed within two minutes after serving the ad.

Note: Creating a VIEW is a logical operation that makes combining multiple queries easier. It does not result in the execution of a job until the view is referenced elsewhere. When it is used as part of another query, Flink’s SQL runtime generates the execution plan and performs optimizations as if it were one joint nested query. Temporary views are used in this blog post to simplify the description. They are optional, and the same results can be produced without them.


A view can be queried just like a table:
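
For instance, the raw join results could be inspected with a statement like the following. This is a minimal sketch that assumes the impressions_with_clicks_raw view from above is registered in the current session:

```sql
-- Preview the joined rows: one row per impression, with clicked = TRUE
-- when a matching click was observed within two minutes.
SELECT * FROM impressions_with_clicks_raw;
```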

Figure 5: Joining impressions and corresponding clicks (raw)

Note: When using streaming SQL, it is important to always include a time-restricting component in the query; otherwise the internal state of the underlying Flink application will grow without bound. In this particular case, the BETWEEN ... + INTERVAL '2' MINUTE condition of the join allows Flink to purge events from state once 120 (matching interval) + 5 (table’s watermark delay) = 125 seconds of event time have passed.


The next step is to aggregate raw data and count the number of impressions with and without clicks broken down by the relevant dimensions (campaign_id, country_code in this example):


CREATE TEMPORARY VIEW impressions_with_clicks_5m AS
SELECT 
   TUMBLE_ROWTIME(serve_time, INTERVAL '5' MINUTE) AS window_end,
   campaign_id,
   country_code,
   clicked,
   COUNT(*) AS cnt
FROM impressions_with_clicks_raw
GROUP BY 
  TUMBLE(serve_time, INTERVAL '5' MINUTE),
  campaign_id,
  country_code,
  clicked;

We use a TUMBLE window to group and count elements within five-minute intervals. This results in a reduced and predictable number of updates compared to the raw impressions and clicks data, namely at most:

#campaign_ids * #country_codes * 2 (clicked flag) rows every 5 minutes


SELECT * FROM impressions_with_clicks_5m;

Figure 6: Joining impressions and corresponding clicks (aggregation over five minutes)

 

As the last step, we need to perform a self-join to calculate the final click-through rates per campaign per country:


CREATE TEMPORARY VIEW ctr_campaigns AS
SELECT
   ic1.country_code,
   ic1.campaign_id,
   ic1.cnt AS `clicks_count`,
   ic2.cnt AS `no_clicks_count`,
   CAST((100.0 * ic1.cnt / (ic1.cnt + ic2.cnt))
     AS DECIMAL(8,4)) AS ctr
FROM impressions_with_clicks_5m AS ic1
   JOIN impressions_with_clicks_5m AS ic2
ON ic1.window_end = ic2.window_end AND
   ic1.country_code = ic2.country_code AND
   ic1.campaign_id = ic2.campaign_id AND 
   ic1.clicked = TRUE AND 
   ic2.clicked = FALSE;

 

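The conditions ic1.clicked = TRUE AND ic2.clicked = FALSE ensure that each result row pairs the clicked count (ic1) with the non-clicked count (ic2) of the same window, campaign and country, and they remove the duplicates that the self-join would otherwise produce.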
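
One property of the inner self-join is worth keeping in mind: a combination of window, campaign and country that contains only clicked or only non-clicked impressions has no counterpart row and therefore produces no result. If that matters for your dashboard, a conditional aggregation directly over impressions_with_clicks_raw is a possible alternative. The following is a sketch and not part of the pipeline described in this post; the view name ctr_campaigns_alt is hypothetical:

```sql
-- Hypothetical alternative to the self-join: derive both counts and the CTR
-- in a single five-minute windowed aggregation over the raw joined view.
CREATE TEMPORARY VIEW ctr_campaigns_alt AS
SELECT
  country_code,
  campaign_id,
  SUM(CASE WHEN clicked THEN 1 ELSE 0 END) AS clicks_count,
  SUM(CASE WHEN clicked THEN 0 ELSE 1 END) AS no_clicks_count,
  -- COUNT(*) is the total number of impressions in the group (always > 0)
  CAST(100.0 * SUM(CASE WHEN clicked THEN 1 ELSE 0 END) / COUNT(*)
    AS DECIMAL(8,4)) AS ctr
FROM impressions_with_clicks_raw
GROUP BY
  TUMBLE(serve_time, INTERVAL '5' MINUTE),
  country_code,
  campaign_id;
```

With this variant, a group without any clicks yields a row with a CTR of 0 instead of being dropped.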


SELECT * FROM ctr_campaigns;

Figure 7: Derived Click-Through Rate per campaign and country

 

At this point we are ready to build our dashboard. According to its definition, impressions_with_clicks_5m produces updates every 5 minutes. These updates trigger the corresponding emission of results by the ctr_campaigns query with the same cadence. Omitting the window_end field allows us to interpret them as in-place updates that hold the most recent results of the CTR calculation. To make those updates available to external systems, such as BI tools, we can write them into a table that is backed by a database.


CREATE TEMPORARY TABLE `ctr_dashboard` (
 `country_code` VARCHAR(2),
 `campaign_id` INT,
 `clicks_count` BIGINT NOT NULL,
 `no_clicks_count` BIGINT NOT NULL,
 `ctr` DECIMAL(8, 4) NOT NULL,
  PRIMARY KEY (`country_code`, `campaign_id`) NOT ENFORCED
)
WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:postgresql://postgres.databases.svc:5432/adtech',
 'table-name' = 'ctr_dashboard_campaigns',
 'username' = 'flink',
 'password' = '12345'
);


Note: A corresponding table has to be pre-created in the database itself. Flink’s DDL statements only store the table metadata in the catalog, but do not produce side-effects in external systems (e.g. table/topic/index creation).
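
As an illustration, the table on the PostgreSQL side could be created along the following lines. This is a sketch to be run against PostgreSQL, not Flink; the column types are assumptions chosen to mirror the Flink schema above (NUMERIC is PostgreSQL's counterpart of DECIMAL):

```sql
-- Run in the adtech PostgreSQL database before starting the Flink job.
-- The primary key mirrors the PRIMARY KEY declared in the Flink DDL, so that
-- rows can be updated in place per (country_code, campaign_id).
CREATE TABLE ctr_dashboard_campaigns (
  country_code    VARCHAR(2),
  campaign_id     INT,
  clicks_count    BIGINT NOT NULL,
  no_clicks_count BIGINT NOT NULL,
  ctr             NUMERIC(8, 4) NOT NULL,
  PRIMARY KEY (country_code, campaign_id)
);
```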

 


INSERT INTO ctr_dashboard
SELECT
 ic1.country_code,
 ic1.campaign_id,
 ic1.cnt AS `clicks_count`,
 ic2.cnt AS `no_clicks_count`,
 CAST(((100.0*ic1.cnt/(ic1.cnt + ic2.cnt))) AS decimal(8,4) ) 
   AS ctr
FROM impressions_with_clicks_5m as ic1
  JOIN impressions_with_clicks_5m as ic2
ON ic1.window_end = ic2.window_end
  AND  ic1.country_code = ic2.country_code
  AND ic1.campaign_id = ic2.campaign_id AND ic1.clicked = TRUE 
  AND ic2.clicked = FALSE;
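
Since the ctr_campaigns view selects the same columns in the same order, the statement above could presumably be shortened by reading from the view. A sketch, assuming ctr_campaigns is registered in the same session and reads from impressions_with_clicks_5m:

```sql
-- Equivalent formulation that reuses the ctr_campaigns view.
INSERT INTO ctr_dashboard
SELECT country_code, campaign_id, clicks_count, no_clicks_count, ctr
FROM ctr_campaigns;
```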

 

When executing an INSERT INTO statement in Ververica Platform, you will be presented with the following dialog, which leads to the creation of a long-running Flink SQL Deployment:

Figure 8: SQL Deployment dialog

A Deployment is Ververica Platform’s abstraction of a long-running service or application.

Figure 9: A running Ververica Platform Deployment


After letting the Deployment run for some time we can observe the results in the ctr_dashboard_campaigns table.

Figure 10: CTR results in PostgreSQL


Defining the PRIMARY KEY on the country_code and campaign_id fields makes the table show the most recent CTR values for each combination of these dimensions, updated roughly every 5 minutes. At that point, all incoming raw impression and click events have been evaluated and efficiently converted by Flink into the relevant metric, which can now easily be presented in a BI tool of your choice.
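
A BI tool would then only need a plain query against the sink table. As a sketch (run against PostgreSQL, not Flink; the ordering and the limit of 10 rows are arbitrary illustrative choices):

```sql
-- Campaign/country combinations with the highest current CTR.
SELECT country_code, campaign_id, clicks_count, no_clicks_count, ctr
FROM ctr_dashboard_campaigns
ORDER BY ctr DESC
LIMIT 10;
```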

Figure 11: CTR results presented in Apache Superset


Performance considerations

A demand-side platform participating in the Real-Time Bidding auction can receive traffic on the order of hundreds of thousands of requests per second. Depending on the campaign size and the bidding strategy, this can result in a very significant number of impressions per second. Flink makes it possible to handle this large volume of data in-flight, without having to “bombard” the SQL database that analysts use for creating dashboards with raw events. At the same time, analysts can use the same language and mental approach as if they had access to the raw data stored in the database.

An important aspect of Flink’s approach is the clear separation it makes between processing, table persistence (via connectors, e.g. Kafka or MySQL), and state fault tolerance (snapshots to blob storage). Flink SQL’s fault tolerance is based on the same lightweight checkpointing mechanism used for non-SQL Flink applications. In contrast to KSQL, for example, Apache Flink does not need to rely on additional topics or partitions on your Apache Kafka cluster to ensure fault tolerance of the running queries. The load on the systems that hold your in-flight business data is thereby kept to a minimum. This approach is arguably more suitable for self-service systems, where the creation of large volumes of actively replicated state upon submission of an arbitrary user query might present an operational challenge.

Figure 12: Real-time analytics pipeline with (b) and without (a) Flink SQL

 


Why Ververica Platform?

The latest release of Ververica Platform brought native support for Flink SQL. It includes, among other things, a zero-configuration data catalog, a convenient web-based editor for creating SQL scripts, UDF support and an interactive results preview.

On top of that, Ververica Platform supports multi-tenancy, which is a key requirement for facilitating self-service workflows as described above. In combination with Kubernetes namespaces and resource quotas, you can easily control how much computational capacity each team, or even each individual within your organization, can acquire for running SQL queries.


Figure 13: Ververica Platform UI

Conclusion

While this blog post focused on Flink SQL applied to an Ad Tech use case, the general themes are applicable to a wide range of scenarios with any combination of the following requirements:

  • Getting insights into data in real-time
  • Lowering the barrier for accessing real-time data and performing analytics on it in your organization 
  • Reducing the load on traditional databases


Note: If you want to set up a similar scenario with test data, you can use a custom Kafka connector called voluble with the properties which you can find here. This method provides a means to ensure that the IDs of the two streams match, something that is required for the JOIN use case and is currently not supported by Flink’s DataGenerator source.

 

Topics: Ververica Platform, Flink SQL
