RocksDB's LOG file comes in handy when troubleshooting Flink with RocksDB. How can I configure RocksDB logging?
Note: This section applies to Flink 1.10 or later
By default, Flink uses the log level HEADER_LEVEL
for RocksDB. This essentially disables RocksDB logging and only prints RocksDB configuration to its log file. The main reason for this behavior is that this log file is not controllable in size prior to Flink 1.14. You can change these defaults but beware of the consequences.
How to configure RocksDB logging depends on the version of Flink you are using. Flink 1.13 or later supports changing RocksDB log level via configuration. Flink 1.14 additionally supports specifying the logging directory so you can, for example, put it onto a (separate) volume that is retained after container shutdown and can be used for debugging purposes. Thanks to the new RocksDB version, you can also configure log rotation in Flink 1.14 or later. For example:
state.backend.rocksdb.log.level: INFO_LEVEL
state.backend.rocksdb.log.max-file-size: 10MB
state.backend.rocksdb.log.file-num: 10
# This is set to Flink's log directory in Flink 1.15+ by default
state.backend.rocksdb.log.dir: /rocksdb/logs
state.backend.rocksdb.log.level: INFO_LEVEL
For older Flink versions and more low-level control, you can define your own custom Options Factory. See details below.
If you are using older versions of Flink (<1.13), you want to configure logging directory in Flink 1.13, or you want to dump RocksDB statistics in RocksDB's LOG
file, you can create a custom Options Factory by extending Flink's DefaultConfigurableOptionsFactory
. This mechanism gives you options to configure RocksDB logging while still allowing your jobs to continue using any other RocksDB tuning options the way you used them before.
Important: DefaultConfigurableOptionsFactory
was not really meant for being extended and may change among releases. If you plan to take this into production, you should write your own complete ConfigurableRocksDBOptionsFactory
and set all the options you need in there.
First, if you have not done so yet, add a dependency to Flink's RocksDB state backend. For example, add this to your Maven project's pom.xml
:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
Then create your own Options Factory:
package com.ververica.troubleshooting;
import org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;
public class MyCustomRocksDBOptionsFactory extends DefaultConfigurableOptionsFactory {
private static final long serialVersionUID = 1L;
private String dbLogDir = "";
@Override
public DBOptions createDBOptions(DBOptions currentOptions,
Collection<AutoCloseable> handlesToClose) {
currentOptions = super.createDBOptions(currentOptions, handlesToClose);
currentOptions.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
currentOptions.setStatsDumpPeriodSec(60);
currentOptions.setDbLogDir(dbLogDir);
return currentOptions;
}
@Override
public String toString() {
return this.getClass().toString() + "{" + super.toString() + '}';
}
/**
* Set directory where RocksDB writes its info LOG file (empty = data dir,
* otherwise the data directory's absolute path will be used as the log
* file prefix).
*/
public void setDbLogDir(String dbLogDir) {
this.dbLogDir = dbLogDir;
}
}
Three points in createDBOptions
are important here:
setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
sets the logging level to INFO from which you would get a decent amount of logging data (increase if needed)setStatsDumpPeriodSec(60)
dumps various RocksDB statistics every this many seconds: this includes compaction statistics.setDbLogDir(dbLogDir)
specifies the path where to put the LOG
file: depending on what you are trying to troubleshoot, you can just use a local directory or you may need to put this onto a distributed file system (or persistent volume) to survive node/pod/job restartsWith the custom Options Factory, you can configure Flink either programmatically or through its flink-conf.yaml
.
Note: The state backend interface is changed since Flink 1.13; we provide both versions below.
Flink 1.13 or later
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend()
env.setStateBackend(stateBackend);
env.getCheckpointConfig().setCheckpointStorage("file:///path/to/checkpoints");
Flink 1.12 or earlier
RocksDBStateBackend stateBackend =
new RocksDBStateBackend("file:///path/to/checkpoints");
Then
MyCustomRocksDBOptionsFactory options = new MyCustomRocksDBOptionsFactory();
options.setDbLogDir("/path/to/rocksdb/logging/");
stateBackend.setRocksDBOptions(options);
If you want to configure your options factory completely via flink-conf.yaml
, you can extend the code above to update its settings from the configuration. The code below exemplifies how to do this for the log directory.
Note: The interface slightly changed since Flink 1.11; we provide both versions below.
Flink 1.11 or later
public static final ConfigOption<String> LOG_DIR =
key("state.backend.rocksdb.log.dir")
.stringType()
.noDefaultValue()
.withDescription("Location of RocksDB's info LOG file " +
"(empty = data dir, otherwise the data directory's " +
"absolute path will be used as the log file prefix)");
@Override
public DefaultConfigurableOptionsFactory configure(ReadableConfig configuration) {
DefaultConfigurableOptionsFactory optionsFactory =
super.configure(configuration);
this.dbLogDir = configuration.getOptional(LOG_DIR).orElse(this.dbLogDir);
return optionsFactory;
}
FFlink 1.10
public static final ConfigOption<String> LOG_DIR =
key("state.backend.rocksdb.log.dir")
.stringType()
.noDefaultValue()
.withDescription("Location of RocksDB's info LOG file " +
"(empty = data dir, otherwise the data directory's " +
"absolute path will be used as the log file prefix)");
@Override
public DefaultConfigurableOptionsFactory configure(Configuration configuration) {
DefaultConfigurableOptionsFactory optionsFactory =
super.configure(configuration);
this.dbLogDir = configuration.getOptional(LOG_DIR).orElse(this.dbLogDir);
return optionsFactory;
}
With the code additions from above, you can simply adapt your flink-conf.yaml
and configure it like this:
state.backend.rocksdb.log.dir: /path/to/rocksdb/logging/
state.backend.rocksdb.options-factory: com.ververica.troubleshooting.MyCustomRocksDBOptionsFactory
Note: Configuring the options factory via flink-conf.yaml
will apply the options factory to all jobs started in the Flink cluster. Make sure that this class is available cluster-wide or in all jobs started on this cluster! For Ververica Platform deployments, this will not be a problem since each deployment spawns its own Flink cluster.