As folks first begin to work with Flink, watermarks can be a source of confusion. But this topic doesn't have to be complicated. Let's work through a simple example that will show why we need watermarks, and how they work.
4 observations when working with Watermarks in Apache Flink
In the example below, we have a stream of timestamped events that arrive somewhat out of order. The numbers shown are timestamps that indicate when these events actually occurred. The first event to arrive happened at time 4, and it is followed by an event that happened earlier, at time 2, and so on:
Note that this is an example of event-time processing, meaning that the timestamps reflect when the events took place, and not when they were processed. Event-time processing is a powerful abstraction that makes it possible to create streaming applications that behave consistently whether they are processing live data or re-processing historic data.
You can find more information about the different notions of time such as event time, processing time or ingestion time in the Apache Flink documentation.
Now imagine that we are trying to create a stream sorter. This is meant to be an application that processes each event from a stream as it arrives and emits a new stream containing the same events but ordered by their timestamps.
The first element our stream sorter sees is the 4, but we can't just immediately release it as the first element of the sorted stream. It may have arrived out of order, and an earlier event might yet arrive. In fact, we have the benefit of some god-like knowledge of this stream's future, and we can see that our stream sorter should wait at least until the 2 arrives before producing any results.
Some buffering, and some delay is necessary.
If we do this wrong, we could end up waiting forever. First, our application saw an event from time 4, and then an event from time 2. Will an event from a time earlier than 2 ever arrive? Maybe. Maybe not. We could wait forever and never see a 1.
Eventually, we have to be courageous and emit the 2 as the start of the sorted stream.
What we need then is some sort of policy that defines when, for any given timestamped event, to stop waiting for the arrival of earlier events.
This is precisely what watermarks do — they define when to stop waiting for earlier events.
Event-time processing in Flink depends on special timestamped elements, called watermarks, that are inserted into the stream either by the data sources or by a watermark generator. A watermark with a timestamp t can be understood as an assertion that all events with timestamps < t have (with reasonable probability) already arrived.
When should our stream sorter stop waiting, and push out the 2 to start the sorted stream? When a watermark arrives with a timestamp of 2, or greater.
We can imagine different policies for deciding how to generate watermarks.
We know that each event arrives after some delay and that these delays vary, so some events are delayed more than others. One simple approach is to assume that these delays are bounded by some maximum delay. Flink refers to this strategy as bounded-out-of-orderness watermarking. It's easy to imagine more complex approaches to watermarking, but for many applications, a fixed delay works well enough.
If you want to build an application like a stream sorter, Flink's ProcessFunction is the right building block. It provides access to event-time timers (that is, callbacks that fire based on the arrival of watermarks), and has hooks for managing the state needed for buffering events until it's their turn to be sent downstream.
For more information on Apache Flink’s ProcessFunction, you can read the Apache Flink 1.6.0 documentation. If you want to get additional support and advice on Watermarks and other Apache Flink functions and use cases, signup to a training near you or contact us for more information below.