As the original creators of Apache Flink, we are often asked for best practices around monitoring Flink applications and people want to know which metrics they should monitor for their applications at scale. In this two-piece blog post series based on a previous monitoring webinar, we would like to share our experiences around monitoring, focus on metrics to look at, and explain how to interpret them.
To get everyone on the same page, we will start by detailing the concepts of monitoring and metrics with Apache Flink and answer the question of how to monitor Apache Flink. Then, we will turn our attention to metrics for continuous monitoring and explain their characteristics and how they help you see what is going on. A follow-up blog post will touch on metrics that help during troubleshooting and present the dashboard we have been using here and for our Apache Flink training playground.
According to Techopedia, application monitoring...
...is a process that ensures that a software application processes and performs in an expected manner and scope. This technique routinely identifies, measures and evaluates the performance of an application and provides the means to isolate and rectify any abnormalities or shortcomings.
This is a rather generic area, so what makes monitoring Apache Flink special?
First of all, Flink is a distributed stream processing framework. Jobs may therefore run not only at high parallelism on multiple machines, in a wide range of environments and across several layers of abstraction (virtualization, Kubernetes, ...), but also in different deployment modes (application, per-job, session mode). Larger jobs typically introduce several challenges for monitoring and interpreting what is going on underneath.
In addition to that, Flink jobs run arbitrary user code with custom business logic and vast differences in application behaviour that may affect the underlying resources and expose different bottlenecks.
The toolset for monitoring large-scale applications is quite broad:
Logging is very useful for debugging specific failures (so don't blindly disable or reduce it in order to decrease the size of the log). However, it is not really suitable for large-scale monitoring since you would typically have to log too much data to monitor your application continuously and it may be difficult to set up alerts based on that. A common metrics system typically has better tools in this regard.
System-level and cluster metrics help you understand your application at a coarse-grained level, e.g., how the underlying Flink, Kubernetes, ... cluster behaves, but don't give you much insight into the application layer. This is why we recommend enriching your application with custom application metrics that reflect the critical parts and states of your application. Based on these, you understand what your application is doing in a certain situation, e.g., when troubleshooting a problem.
Last but not least, profilers may give you very detailed insights into your application to identify bottlenecks, thread-locks and congestion, troubleshoot memory leaks and GC issues, and much more. While they are very useful for troubleshooting and performance tuning, they are not really suitable for large-scale monitoring because they accumulate quite a lot of data which is difficult to use for alerting before problems occur.
Flink's metrics system
Let's look at how Flink makes metrics available, which metric types are supported, and how these can be used.
First of all, Flink metrics are objects that pair an identifier with a measurement. There are 4 different types of metrics:
- counters count things, e.g., numRecordsIn
- meters both count and measure rates, e.g., numRecordsInPerSecond
- histograms measure statistical distributions, e.g., latency distribution, and can easily provide percentiles
- gauges return a value, e.g., uptime
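To make the difference between the four metric types concrete, here is a minimal Python sketch. These classes are toy stand-ins written for illustration only; they are not Flink's Java interfaces, although the method names loosely mirror them. The histogram keeps every sample, which a real implementation would typically avoid.

```python
import math
import time
from bisect import insort


class Counter:
    """Counts things; only the running total is kept."""

    def __init__(self):
        self.count = 0

    def inc(self, n=1):
        self.count += n


class Meter:
    """Counts events and derives an average rate since creation."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._start = clock()
        self.count = 0

    def mark_event(self, n=1):
        self.count += n

    def rate(self):
        elapsed = self._clock() - self._start
        return self.count / elapsed if elapsed > 0 else 0.0


class Histogram:
    """Keeps all samples sorted so that a percentile is a simple lookup."""

    def __init__(self):
        self._values = []

    def update(self, value):
        insort(self._values, value)

    def percentile(self, p):
        # Nearest-rank percentile for p in (0, 100].
        if not self._values:
            raise ValueError("no samples recorded")
        rank = max(1, math.ceil(len(self._values) * p / 100))
        return self._values[rank - 1]


class Gauge:
    """Returns whatever the supplied callback yields at read time."""

    def __init__(self, supplier):
        self._supplier = supplier

    def value(self):
        return self._supplier()
```

A counter and a meter are updated by the code being measured, a histogram collects individual observations, and a gauge is only evaluated when something reads it.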
Each metric is scoped to a specific context within the Flink runtime, and the scope becomes part of the metric's identifier as <system scope> [+ <user scope>] + <metric name>. There are system scopes for metrics at the JobManager (JM), JM+job, TaskManager (TM), TM+job, task, and operator. You can also fine-tune scope formats to adapt metric identifiers to your needs.
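The composition of an identifier can be sketched in a few lines of Python. The scope format below is, to our knowledge, the default for operator-level metrics; the helper function, the variable values, and the "training" user scope are hypothetical and only serve to illustrate the pattern.

```python
import re

# Assumed default operator scope format (configurable in Flink).
OPERATOR_SCOPE = "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>"


def metric_identifier(scope_format, variables, metric_name, user_scope=(), delimiter="."):
    """Compose <system scope> [+ <user scope>] + <metric name>."""

    def substitute(match):
        # Raises KeyError if the format references an unknown variable.
        return str(variables[match.group(1)])

    system_scope = re.sub(r"<(\w+)>", substitute, scope_format)
    return delimiter.join([system_scope, *user_scope, metric_name])


identifier = metric_identifier(
    OPERATOR_SCOPE,
    {
        "host": "host-1",
        "tm_id": "tm-7",
        "job_name": "rides",
        "operator_name": "window",
        "subtask_index": 0,
    },
    "eventTimeLag",
    user_scope=("training",),
)
print(identifier)
```

Shortening the scope format, for example by dropping the host, yields shorter identifiers at the cost of less context per metric.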
It is fairly easy to expose your own custom metric in Flink, and the Flink docs already provide a couple of details on this topic, so we will not go into details here.
Metrics can be exported by a MetricReporter, the Flink REST API, or Flink’s web UI. While it’s clear that the REST API is available for automation, you may be tempted to use Flink’s web UI for monitoring your application (and cluster). Flink, however, is not a fully-fledged metrics system, nor does it try to provide a suitable dashboard for monitoring. It is fine to use the UI for occasional checks while debugging, but it lacks the functionality of a proper metrics system and dashboard. Therefore, any serious application monitoring goes through metrics reporters, which write metric values into an external system. Flink bundles a vast set of such reporters, e.g., Prometheus, Datadog, Ganglia, InfluxDB, etc., and it is also fairly easy to write your own should you need something else.
Metrics for continuous monitoring
Flink includes a lot of metrics out-of-the-box. Here, we will look at some of the most useful examples you can use to continuously monitor your (large-scale) applications, set up alerts on, or use for troubleshooting. We will provide screenshots of our metrics dashboard and will share the full dashboard in a follow-up blog post that wraps up this series.
The obvious things to keep an eye on for your Flink application are whether it is still running (uptime) or how often it was not running / restarting (numRestarts). You should set appropriate alerts for each of these.
There are a lot of different reasons why a job may restart. In a well-behaved system, though, most of these originate from transient failures and can be ignored. However, since these can happen at any time, you want to make sure that the recovery is fast, and this is what you can track with the restartingTime metric.
If the restartingTime is high and you have a latency-critical job, you may want to tune it in this regard.
Whenever a failure (transient or not) occurs and triggers a restart, fault-tolerant Flink applications will restart the job from the latest checkpoint or savepoint. For these cases, you may want to make sure that you don't have to go back too far and reprocess a lot of data because that would effectively add to your job’s downtime. You can track various (general) checkpointing metrics for this, e.g., numberOfCompletedCheckpoints, numberOfFailedCheckpoints, numberOfInProgressCheckpoints. You may also want to track checkpoint failures in advance for jobs that restart on a pre-configured number of failed checkpoint attempts. Alerts can be set on some threshold of the number of failed checkpoints or on the last successful checkpoint being created too long ago.*
* Unfortunately, there is no direct metric for "time of last completed checkpoint". You can either extract this from the last increase of numberOfCompletedCheckpoints, retrieve it from Flink's REST API, or calculate it from your checkpoint frequency and set an alert on a number of consecutive checkpoint failures.
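The first option, extracting the time of the last completed checkpoint from the last increase of numberOfCompletedCheckpoints, could look roughly like the following Python sketch. It assumes you can fetch the counter as a time-ordered list of (timestamp, value) samples from your metrics system; the function names and the tolerated_misses parameter are hypothetical.

```python
def last_checkpoint_completion(samples):
    """Return the timestamp at which the counter last increased.

    `samples` is a list of (timestamp_s, numberOfCompletedCheckpoints)
    pairs sorted by time. Returns None if no increase was observed.
    """
    last_increase = None
    for (_, prev), (ts, cur) in zip(samples, samples[1:]):
        if cur > prev:
            last_increase = ts
    return last_increase


def checkpoint_is_stale(samples, now, checkpoint_interval_s, tolerated_misses=3):
    """True if no checkpoint completed within `tolerated_misses` intervals."""
    last = last_checkpoint_completion(samples)
    if last is None:
        # No increase observed: fall back to the start of the observed
        # window, or to `now` if there are no samples at all.
        last = samples[0][0] if samples else now
    return now - last > tolerated_misses * checkpoint_interval_s
```

With a 60-second checkpoint interval and the default of three tolerated misses, the alert fires once more than 180 seconds have passed since the counter last moved.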
While we are looking at checkpointing, let’s explore some more details like duration, size, and alignment time:
- lastCheckpointDuration should be below the checkpoint timeout or a checkpoint will fail. You could set up alerts on this in case the duration keeps increasing, but it may be safer, with respect to false alarms, to alert on actual checkpoint failures instead (see above).
- lastCheckpointSize can be used to estimate the current state size. For incremental checkpoints, however, this wasn’t too meaningful in the past, and Flink 1.15 adds the new lastCheckpointFullSize metric, which provides the full checkpoint size (including files shared with previous checkpoints) rather than the number of bytes in an incremental checkpoint.
- checkpointAlignmentTime is also more of a metric for troubleshooting checkpoint failures.
The most important part to look at if your job runs event-time logic is: "Are we making progress?".
For applications using event-time processing, it is important that the watermarks continue to advance. It is good to monitor the watermark at time-sensitive operators, such as process functions and windows, by watching the difference between the current timestamp and the watermark. If this event-time skew or event-time lag is unusually high, then this indicates that either (1) you are processing older events, perhaps during recovery from an outage, or (2) something upstream has not sent a watermark for a long time, e.g., a source has become idle. In the latter case, you need to find a workaround or fix your code.
To get the event-time lag from built-in metrics, we can plot the difference between the timestamp at which the currentOutputWatermark metric was gathered and the metric's actual value, and set an appropriate alert on it.
We can do so at different levels, e.g.,
- to gather information about how event-time is making progress throughout the job graph, we could group by the different operators of the job; or
- if we want to inspect event-time skew among subtasks, we plot each subtask of a particular operator.
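The calculation behind such a plot is a simple subtraction, sketched here in Python. The sketch assumes that watermarks are reported in milliseconds and that a subtask which has not yet emitted a watermark reports Long.MIN_VALUE; the function names are hypothetical.

```python
# Assumption: value reported before any watermark arrived (Long.MIN_VALUE).
NO_WATERMARK = -(2**63)


def event_time_lag_ms(collected_at_ms, watermark_ms):
    """Lag between the metric's collection time and the reported watermark."""
    if watermark_ms == NO_WATERMARK:
        return None  # no watermark yet, so the lag is undefined
    return collected_at_ms - watermark_ms


def lag_summary(collected_at_ms, watermarks_by_subtask):
    """Per-subtask lag plus the skew (max lag minus min lag) across subtasks."""
    lags = {
        subtask: event_time_lag_ms(collected_at_ms, wm)
        for subtask, wm in watermarks_by_subtask.items()
    }
    known = [lag for lag in lags.values() if lag is not None]
    skew = max(known) - min(known) if known else None
    return lags, skew


lags, skew = lag_summary(10_000, {0: 9_000, 1: 7_500, 2: NO_WATERMARK})
print(lags, skew)
```

Filtering out subtasks without a watermark matters: otherwise a single idle subtask would produce an enormous, meaningless lag value and trigger false alerts.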
A downside of the built-in metrics approach, however, is that processing time timestamps, at least in Prometheus, only have second accuracy. For very low-latency use cases, this may not be enough...
In general, the event-time lag definition from above is working quite well, but it also includes the time that events spend while waiting in external systems (outside Flink, e.g., in your Kafka topic, or even before that one while writing to it). If you want a more fine-grained resolution, to solely look at the time spent in Flink, or monitor a different flavor of a processing delay, e.g., by excluding lag from Async I/O, late events handling, etc., or want to include delays from transactional sinks while operating in end-to-end exactly-once mode, you will need your own custom metric to reflect your definition.
In our training exercises, for example, we create our own custom eventTimeLag metric, which we update whenever a window fires. Since we are using a histogram, we can also easily plot percentiles which may be useful for SLA management.
Alternatively, you can also look at Flink’s built-in latency markers, but they are more of a debugging tool and a bit special in their definition.
Last but not least, we would like to highlight a few connector-specific metrics that indicate whether we are keeping up with the external systems. While reading from Kafka or Kinesis, for example, records-lag-max and millisBehindLatest, respectively, indicate how far a consumer (group) is behind the head of the message queue. Flink forwards these connector metrics into Flink's metric system for convenience.
- records-lag-max shows the maximum lag in terms of the number of records for any partition. An increasing value over time is your best indication that the consumer group is not keeping up with the producers.
- millisBehindLatest shows the number of milliseconds a consumer is behind the head of the stream. For any consumer and Kinesis shard, this indicates how far it is behind the current time.
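One way to turn "an increasing value over time" into an alert condition is to fit a trend line through recent samples. The Python sketch below uses a least-squares slope; this is just one possible choice, and the function names and the min_slope parameter are hypothetical.

```python
def lag_slope(samples):
    """Least-squares slope (records per second) of (timestamp_s, lag) samples."""
    n = len(samples)
    if n < 2:
        return 0.0
    mean_t = sum(t for t, _ in samples) / n
    mean_lag = sum(lag for _, lag in samples) / n
    num = sum((t - mean_t) * (lag - mean_lag) for t, lag in samples)
    den = sum((t - mean_t) ** 2 for t, _ in samples)
    return num / den if den else 0.0


def consumer_falling_behind(samples, min_slope=0.0):
    """True if the lag grows faster than `min_slope` records per second."""
    return lag_slope(samples) > min_slope
```

Using a slope over a window of samples rather than a fixed threshold on the latest value makes the alert less sensitive to short spikes that the consumer catches up on by itself.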
Any performance monitoring should come with throughput measurements!
Flink offers metrics such as numRecords(In|Out)PerSecond or numRecords(In|Out) for each subtask. Although these are available for all tasks in your job, due to backpressure propagating upstream in Flink, it is usually enough to monitor the throughput on the output of the sources and configure alerting on that. Additional details per task and/or subtask may help you during troubleshooting and performance tuning.
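If your metrics system only stores the numRecordsOut counter, a rate and a simple alert condition can be derived from two samples, as in this Python sketch. The baseline rate, the min_fraction parameter, and the handling of counter resets are assumptions made for illustration.

```python
def records_per_second(prev, cur):
    """Rate between two (timestamp_s, numRecordsOut) samples of a source."""
    (t0, c0), (t1, c1) = prev, cur
    if t1 <= t0:
        raise ValueError("samples must be in increasing time order")
    # Assumption: a smaller counter value means the counter restarted from
    # zero (e.g. after a job restart), so only the new records are counted.
    delta = c1 - c0 if c1 >= c0 else c1
    return delta / (t1 - t0)


def throughput_dropped(prev, cur, baseline_rps, min_fraction=0.5):
    """True if the current rate fell below `min_fraction` of the baseline."""
    return records_per_second(prev, cur) < min_fraction * baseline_rps
```

The baseline would typically come from the job's normal operation, e.g., an average over a comparable period on a previous day.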
JVM metrics (1): Continuous monitoring
Going one level deeper, there are also a couple of system-level metrics to look at for continuous monitoring, starting at the JVM and going into further metrics such as those from the underlying deployment system, e.g., Kubernetes.
On Kubernetes, container_memory_working_set_bytes is an accurate snapshot of how much memory your pod's containers are using in total, including memory that the JVM can reach and those areas it cannot, e.g., native memory from RocksDB. A reasonable alert here would compare this with the value of container_spec_memory_limit_bytes to notify you early, before a machine is restarted; the same comparison will also help you troubleshoot why a restart occurred. Similarly, any of the Status.JVM.Memory.* metrics will help you keep an eye on the JVM memory and its components.
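The comparison itself is a ratio check, sketched below in Python. The 90% threshold is an arbitrary example value, and the sketch assumes that a limit of 0 means no memory limit is configured for the container.

```python
def memory_usage_ratio(working_set_bytes, limit_bytes):
    """Fraction of the container memory limit currently in use."""
    if limit_bytes <= 0:
        return None  # assumption: 0 means no limit is configured
    return working_set_bytes / limit_bytes


def should_alert(working_set_bytes, limit_bytes, threshold=0.9):
    """True if memory usage reached `threshold` of the configured limit."""
    ratio = memory_usage_ratio(working_set_bytes, limit_bytes)
    return ratio is not None and ratio >= threshold
```

In practice you would express the same ratio directly in your metrics system's query language and alert only if the condition holds for some minutes.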
See Metrics for a full list of system metrics provided by Flink. Further metrics may be collected outside of Flink for underlying system components.
In this first part of a two-piece blog post series on monitoring large-scale Apache Flink applications, we have presented the concepts around Flink’s metrics system and introduced various useful metrics for continuous monitoring. These metrics can be set up with proper alerts to inform you about imminent failures and allow you to monitor cluster and application health and checkpointing progress. We presented different ways to track latency and observe your application’s throughput for performance monitoring. These metrics also serve as a good starting point for troubleshooting failures or performance degradations, which we will extend in the next part of this series, where we will also go into some details of our dashboard and share that with you to start your own. Stay tuned!