[jira] [Created] (FLINK-22946) Network buffer deadlock introduced by unaligned checkpoint

Shang Yuanchun (Jira)
Guokuai Huang created FLINK-22946:
-------------------------------------

             Summary: Network buffer deadlock introduced by unaligned checkpoint
                 Key: FLINK-22946
                 URL: https://issues.apache.org/jira/browse/FLINK-22946
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.13.1, 1.13.0
            Reporter: Guokuai Huang
         Attachments: Screen Shot 2021-06-09 at 6.39.47 PM.png, Screen Shot 2021-06-09 at 7.02.04 PM.png

We recently encountered a deadlock when using unaligned checkpoints. Below are the stacks of the two threads involved in the deadlock:
```
"Channel state writer Join(xxxxxx) (34/256)#1":
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
    - waiting to lock <0x00000007296dfa90> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399)
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200)
    - locked <0x00000007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.write(ChannelStateCheckpointWriter.java:173)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.writeInput(ChannelStateCheckpointWriter.java:131)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$write$0(ChannelStateWriteRequest.java:63)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$785/722492780.accept(Unknown Source)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$buildWriteRequest$2(ChannelStateWriteRequest.java:93)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$786/1360749026.accept(Unknown Source)
    at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:212)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:82)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:59)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl$$Lambda$253/502209879.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)

"Join(xxxxxx) (34/256)#1":
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
    - waiting to lock <0x00000007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399)
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200)
    - locked <0x00000007296dfa90> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:95)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$226/1801850008.runDefaultAction(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$577/1653738667.run(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:745)
```

The root cause of this problem is that, with unaligned checkpoints, *multiple input channels under the same input gate may recycle network buffers at the same time.*

Previously, network buffer recycling could only happen serially across the input channels of an input gate, because each sub-task processes its input data serially and an input gate belongs to exactly one sub-task. When unaligned checkpoints are enabled, each input channel takes a snapshot of its state when it receives the checkpoint barrier, and network buffers may be recycled during that process.

Unfortunately, *the current network buffer recycling mechanism does not take into account the situation where multiple input channels recycle network buffers at the same time.* The following code block causes a deadlock when multiple input channels under the same input gate recycle network buffers at the same time.

!Screen Shot 2021-06-09 at 7.02.04 PM.png!
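
The lock cycle visible in the thread stacks can be reproduced in isolation. The following is a minimal sketch, not Flink code: `NestedRecycleDeadlockSketch`, `recycleWhileLocked` and the two `ReentrantLock` objects are hypothetical stand-ins for two `AvailableBufferQueue` monitors. It uses `tryLock` with a timeout (an assumption made so that the demo terminates) where the real code blocks forever on `synchronized`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class NestedRecycleDeadlockSketch {

    /**
     * Hypothetical model of recycle(): take the own queue lock, then, while still
     * holding it, try to take the peer queue lock (as notifyBufferAvailable would).
     * Returns true if the peer lock could be acquired.
     */
    static boolean recycleWhileLocked(ReentrantLock own, ReentrantLock peer,
                                      CountDownLatch bothLocked, CountDownLatch bothTried) {
        own.lock();
        try {
            bothLocked.countDown();
            bothLocked.await(); // both threads now hold their own queue lock
            boolean acquired = peer.tryLock(100, TimeUnit.MILLISECONDS);
            if (acquired) {
                peer.unlock();
            }
            bothTried.countDown();
            bothTried.await(); // keep the own lock until the peer has tried as well
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            own.unlock();
        }
    }

    /** Runs the two recycling threads and reports whether each nested lock attempt succeeded. */
    static boolean[] run() throws InterruptedException {
        ReentrantLock queueA = new ReentrantLock();
        ReentrantLock queueB = new ReentrantLock();
        CountDownLatch bothLocked = new CountDownLatch(2);
        CountDownLatch bothTried = new CountDownLatch(2);
        boolean[] acquired = new boolean[2];

        // Stand-ins for the task thread and the channel state writer thread.
        Thread task = new Thread(
                () -> acquired[0] = recycleWhileLocked(queueA, queueB, bothLocked, bothTried));
        Thread writer = new Thread(
                () -> acquired[1] = recycleWhileLocked(queueB, queueA, bothLocked, bothTried));
        task.start();
        writer.start();
        task.join();
        writer.join();
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] acquired = run();
        System.out.println("task got peer lock: " + acquired[0]
                + ", writer got peer lock: " + acquired[1]);
    }
}
```

In this sketch neither thread can obtain the peer lock while both hold their own, which is the same cycle as `locked <0x00000007296bc450>` / `waiting to lock <0x00000007296dfa90>` and its mirror image in the stacks above.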

Fixing this problem is fairly straightforward. Here are two possible solutions:
*1. A case-by-case solution.* Note that input channel A (holding lock A) gave its released network buffer to input channel B (waiting for lock B), and input channel B (holding lock B) gave its released network buffer to input channel A (waiting for lock A). So when an input channel releases a network buffer, it can first check whether it is itself waiting for a network buffer and, if so, allocate the buffer directly to itself. This avoids the situation where different input channels exchange network buffers.
*2. A straightforward solution.* The input channel holds the lock during recycle only to remove the network buffer from the bufferQueue; the subsequent operations do not need to hold this lock. Therefore, we only need to move Buffer::recycleBuffer outside the bufferQueue lock to avoid the deadlock.
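
The second option can be sketched as follows. This is an illustrative model under stated assumptions, not the actual `BufferManager` patch: `RecycleOutsideLockSketch`, `ChannelQueue`, `setPeer` and the use of `Integer` as a buffer are all hypothetical. The point it shows is that the queue is updated under the lock, while the hand-off to the other channel happens after the lock has been released.

```java
import java.util.ArrayDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RecycleOutsideLockSketch {

    /** Hypothetical stand-in for a per-channel buffer queue; not Flink's BufferManager. */
    static final class ChannelQueue {
        private final ArrayDeque<Integer> bufferQueue = new ArrayDeque<>();
        private ChannelQueue peer; // channel that receives buffers released by this one

        void setPeer(ChannelQueue peer) {
            this.peer = peer;
        }

        /** Called by the peer when it hands over a released buffer; needs this queue's lock. */
        void notifyBufferAvailable(int buffer) {
            synchronized (bufferQueue) {
                bufferQueue.add(buffer);
            }
        }

        /** Updates the own queue under the lock, then releases the surplus buffer outside of it. */
        void recycle(int buffer) {
            Integer surplus;
            synchronized (bufferQueue) {
                bufferQueue.add(buffer);
                surplus = bufferQueue.poll(); // oldest buffer is given back
            }
            // No lock is held here, so taking the peer's lock cannot form a cycle.
            if (surplus != null) {
                peer.notifyBufferAvailable(surplus);
            }
        }

        int size() {
            synchronized (bufferQueue) {
                return bufferQueue.size();
            }
        }
    }

    /** Recycles on both channels concurrently; returns false if the threads did not finish in time. */
    static boolean runConcurrently(ChannelQueue a, ChannelQueue b, int rounds)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> { for (int i = 0; i < rounds; i++) a.recycle(i); });
        pool.submit(() -> { for (int i = 0; i < rounds; i++) b.recycle(i); });
        pool.shutdown();
        return pool.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ChannelQueue a = new ChannelQueue();
        ChannelQueue b = new ChannelQueue();
        a.setPeer(b);
        b.setPeer(a);
        boolean finished = runConcurrently(a, b, 100_000);
        System.out.println("finished=" + finished + " a=" + a.size() + " b=" + b.size());
    }
}
```

Because each thread holds at most one queue lock at any time, the circular wait from the thread stacks cannot occur in this model. Whether the real fix needs additional care (for example around buffers released while a channel is closing) is not covered by this sketch.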



--
This message was sent by Atlassian Jira
(v8.3.4#803005)