[jira] [Created] (FLINK-19741) InternalTimeServiceManager fails to restore if there are other users of raw keyed state streams

Shang Yuanchun (Jira)
Tzu-Li (Gordon) Tai created FLINK-19741:
-------------------------------------------

             Summary: InternalTimeServiceManager fails to restore if there are other users of raw keyed state streams
                 Key: FLINK-19741
                 URL: https://issues.apache.org/jira/browse/FLINK-19741
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.11.2, 1.10.2, 1.9.3
            Reporter: Tzu-Li (Gordon) Tai


Currently, when restoring an {{InternalTimeServiceManager}}, we always attempt to read from the provided raw keyed state streams (using {{InternalTimerServiceSerializationProxy}}):
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L117

This is incorrect, since we only write timers with the {{InternalTimerServiceSerializationProxy}} when they require legacy synchronous snapshots:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L192
(currently, this is only required when using the RocksDB backend with heap-based timers).
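To make the write-side behaviour concrete, here is a minimal, self-contained sketch of the control flow described above. It is not Flink code: {{TimerSnapshotSketch}}, {{snapshotTimers}}, {{PROXY_VERSION}} and the byte layout (a version int, a count, then timestamps) are hypothetical stand-ins for what {{InternalTimerServiceSerializationProxy}} writes. Only the {{useLegacySynchronousSnapshots}} flag name comes from this issue.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

// Hypothetical, simplified model of the snapshot (write) path; these are NOT Flink classes.
final class TimerSnapshotSketch {

    // Assumed stand-in for whatever header a real serialization proxy writes first.
    static final int PROXY_VERSION = 2;

    // Writes timers to the raw keyed stream only when legacy synchronous snapshots are
    // required; otherwise the stream is left untouched.
    static void snapshotTimers(DataOutputStream rawKeyedStream,
                               boolean useLegacySynchronousSnapshots,
                               List<Long> timerTimestamps) throws IOException {
        if (!useLegacySynchronousSnapshots) {
            return; // timers are snapshotted elsewhere; nothing goes into the raw stream
        }
        rawKeyedStream.writeInt(PROXY_VERSION);
        rawKeyedStream.writeInt(timerTimestamps.size());
        for (long timestamp : timerTimestamps) {
            rawKeyedStream.writeLong(timestamp);
        }
    }

    // Convenience helper: returns the bytes this sketch would put into the raw keyed stream.
    static byte[] snapshot(boolean useLegacySynchronousSnapshots, List<Long> timerTimestamps) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            snapshotTimers(out, useLegacySynchronousSnapshots, timerTimestamps);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

With the flag set to false the sketch produces an empty array, so a restore path that unconditionally expects the proxy's header has nothing of its own to read.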

Therefore, creating the {{InternalTimeServiceManager}} on restore can fail due to corrupt reads in the following scenario:
* a checkpoint was taken with {{useLegacySynchronousSnapshots}} set to false (hence the time service manager wrote nothing and does not use the raw keyed stream)
* the raw keyed stream is used elsewhere (e.g. in the Flink application's user code)
* on restore from the checkpoint, {{InternalTimeServiceManagerImpl.create()}} attempts to read from the raw keyed stream with the {{InternalTimerServiceSerializationProxy}}.
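The scenario above can be reproduced in miniature with plain {{java.io}} streams. This is a hypothetical sketch, not Flink code: {{UnconditionalRestoreSketch}}, its methods, and the assumed proxy layout (version int, count, timestamps) are illustrative only. The checkpoint holds just the user's raw keyed state, and the "buggy" restore still parses the first bytes as timer data; in this sketch the misread surfaces as a version mismatch or an EOF rather than silently producing garbage timers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reproduction of the failure scenario; these are NOT Flink classes.
final class UnconditionalRestoreSketch {

    static final int PROXY_VERSION = 2; // assumed header written by the timer proxy

    // Checkpoint taken with useLegacySynchronousSnapshots == false: the timer service wrote
    // nothing, so the raw keyed stream contains only what user code wrote.
    static byte[] checkpointWithUserRawState(String userPayload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(userPayload); // the user's own raw keyed state
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Buggy restore: always interprets the start of the raw stream as timer proxy data.
    static List<Long> restoreTimersUnconditionally(DataInputStream in) throws IOException {
        int version = in.readInt(); // actually reads the user's length prefix and first bytes
        if (version != PROXY_VERSION) {
            throw new IllegalStateException("Corrupt timer data: unexpected version " + version);
        }
        int count = in.readInt();
        List<Long> timers = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            timers.add(in.readLong());
        }
        return timers;
    }

    // Returns true if restoring fails because the user's bytes were misread as timers.
    static boolean restoreFails(byte[] rawKeyedState) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(rawKeyedState))) {
            restoreTimersUnconditionally(in);
            return false;
        } catch (IllegalStateException | IOException e) {
            return true;
        }
    }
}
```

Because {{writeUTF}} starts with a two-byte length, the first {{readInt}} can only equal the assumed version for an empty payload, and in that case the stream runs out and {{readInt}} throws an {{EOFException}}, so this sketch always fails on restore.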

The fix would be to also respect the {{useLegacySynchronousSnapshots}} flag on the restore path, in:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L231
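A minimal sketch of what the proposed fix amounts to, under the same hypothetical model: the read is gated on the same flag that gated the write, so the raw keyed stream is left untouched for other users when no timers were written. {{GatedRestoreSketch}} and its methods are illustrative names, not the actual Flink change, and the ordering (timer service restores first, then user code reads its raw state from the same stream) is an assumption of this sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed fix; NOT the actual Flink change.
final class GatedRestoreSketch {

    static final int PROXY_VERSION = 2; // assumed header written by the timer proxy

    // Fixed restore: only read timer proxy data if it was written, i.e. if the same
    // useLegacySynchronousSnapshots flag that gated the write is true.
    static List<Long> restoreTimers(DataInputStream in, boolean useLegacySynchronousSnapshots)
            throws IOException {
        List<Long> timers = new ArrayList<>();
        if (!useLegacySynchronousSnapshots) {
            return timers; // leave the raw keyed stream untouched for other users
        }
        int version = in.readInt();
        if (version != PROXY_VERSION) {
            throw new IllegalStateException("Corrupt timer data: unexpected version " + version);
        }
        int count = in.readInt();
        for (int i = 0; i < count; i++) {
            timers.add(in.readLong());
        }
        return timers;
    }

    // Checkpoint containing only user-written raw keyed state (timers wrote nothing).
    static byte[] userOnlyCheckpoint(String userPayload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(userPayload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Simulated restore: the timer service restores first, then user code reads its own state.
    static String restoreUserState(byte[] rawKeyedState, boolean useLegacySynchronousSnapshots) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(rawKeyedState))) {
            restoreTimers(in, useLegacySynchronousSnapshots);
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the flag set to false, the user's payload comes back intact. The design choice is simply symmetry: the condition that decides whether the proxy writes must be the same condition that decides whether it reads.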



--
This message was sent by Atlassian Jira
(v8.3.4#803005)