When loading/restoring a Flink state snapshot (checkpoint/savepoint), you may face the following exception:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for StreamMap_0a448493b4782967b150582570326227_(1/1) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:335)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:148)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:319)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:141)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
... 15 more
The exact exception stack trace may differ depending on the configured state backend and Flink version.
Affected versions: Flink 1.8 - 1.19
As the stack trace above states, the cause of this error is an incompatible key serializer. In this specific example, the incompatibility occurs at the keyBy() used for the operator StreamMap_0a448493b4782967b150582570326227. Since StreamMap is Flink's operator class wrapping a MapFunction, you should check the keyBy() call before the map(). Besides StreamMap, you may also see other operator classes such as StreamFilter, StreamFlatMap, and ProcessOperator, which wrap the functions passed to filter(), flatMap(), and process() calls. Make sure that the key used in your job is compatible with the one stored in your checkpoint/savepoint by checking its serializer.
One situation that often causes confusion is that keyBy() can be called in different ways, for example:

keyBy("id"), or keyBy(0)

Because the compiler does not know the exact type of the field in this case, the key type used here is actually Tuple. If you then try to load/restore the state snapshot with keyBy(e -> e.f0), or keyBy(e -> e.getId()), or a KeySelector, e.g.:

.keyBy(new KeySelector<MyClass, Integer>() {
    @Override
    public Integer getKey(MyClass value) throws Exception {
        return value.getId();
    }
})

you will get a StateMigrationException: The new key serializer must be compatible. Similarly, if you use a lambda or a KeySelector to store the snapshot but load/restore it with a field name/position, you will get the same exception, because the key serializer is not compatible (Tuple vs. the concrete type).
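To see why the two styles disagree, note that a field-name/position key is wrapped in a Tuple, while a lambda or KeySelector extracts the plain field type. The sketch below uses hypothetical local stand-ins (Tuple1 and Event are defined here, not taken from Flink) purely to illustrate the resulting type difference:

```java
// Illustration only: Tuple1 and Event are local stand-ins, not Flink's classes.
public class KeyTypeMismatch {

    // Minimal stand-in for Flink's Tuple1 wrapper.
    static final class Tuple1<T> {
        final T f0;
        Tuple1(T f0) { this.f0 = f0; }
    }

    // Example POJO with an Integer id field.
    static final class Event {
        final Integer id;
        Event(Integer id) { this.id = id; }
        Integer getId() { return id; }
    }

    // keyBy("id") / keyBy(0): the extracted key is wrapped in a Tuple.
    static Object fieldStyleKey(Event e) { return new Tuple1<>(e.getId()); }

    // keyBy(e -> e.getId()) or a KeySelector: the key is the plain field type.
    static Object selectorStyleKey(Event e) { return e.getId(); }

    public static void main(String[] args) {
        Event e = new Event(42);
        // Different runtime key types -> different key serializers in the snapshot.
        System.out.println(fieldStyleKey(e).getClass().getSimpleName());    // Tuple1
        System.out.println(selectorStyleKey(e).getClass().getSimpleName()); // Integer
    }
}
```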
A similar situation may also occur with Flink's State Processor API. For example, assuming field 0 or the field "id" is an Integer, and keyBy(0) or keyBy("id") was used to create the savepoint, then when reading from the savepoint your KeyedStateReaderFunction must use Tuple1<Integer> as the key type:

class MyStateReader extends KeyedStateReaderFunction<Tuple1<Integer>, ...> {...}
Note: if you bootstrap a savepoint with the State Processor API and call keyBy() with a field name/position, e.g.:

OperatorTransformation
    .bootstrapWith(wordMap)
    .keyBy(0) // or .keyBy("id")
    .transform(new MyBootstrapper());

then your KeyedStateBootstrapFunction (MyBootstrapper in this case) must be:

class MyBootstrapper extends KeyedStateBootstrapFunction<Tuple, ...> {...}
Because of this confusion, keyBy() with field names or positions has been deprecated since Flink 1.11. You should use a Java lambda or a KeySelector instead.
Flink 1.12+ contains an improvement that prints the exact new and previous type serializers in the stack trace to help you find the root cause of this exception. For example:
org.apache.flink.util.StateMigrationException: The new key serializer
(org.apache.flink.api.common.typeutils.base.IntSerializer@355e1d62)
must be compatible with the previous key serializer
(org.apache.flink.api.java.typeutils.runtime.TupleSerializer@daebabfd).
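With both serializer names in the message, the mismatch can be spotted by comparing the two class names (here IntSerializer vs. TupleSerializer). As a small stand-alone sketch, the names can even be pulled out programmatically; the message string is copied from the example above:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SerializerMismatch {
    // Matches "(fully.qualified.ClassName@" in the exception message.
    static final Pattern SERIALIZER = Pattern.compile("\\(([\\w.]+)@");

    // Returns the simple class names of the new and previous key serializers.
    static String[] serializerNames(String message) {
        Matcher m = SERIALIZER.matcher(message);
        String[] names = new String[2];
        for (int i = 0; i < 2 && m.find(); i++) {
            String fqcn = m.group(1);
            names[i] = fqcn.substring(fqcn.lastIndexOf('.') + 1);
        }
        return names;
    }

    public static void main(String[] args) {
        String msg = "The new key serializer "
            + "(org.apache.flink.api.common.typeutils.base.IntSerializer@355e1d62) "
            + "must be compatible with the previous key serializer "
            + "(org.apache.flink.api.java.typeutils.runtime.TupleSerializer@daebabfd).";
        String[] names = serializerNames(msg);
        // New vs. previous serializer: the two keyBy() styles clearly differ.
        System.out.println(names[0] + " vs " + names[1]); // IntSerializer vs TupleSerializer
    }
}
```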
The key type used when reading a state snapshot is not compatible with the one used to store the snapshot. A common mistake is using keyBy() with a field name (e.g., keyBy("id")) or a field position (e.g., keyBy(0)) when storing a state snapshot, while using the type of that field directly, without a Tuple wrapper, when reading from the snapshot, or vice versa.
FLINK-17074: Deprecate DataStream.keyBy() that use tuple/expression keys
FLINK-19972: Provide more details when type serializers are not compatible