Roman Khachatryan created FLINK-20097:
-----------------------------------------
Summary: Race conditions in InputChannel.ChannelStatePersister
Key: FLINK-20097
URL: https://issues.apache.org/jira/browse/FLINK-20097
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing, Runtime / Network
Affects Versions: 1.12.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
Fix For: 1.12.0
In InputChannel.ChannelStatePersister, stopPersisting() and checkForBarrier() always update pendingCheckpointBarrierId, potentially overwriting a newer id (or the BARRIER_RECEIVED value) with an older one.
For stopPersisting(), consider a case:
# Two consecutive UC barriers arrive at the same channel (1st being stale at some point)
# In RemoteInputChannel.onBuffer, netty thread updates pendingCheckpointBarrierId to BARRIER_RECEIVED
# Task thread processes the 1st barrier and triggers a checkpoint
# Task thread processes the 2nd barrier and aborts the 1st checkpoint, calling stopPersisting() from the UC controller and setting pendingCheckpointBarrierId to CHECKPOINT_COMPLETED
# Task thread starts 2nd checkpoint and calls startPersisting() setting pendingCheckpointBarrierId to 2
# New buffers now have a chance to be included in the 2nd checkpoint (though they belong to the next one)
For checkForBarrier(), consider an input gate with two channels A and B and two barriers 1 and 2:
# Channel A receives both barriers, channel B receives nothing yet
# Task thread processes both barriers on A, eventually triggering 2nd checkpoint
# Channel A state is now BARRIER_RECEIVED, channel B - pending (with id=2)
# Channel B receives the 1st barrier and becomes BARRIER_RECEIVED
# No buffers in B between barriers 1 and 2 will be included in the checkpoint
# Channel B receives the 2nd barrier which will eventually conclude the checkpoint
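Both sequences above can be replayed against a simplified model. This is only a sketch and not the Flink class itself: the class name, the sentinel values, and the method signatures are assumptions made for illustration. The only behaviour taken from the description is that stopPersisting() and checkForBarrier() overwrite the field unconditionally.

```java
// Hypothetical, single-threaded model of the unguarded behaviour described above.
// It is NOT the real ChannelStatePersister; sentinel values are assumed.
final class NaivePersisterModel {
    static final long BARRIER_RECEIVED = -1L;     // assumed sentinel
    static final long CHECKPOINT_COMPLETED = -2L; // assumed sentinel

    long pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;

    void startPersisting(long checkpointId) {
        pendingCheckpointBarrierId = checkpointId;
    }

    void stopPersisting() {
        // Always overwrites, even if a newer barrier was already seen.
        pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;
    }

    void checkForBarrier(long barrierId) {
        // Always overwrites, even if barrierId is older than the pending id.
        pendingCheckpointBarrierId = BARRIER_RECEIVED;
    }

    boolean isPersisting() {
        return pendingCheckpointBarrierId >= 0;
    }

    // stopPersisting() case: both barriers already arrived on the channel.
    static boolean replayStopPersistingCase() {
        NaivePersisterModel channel = new NaivePersisterModel();
        channel.checkForBarrier(1);  // netty thread sees barrier 1
        channel.checkForBarrier(2);  // netty thread sees barrier 2
        channel.stopPersisting();    // task thread aborts checkpoint 1
        channel.startPersisting(2);  // task thread starts checkpoint 2
        return channel.isPersisting();
    }

    // checkForBarrier() case: channel B is pending for checkpoint 2,
    // then the stale barrier 1 arrives.
    static boolean replayCheckForBarrierCase() {
        NaivePersisterModel channelB = new NaivePersisterModel();
        channelB.startPersisting(2);
        channelB.checkForBarrier(1);
        return channelB.isPersisting();
    }

    public static void main(String[] args) {
        // true: the channel persists buffers although barrier 2 already arrived
        System.out.println(replayStopPersistingCase());
        // false: buffers between barriers 1 and 2 on B are no longer persisted
        System.out.println(replayCheckForBarrierCase());
    }
}
```

In the first replay the model ends up persisting when it should not, and in the second it stops persisting too early. These match the two outcomes listed above.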
A possible solution is to perform an action only if the passed checkpointId >= pendingCheckpointId. For that, a separate field will be needed to hold the status (RECEIVED/COMPLETED/PENDING). The class isn't thread-safe, so it shouldn't be a problem.
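A minimal sketch of that idea follows, assuming a status enum plus a separate id field. All names here (GuardedPersisterSketch, lastSeenCheckpointId, the method signatures) are hypothetical and not taken from the Flink code base. One further assumption: startPersisting() uses a strict comparison, so that a barrier already received for the same id is not reset to pending.

```java
// Hypothetical sketch of the proposed guard; not the actual Flink implementation.
// Like the class it models, it is not thread-safe.
final class GuardedPersisterSketch {
    enum Status { PENDING, RECEIVED, COMPLETED }

    private Status status = Status.COMPLETED;
    private long lastSeenCheckpointId = -1L;

    void startPersisting(long checkpointId) {
        // Strict comparison (an assumption): do not re-open a checkpoint
        // whose barrier has already been received on this channel.
        if (checkpointId > lastSeenCheckpointId) {
            status = Status.PENDING;
            lastSeenCheckpointId = checkpointId;
        }
    }

    void stopPersisting(long checkpointId) {
        // Ignore aborts or completions of checkpoints older than the last seen id.
        if (checkpointId >= lastSeenCheckpointId) {
            status = Status.COMPLETED;
            lastSeenCheckpointId = checkpointId;
        }
    }

    void checkForBarrier(long barrierId) {
        // Ignore stale barriers that belong to an older checkpoint.
        if (barrierId >= lastSeenCheckpointId) {
            status = Status.RECEIVED;
            lastSeenCheckpointId = barrierId;
        }
    }

    boolean isPersisting() {
        return status == Status.PENDING;
    }

    Status getStatus() {
        return status;
    }

    public static void main(String[] args) {
        // stopPersisting() case: both barriers arrive before the task thread acts.
        GuardedPersisterSketch channel = new GuardedPersisterSketch();
        channel.checkForBarrier(1);
        channel.checkForBarrier(2);
        channel.stopPersisting(1);   // ignored: 1 < 2
        channel.startPersisting(2);  // ignored: barrier 2 already received
        System.out.println(channel.getStatus()); // RECEIVED

        // checkForBarrier() case: channel B is pending for checkpoint 2.
        GuardedPersisterSketch channelB = new GuardedPersisterSketch();
        channelB.startPersisting(2);
        channelB.checkForBarrier(1); // ignored: stale barrier
        System.out.println(channelB.isPersisting()); // true
    }
}
```

With the guard in place, the stale stopPersisting(1) and the stale barrier 1 leave the state untouched in both replayed sequences.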
--
This message was sent by Atlassian Jira
(v8.3.4#803005)