Hello community,
I'm currently struggling with an Apache Beam batch pipeline on top of Flink. The pipeline runs smoothly in smaller environments, but in production it always ends up with a `connection timeout` in one of the last shuffle phases.

org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition 260ddc26547df92babd1c6d430903b9d@da5da68d6d4600bb272d68172a09760f not reachable.
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
    at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
    at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
    ...
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: ##########/10.249.28.39:25709
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at ...

Basically the pipeline looks as follows:

read "skewed" sources -> reshuffle -> flatMap (performs a heavy computation, a few seconds per element) -> write to multiple outputs (~4)

- cluster size: 100 TMs
- slots per TM: 4
- data size per single job run ranging from 100G to 1TB
- job parallelism: 400

I've tried increasing the netty `taskmanager.network.netty.client.connectTimeoutSec` with no luck. Increasing the number of netty threads did not help either. The JVM performs ok (no OOMs, GC pauses, ...). The connect backlog defaults to 128.

This is probably caused by netty threads being blocked on the server side. All these threads share the same lock, so increasing the number of threads won't help.
"Flink Netty Server (0) Thread 2" #131 daemon prio=5 os_prio=0 tid=0x00007f339818e800 nid=0x7e9 runnable [0x00007f334acf5000] java.lang.Thread.State: RUNNABLE at java.lang.Number.<init>(Number.java:55) at java.lang.Long.<init>(Long.java:947) at sun.reflect.UnsafeLongFieldAccessorImpl.get(UnsafeLongFieldAccessorImpl.java:36) at java.lang.reflect.Field.get(Field.java:393) at org.apache.flink.core.memory.HybridMemorySegment.getAddress(HybridMemorySegment.java:420) at org.apache.flink.core.memory.HybridMemorySegment.checkBufferAndGetAddress(HybridMemorySegment.java:434) at org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:81) at org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:66) at org.apache.flink.core.memory.MemorySegmentFactory.wrapOffHeapMemory(MemorySegmentFactory.java:130) at org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.sliceNextBuffer(BufferReaderWriterUtil.java:87) at org.apache.flink.runtime.io.network.partition.MemoryMappedBoundedData$BufferSlicer.nextBuffer(MemoryMappedBoundedData.java:240) at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionReader.<init>(BoundedBlockingSubpartitionReader.java:71) at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:201) - locked <0x00000006d822e180> (a java.lang.Object) at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:279) at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:72) - locked <0x00000006cad32578> (a java.util.HashMap) at org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.requestSubpartitionView(CreditBasedSequenceNumberingViewReader.java:86) - locked <0x000000079767ff38> (a java.lang.Object) at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:102) at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) This may be related to mmap backed BoundedData implementation, where `nextBuffer` seems to be somehow expensive (reflection, skipping empty buffers?) . Just to note, last phases only shuffle metadata (kilobyte scale), but the job paralelism remains the same due to beam nature (400). Does this sound familiar to anyone? Do you have any suggestions how to solve this? Thanks for help, David |
Hi David,
I'm unfortunately not familiar with these parts of Flink, but I'm pulling Piotr in, who might be able to tell you more.

Cheers,
Till
Hi David,
The usual cause for a connection timeout is a long deployment, for example if your job's jar is large and takes a long time to distribute across the cluster. I'm not sure whether large state could affect this as well. Are you sure that's not the case?

The thing you are suggesting I haven't heard of before, but theoretically it could indeed happen. Reading from mmap'ed sub-partitions could block the Netty threads if the kernel decides to drop an mmap'ed page and it has to be read back from disk. Could you check your CPU and disk IO usage? This should show up as high IOWait CPU usage. Could you, for example, post a couple of sample results of

    iostat -xm 2

from a representative TaskManager? If the disks are indeed overloaded, changing Flink's config option

    taskmanager.network.bounded-blocking-subpartition-type

from its default to `file` could solve the problem. FYI, in 1.10 this option is renamed to

    taskmanager.network.blocking-shuffle.type

and its default value will be `file`.

We would appreciate it if you could get back to us with the results!

Piotrek
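(For reference, the corresponding entry in flink-conf.yaml would be roughly the line below, assuming a 1.9.x setup; in 1.10 the key becomes taskmanager.network.blocking-shuffle.type, as noted above.)

    taskmanager.network.bounded-blocking-subpartition-type: file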
Hi Piotr,
thanks for the suggestions! In the case of a large jar, wouldn't this happen in the previous stages as well (if so, this should not be the case)? Also, there shouldn't be any state involved (unless Beam IOs use it internally).

I'll get back to you with the results after checking the TMs' IO stats.

D.
Hi,
> In case of large jar, wouldn't this happen in previous stages as well (if
> so this should not be the case)?

I'm not exactly sure how jars are distributed, but if they are being sent/uploaded from one node (or some other static/fixed number of nodes, e.g. uploading to and reading from a DFS) to all of them, this might not scale well. Also, your dev deployment might not be stressing the network/storage as much as the production deployment, which can also affect the time it takes to deploy the job.

What's your job size? (How large is the jar uploaded to Flink?)

There might also be other factors in play here. For example, if you are using Flink's job mode (not standalone), the time to start up a Flink node might be too long: some nodes are already up and running, and they time out waiting for the others to start.

> Also there shouldn't be any state involved
> (unless Beam IO's use it internally).

My bad. Instead of

> - data size per single job run ranging from 100G to 1TB

I read state size of 100G to 1TB.

Piotrek
Hi!
Concerning JAR files: I think this has nothing to do with it; it is a batch shuffle after all, so the previous stage must have completed already.

A few things that come to my mind:
  - What Flink version are you using? 1.9?
  - Are you sure that the source TaskManager is still running? Earlier Flink versions had an issue with releasing TMs too early, sometimes before the result was fetched by a consumer.
  - The buffer creation on the sender / netty server side is more expensive than necessary, but should be nowhere near expensive enough to cause a stall.

Can you elaborate on what the shared lock is that all server threads are using?

Best,
Stephan
Hi Stephan,
I've actually managed to narrow the problem down to blocked netty server threads. I'm using 1.9.1 with a few custom patches <https://github.com/dmvk/flink/commits/1.9.1-szn> that are not relevant to this issue.

To highlight the problem, I've added these checks to ResultPartitionManager <https://gist.github.com/dmvk/8be27e8574777222ebb7243dfceae67d>, which measure how long the "netty tasks" take to execute (monitor acquisition + actual execution).

We indeed have a pretty busy cluster, with high load and IO waits (mostly due to ongoing shuffles and computations). From the measurements I can see numbers like:

createSubpartitionView: 129255ms
createSubpartitionView: 129333ms
createSubpartitionView: 129353ms
createSubpartitionView: 129354ms
createSubpartitionView: 144419ms
createSubpartitionView: 144653ms
createSubpartitionView: 144759ms
createSubpartitionView: 144905ms
releasePartition: 145218ms
releasePartition: 145250ms
releasePartition: 145256ms
releasePartition: 145263ms

These vary a lot, depending on what other pipelines are being executed simultaneously. They imply that for at least 145 seconds (which is greater than the connection timeout) the TaskManager was not able to accept any connection (because of netty internals <https://github.com/netty/netty/issues/240>).

Switching to the *file*-backed BoundedData implementation didn't help, because there are still heavy IO ops being executed by netty threads while the monitor is held (e.g. deletion of the backing file).

1) I've tried more fine-grained locking (locking a single partition instead of the whole partitionManager). This helped a little, but the pipeline is still unable to finish in some cases.

2) Currently I'm trying to completely remove locking from ResultPartitionManager, as it seems only relevant for internal data-structure access (it can be replaced with java.util.concurrent). I think this should also have an impact on overall job completion times in some cases.

What do you think?
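(As an illustration of the kind of timing check described above, measuring monitor acquisition plus the actual execution; the real patch is in the linked gist, this is only a rough, self-contained sketch.)

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Illustrative only; the real checks are in the gist linked above.
final class NettyTaskTimer {

    // Runs the task under the given lock and reports how long monitor acquisition
    // plus the actual execution took, mirroring the numbers listed above.
    static <T> T timed(String name, Object lock, Supplier<T> task) {
        long start = System.nanoTime();
        try {
            synchronized (lock) {
                return task.get();
            }
        } finally {
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            if (elapsedMs > 1000) {
                System.err.println(name + ": " + elapsedMs + "ms");
            }
        }
    }
}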
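(For point 2, a minimal sketch of that direction: replacing a registry-wide synchronized map with java.util.concurrent so that a slow view creation for one partition no longer blocks requests for unrelated partitions. Generic types are used here; this is not the actual ResultPartitionManager code.)

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative only; not the actual ResultPartitionManager code.
final class ConcurrentPartitionRegistry<K, P> {

    private final ConcurrentMap<K, P> registeredPartitions = new ConcurrentHashMap<>();

    void registerPartition(K partitionId, P partition) {
        if (registeredPartitions.putIfAbsent(partitionId, partition) != null) {
            throw new IllegalStateException("Result partition already registered: " + partitionId);
        }
    }

    // Lookup no longer holds a registry-wide monitor, so the expensive work
    // (creating the read view, touching mmap'ed or file-backed data, deleting
    // backing files) does not serialize unrelated partition requests.
    P getPartition(K partitionId) {
        P partition = registeredPartitions.get(partitionId);
        if (partition == null) {
            throw new IllegalStateException("Unknown partition: " + partitionId);
        }
        return partition;
    }

    P removePartition(K partitionId) {
        return registeredPartitions.remove(partitionId);
    }
}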
/CC Piotr and Zhijiang
Sounds reasonable at first glance. I would like to hear Piotr's and Zhijiang's take, though; they know that code better than I do.
Also > > > your dev deployment might not be stressing network/storage/something as > > > much as production deployment, which can also affect time to deploy the > > job. > > > > > > What’s yours job size? (How large is the jar uploaded to Flink?) > > > > > > Also there might be other factors in play here, like if you are using > > > Flink job mode (not stand alone), time to start up a Flink node might > be > > > too long. Some nodes are already up and running and they are time > outing > > > waiting for others to start up. > > > > > > > Also there shouldn't be any state involved > > > > (unless Beam IO's use it internally). > > > > > > My bad. Instead of > > > > > > > - data size per single job run ranging from 100G to 1TB > > > > > > I read state size 100G to 1TB. > > > > > > Piotrek > > > > > > > > > > > On Tue, Jan 28, 2020 at 11:45 AM Piotr Nowojski <[hidden email] > > > > > wrote: > > > > > > > >> Hi David, > > > >> > > > >> The usual cause for connection time out is long deployment. For > > example > > > if > > > >> your Job's jar is large and takes long time to distribute across the > > > >> cluster. I’m not sure if large state could affect this as well or > not. > > > Are > > > >> you sure that’s not the case? > > > >> > > > >> The think you are suggesting, I haven’t heard about previously, but > > > indeed > > > >> theoretically it could happen. Reading from mmap’ed sub partitions > > could > > > >> block the Netty threads if kernel decides to drop mmap’ed page and > it > > > has > > > >> to be read from the disks. Could you check your CPU and disks IO > > usage? > > > >> This should be visible by high IOWait CPU usage. Could you for > example > > > post > > > >> couple of sample results of > > > >> > > > >> iostat -xm 2 > > > >> > > > >> command from some representative Task Manager? If indeed disks are > > > >> overloaded, changing Flink’s config option > > > >> > > > >> taskmanager.network.bounded-blocking-subpartition-type > > > >> > > > >> From default to `file` could solve the problem. FYI, this option is > > > >> renamed in 1.10 to > > > >> > > > >> taskmanager.network.blocking-shuffle.type > > > >> > > > >> And it’s default value will be `file`. > > > >> > > > >> We would appreciate if you could get back to us with the results! > > > >> > > > >> Piotrek > > > >> > > > >>> On 28 Jan 2020, at 11:03, Till Rohrmann <[hidden email]> > > wrote: > > > >>> > > > >>> Hi David, > > > >>> > > > >>> I'm unfortunately not familiar with these parts of Flink but I'm > > > pulling > > > >>> Piotr in who might be able to tell you more. > > > >>> > > > >>> Cheers, > > > >>> Till > > > >>> > > > >>> On Mon, Jan 27, 2020 at 5:58 PM David Morávek <[hidden email]> > > wrote: > > > >>> > > > >>>> Hello community, > > > >>>> > > > >>>> I'm currently struggling with an Apache Beam batch pipeline on top > > of > > > >>>> Flink. The pipeline runs smoothly in smaller environments, but in > > > >>>> production it always ends up with `connection timeout` in one of > the > > > >> last > > > >>>> shuffle phases. > > > >>>> > > > >>>> org.apache.flink.runtime.io > > > >>>> .network.partition.consumer.PartitionConnectionException: > > > >>>> Connection for partition > > > >>>> 260ddc26547df92babd1c6d430903b9d@da5da68d6d4600bb272d68172a09760f > > not > > > >>>> reachable. 
> > > >>>> at org.apache.flink.runtime.io > > > >>>> > > > >> > > > > > > .network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168) > > > >>>> at org.apache.flink.runtime.io > > > >>>> > > > >> > > > > > > .network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237) > > > >>>> at org.apache.flink.runtime.io > > > >>>> > > > >> > > > > > > .network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215) > > > >>>> at > > > >>>> > > > >> > > > > > > org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65) > > > >>>> at > > > >>>> > > > >> > > > > > > org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866) > > > >>>> at > > > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621) > > > >>>> at > > org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) > > > >>>> at java.lang.Thread.run(Thread.java:748) > > > >>>> ... > > > >>>> Caused by: > > > >>>> > > > >> > > > > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: > > > >>>> Connection timed out: ##########/10.249.28.39:25709 > > > >>>> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) > > > >>>> at > > > >>>> > > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) > > > >>>> at ... > > > >>>> > > > >>>> Basically the pipeline looks as follows: > > > >>>> > > > >>>> read "skewed" sources -> reshuffle -> flatMap (performs a heavy > > > >> computation > > > >>>> - few secs per element) -> write to multiple outputs (~4) > > > >>>> > > > >>>> - cluster size: 100 tms > > > >>>> - slots per tm: 4 > > > >>>> - data size per single job run ranging from 100G to 1TB > > > >>>> - job paralelism: 400 > > > >>>> > > > >>>> I've tried to increase netty > > > >>>> `taskmanager.network.netty.client.connectTimeoutSec` with no luck. > > > Also > > > >>>> increasing # of netty threads did not help. JVM performs ok (no > > ooms, > > > gc > > > >>>> pauses, ...). Connect backlog defaults to 128. > > > >>>> > > > >>>> This is probably caused by netty threads being blocked on the > server > > > >> side. > > > >>>> All these threads share the same lock, so increasing number of > > threads > > > >>>> won't help. 
> > > >>>> > > > >>>> "Flink Netty Server (0) Thread 2" #131 daemon prio=5 os_prio=0 > > > >>>> tid=0x00007f339818e800 nid=0x7e9 runnable [0x00007f334acf5000] > > > >>>> java.lang.Thread.State: RUNNABLE > > > >>>> at java.lang.Number.<init>(Number.java:55) > > > >>>> at java.lang.Long.<init>(Long.java:947) > > > >>>> at > > > >>>> > > > >>>> > > > >> > > > > > > sun.reflect.UnsafeLongFieldAccessorImpl.get(UnsafeLongFieldAccessorImpl.java:36) > > > >>>> at java.lang.reflect.Field.get(Field.java:393) > > > >>>> at > > > >>>> > > > >>>> > > > >> > > > > > > org.apache.flink.core.memory.HybridMemorySegment.getAddress(HybridMemorySegment.java:420) > > > >>>> at > > > >>>> > > > >>>> > > > >> > > > > > > org.apache.flink.core.memory.HybridMemorySegment.checkBufferAndGetAddress(HybridMemorySegment.java:434) > > > >>>> at > > > >>>> > > > >>>> > > > >> > > > > > > org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:81) > > > >>>> at > > > >>>> > > > >>>> > > > >> > > > > > > org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:66) > > > >>>> at > > > >>>> > > > >>>> > > > >> > > > > > > org.apache.flink.core.memory.MemorySegmentFactory.wrapOffHeapMemory(MemorySegmentFactory.java:130) > > > >>>> at > > > >>>> org.apache.flink.runtime.io > > > >>>> > > > >> > > > > > > .network.partition.BufferReaderWriterUtil.sliceNextBuffer(BufferReaderWriterUtil.java:87) > > > >>>> at > > > >>>> org.apache.flink.runtime.io > > > >>>> > > > >> > > > > > > .network.partition.MemoryMappedBoundedData$BufferSlicer.nextBuffer(MemoryMappedBoundedData.java:240) > > > >>>> at > > > >>>> org.apache.flink.runtime.io > > > >>>> > > > >> > > > > > > .network.partition.BoundedBlockingSubpartitionReader.<init>(BoundedBlockingSubpartitionReader.java:71) > > > >>>> at > > > >>>> org.apache.flink.runtime.io > > > >>>> > > > >> > > > > > > .network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:201) > > > >>>> - locked <0x00000006d822e180> (a java.lang.Object) > > > >>>> at > > > >>>> org.apache.flink.runtime.io > > > >>>> > > > >> > > > > > > .network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:279) > > > >>>> at > > > >>>> org.apache.flink.runtime.io > > > >>>> > > > >> > > > > > > .network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:72) > > > >>>> - locked <0x00000006cad32578> (a java.util.HashMap) > > > >>>> at > > > >>>> org.apache.flink.runtime.io > > > >>>> > > > >> > > > > > > .network.netty.CreditBasedSequenceNumberingViewReader.requestSubpartitionView(CreditBasedSequenceNumberingViewReader.java:86) > > > >>>> - locked <0x000000079767ff38> (a java.lang.Object) > > > >>>> at > > > >>>> org.apache.flink.runtime.io > > > >>>> > > > >> > > > > > > .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:102) > > > >>>> at > > > >>>> org.apache.flink.runtime.io > > > >>>> > > > >> > > > > > > .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42) > > > >>>> at > > > >>>> > > > >>>> > > > >> > > > > > > org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) > > > >>>> at > > > >>>> > > > >>>> > > > >> > > > > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) > > > >>>> at > > > >>>> > > > >>>> > > > >> > > > > > > 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) > > > >>>> at > > > >>>> > > > >>>> > > > >> > > > > > > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) > > > >>>> > > > >>>> This may be related to mmap backed BoundedData implementation, > where > > > >>>> `nextBuffer` seems to be somehow expensive (reflection, skipping > > empty > > > >>>> buffers?) . Just to note, last phases only shuffle metadata > > (kilobyte > > > >>>> scale), but the job paralelism remains the same due to beam nature > > > >> (400). > > > >>>> > > > >>>> Does this sound familiar to anyone? Do you have any suggestions > how > > to > > > >>>> solve this? > > > >>>> > > > >>>> Thanks for help, > > > >>>> David > > > >>>> > > > >> > > > >> > > > > > > > > > |
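A minimal sketch of the kind of timing check described above (class, method, and field names here are illustrative only, not the actual contents of the linked gist): the interesting number is monitor wait plus execution time, because as long as one thread holds the shared monitor, no other partition request on that taskmanager makes progress.

    import java.util.HashMap;
    import java.util.Map;

    // Simplified illustration of timing a synchronized "netty task".
    public class TimedPartitionRegistry {

        private final Map<String, Object> registeredPartitions = new HashMap<>();

        public Object createSubpartitionView(String partitionId) {
            long requested = System.nanoTime();
            synchronized (registeredPartitions) {
                long acquired = System.nanoTime();
                try {
                    // ... potentially blocking work (mmap slicing, file IO) happens here ...
                    return registeredPartitions.get(partitionId);
                } finally {
                    long totalMs = (System.nanoTime() - requested) / 1_000_000;
                    if (totalMs > 1_000) {
                        System.err.printf(
                            "createSubpartitionView: %dms (waited %dms for the lock)%n",
                            totalMs, (acquired - requested) / 1_000_000);
                    }
                }
            }
        }
    }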
Hi David,
> with high load and io waits

How high are the values we are talking about?

Could you attach a CPU profiler and post the results somehow? Which threads are busy on what call trees?

Regarding the idea of removing the locks in the `ResultPartitionManager`: I guess it could help a bit, but I wouldn't expect it to solve the problem fully.

Doing blocking IO in the Netty threads is already asking for trouble, as even without locks it can block and crash many things (like createSubpartitionView). But one improvement we might take a look at is that the `BoundedBlockingSubpartitionReader` constructor is doing a blocking call when pre-fetching the buffer. If not for that, the code inside `ResultPartitionManager` would probably be non-blocking, or at least much less blocking.

Piotrek
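A rough sketch of that idea, using a simplified, hypothetical reader interface rather than the actual Flink class: the first (potentially blocking) read moves from the constructor into the first getNextBuffer() call, so nothing blocking runs while the registry's monitor is held.

    import java.io.IOException;

    // Simplified sketch: defer the first read until the consumer asks for data,
    // instead of reading eagerly in the constructor (which runs under the lock).
    public class LazyBlockingSubpartitionReader {

        interface BufferSource {
            byte[] readNextBuffer() throws IOException; // may block on disk IO
        }

        private final BufferSource source;
        private byte[] nextBuffer;   // pre-fetched buffer, if any
        private boolean initialized; // whether the first read has happened

        public LazyBlockingSubpartitionReader(BufferSource source) {
            // no IO here -- the constructor stays cheap and non-blocking
            this.source = source;
        }

        public byte[] getNextBuffer() throws IOException {
            if (!initialized) {
                // first (potentially blocking) read happens on the consuming
                // path, outside of any shared registry lock
                nextBuffer = source.readNextBuffer();
                initialized = true;
            }
            byte[] current = nextBuffer;
            nextBuffer = source.readNextBuffer(); // pre-fetch the following buffer
            return current;
        }
    }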
Hi Piotr,

Removal of the buffer prefetch in BoundedBlockingSubpartitionReader did not help, I've already tried that (there are still other problematic code paths, e.g. releasePartition). I think it's perfectly ok to perform IO ops in netty threads, we just have to make sure we can leverage multiple threads at once. Synchronization in ResultPartitionManager effectively decreases parallelism to one, and "netty tasks / unprocessed messages" keep piling up.

Removing the synchronization *did solve* the problem for me, because it allows Flink to leverage the whole netty event loop pool, and it's ok to have a single thread blocked for a little while (we can still accept connections with the other threads).

Let me think about how to get a relevant CPU graph from the TM, it's kind of hard to target a "defective node". Anyway, attached are some graphs from such a busy node at the time of failure.

Is there any special reason for the synchronization that I don't see? I have a feeling it's only for synchronizing `registeredPartitions` map access, and that it's perfectly ok not to synchronize the `createSubpartitionView` and `releasePartition` calls.

Thanks,
D.
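A minimal sketch of that direction, assuming the lock really only guards the map itself (simplified, hypothetical types, not the actual ResultPartitionManager): a ConcurrentHashMap covers the registration bookkeeping, so a netty thread blocked on IO for one partition no longer stalls requests for other partitions.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Simplified sketch of a lock-free partition registry. Only the map access
    // is coordinated; createSubpartitionView/releasePartition may still do
    // blocking IO, but they no longer serialize behind a single monitor.
    public class ConcurrentPartitionRegistry<P> {

        private final ConcurrentMap<String, P> registeredPartitions = new ConcurrentHashMap<>();

        public void registerPartition(String partitionId, P partition) {
            if (registeredPartitions.putIfAbsent(partitionId, partition) != null) {
                throw new IllegalStateException("Partition already registered: " + partitionId);
            }
        }

        public P createSubpartitionView(String partitionId) {
            P partition = registeredPartitions.get(partitionId);
            if (partition == null) {
                throw new IllegalStateException("Unknown partition: " + partitionId);
            }
            // per-partition blocking work (e.g. opening the backing file) happens
            // here, concurrently with requests for other partitions
            return partition;
        }

        public void releasePartition(String partitionId) {
            // potentially slow cleanup (e.g. deleting the backing file) now only
            // delays callers interested in this particular partition
            registeredPartitions.remove(partitionId);
        }
    }

David's option 1 (a per-partition lock instead of the registry-wide one) is a middle ground if fully removing the lock turns out to race on the release paths.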
Just to clarify, these are bare metal nodes (128G RAM, 16 CPUs + hyperthreading, 4x HDDs, 10G network), which run YARN, HDFS and HBase.

D.
Hi,
>> I think it's perfectly ok to perform IO ops in netty threads, (…)
>> Removing the synchronization *did solve* the problem for me, because it
>> allows Flink to leverage the whole netty event loop pool, and it's ok to
>> have a single thread blocked for a little while (we can still accept
>> connections with the other threads).

It's a discouraged pattern, as Netty has a thread pool for processing multiple channels, but a single channel is always handled by the same pre-defined thread (to the best of my knowledge). In Flink we are lucky that, besides registering partitions, the Netty threads are not doing anything critical that could fail the job if blocked (heartbeats are handled independently). And I guess you are right: if some threads are blocked on IO, new (sub)partition registrations would be handled by the non-blocked threads, if not for the global lock. It sounds very hacky though.

Also, that's ignoring the performance implication - one thread blocked on disk IO wastes the CPU/network potential of ~1/16 of the channels (due to this fixed, pre-determined assignment between channels <-> threads). In some scenarios that might be acceptable, with uniform tasks and no data skew. But if multiple tasks with different workload patterns and/or data skew are running simultaneously, this can cause visible performance issues.

Having said that, any rework to fully address this issue and make the IO non-blocking could be very substantial, so I would be more than happy to just kick the can down the road for now ;)

>> removal of the buffer prefetch in BoundedBlockingSubpartitionReader did not
>> help, I've already tried that (there are still other problematic code
>> paths, e.g. releasePartition).

Are there other problematic parts besides releasePartition that you have already analysed? Maybe it would be better to just try moving those calls out of the `ResultPartitionManager` call stack somehow?

>> Let me think about how to get a relevant CPU graph from the TM, it's kind
>> of hard to target a "defective node".

Thanks, I know it's non-trivial, but I would guess you do not have to target a "defective node". If a defective node is blocking for ~2 minutes during the failure, I'm pretty sure other nodes are being blocked constantly for seconds at a time, and profiler results from such nodes would allow us to confirm the issue and better understand what's exactly happening.

>> Anyway, attached are some graphs from such a busy node at the time of failure.

I didn't get/see any graphs?

>> Is there any special reason for the synchronization that I don't see? I have a
>> feeling it's only for synchronizing `registeredPartitions` map access, and
>> that it's perfectly ok not to synchronize the `createSubpartitionView` and
>> `releasePartition` calls.

I'm not sure. Definitely removing this lock increases concurrency and so the potential for race conditions, especially on the resource-releasing paths. After briefly looking at the code, I didn't find any obvious issue, but there are some callbacks/notifications happening, and generally speaking resource-releasing paths are pretty hard to reason about. Zhijiang might spot something, as he had a good eye for catching such problems in the past.

Besides that, you could just run a couple (10? 20?) of Travis runs. All of the ITCases from various modules (connectors, flink-tests, …) are pretty good at catching race conditions in the network stack.
Piotrek
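One way to read the suggestion above about moving the blocking calls out of the `ResultPartitionManager` call stack: hand the IO-heavy work to a dedicated executor so the netty event-loop thread only schedules it. A rough, hypothetical sketch of that pattern (not how Flink wires this up today; the class and method names are made up):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Rough sketch: the netty handler thread only submits the blocking work and
    // returns immediately; a small IO pool absorbs disk stalls, so the event loop
    // keeps accepting connections and serving other channels.
    public class PartitionRequestOffloader {

        private final ExecutorService ioExecutor =
            Executors.newFixedThreadPool(4, r -> {
                Thread t = new Thread(r, "partition-io");
                t.setDaemon(true);
                return t;
            });

        public CompletableFuture<Object> requestSubpartitionAsync(Runnable blockingSetup) {
            // called from the netty thread: no blocking IO on this path
            return CompletableFuture.supplyAsync(() -> {
                blockingSetup.run();   // mmap slicing, file opening, deletion, etc.
                return new Object();   // placeholder for the created view
            }, ioExecutor);
        }
    }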
.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42) >>>>>>>>>> at >>>>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>> >>>>> >>> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) >>>>>>>>>> at >>>>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>> >>>>> >>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) >>>>>>>>>> at >>>>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>> >>>>> >>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) >>>>>>>>>> at >>>>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>> >>>>> >>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) >>>>>>>>>> >>>>>>>>>> This may be related to mmap backed BoundedData implementation, >>> where >>>>>>>>>> `nextBuffer` seems to be somehow expensive (reflection, >>> skipping >>>>> empty >>>>>>>>>> buffers?) . Just to note, last phases only shuffle metadata >>>>> (kilobyte >>>>>>>>>> scale), but the job paralelism remains the same due to beam >>> nature >>>>>>>> (400). >>>>>>>>>> >>>>>>>>>> Does this sound familiar to anyone? Do you have any >>> suggestions how >>>>> to >>>>>>>>>> solve this? >>>>>>>>>> >>>>>>>>>> Thanks for help, >>>>>>>>>> David >>>>>>>>>> >>>>>>>> >>>>>>>> >>>>>> >>>>>> >>>>> >>> >>> |
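As a rough illustration of proposal (2) above, here is a minimal sketch of what replacing the global lock with a concurrent map could look like. This is only an illustration of the idea discussed in the thread, not the actual Flink code or the patch David tried; the class `LockFreePartitionRegistry` and the stripped-down `Partition`/`SubpartitionView` types are made up for the example.

import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-ins for Flink's partition types (the real classes live in
// org.apache.flink.runtime.io.network.partition); used only to keep the sketch
// self-contained.
interface SubpartitionView {}

interface Partition {
    SubpartitionView createView(int subpartitionIndex);  // may block on disk IO
    void release();                                       // may block on disk IO
}

/**
 * Sketch of a lock-free partition registry: the map itself is a
 * ConcurrentHashMap, so creating one subpartition view no longer serializes
 * every other Netty server thread behind a single monitor.
 */
final class LockFreePartitionRegistry {

    private final ConcurrentHashMap<String, Partition> registeredPartitions =
            new ConcurrentHashMap<>();

    private volatile boolean isShutdown;

    void registerPartition(String partitionId, Partition partition) {
        if (isShutdown) {
            throw new IllegalStateException("Registry has been shut down.");
        }
        if (registeredPartitions.putIfAbsent(partitionId, partition) != null) {
            throw new IllegalStateException("Partition already registered: " + partitionId);
        }
    }

    SubpartitionView createSubpartitionView(String partitionId, int subpartitionIndex) {
        Partition partition = registeredPartitions.get(partitionId);
        if (partition == null) {
            throw new IllegalStateException("Unknown partition: " + partitionId);
        }
        // The potentially blocking IO now happens outside of any global lock,
        // so it only stalls the calling Netty thread, not every other request.
        return partition.createView(subpartitionIndex);
    }

    void releasePartition(String partitionId) {
        Partition partition = registeredPartitions.remove(partitionId);
        if (partition != null) {
            partition.release(); // still blocking IO, but no shared monitor held
        }
    }

    void shutdown() {
        isShutdown = true;
        registeredPartitions.values().forEach(Partition::release);
        registeredPartitions.clear();
    }
}

As the replies below point out, this only removes the serialization point; a thread that blocks inside createView or release still stalls every channel pinned to it.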
One more thing. Could you create a JIRA ticket for this issue? We could also move the discussion there.
Piotrek

> On 30 Jan 2020, at 12:14, Piotr Nowojski <[hidden email]> wrote:
>
> Hi,
>
>>> I think it's perfectly ok to perform IO ops in netty threads,
> (…)
>>> Removing synchronization *did solve* the problem for me, because it
>>> allows flink to leverage the whole netty event loop pool and it's ok to
>>> have a single thread blocked for a little while (we still can accept
>>> connections with other threads).
>
> It's a discouraged pattern: Netty has a thread pool for processing multiple
> channels, but a single channel is always handled by the same pre-defined
> thread (to the best of my knowledge). In Flink we are lucky that Netty threads
> are not doing anything critical besides registering partitions (heartbeats are
> handled independently) that could fail the job if blocked. And I guess you are
> right: if some threads are blocked on IO, new (sub)partition registrations
> should be handled by the non-blocked threads, if not for the global lock.
>
> It sounds very hacky though. It also ignores the performance implication: one
> thread blocked on disk IO wastes the CPU/network potential of ~1/16 of the
> channels (due to the fixed, pre-determined assignment between channels and
> threads). In some scenarios that might be acceptable, with uniform tasks and
> no data skew. But if multiple tasks with different workload patterns and/or a
> data skew run simultaneously, this can cause visible performance issues.
>
> Having said that, any rework to fully address this issue and make the IO
> non-blocking could be very substantial, so I would be more than happy to just
> kick the can down the road for now ;)
>
>>> removal of buffer prefetch in BoundedBlockingSubpartitionReader did not
>>> help, I've already tried that (there are still other problematic code
>>> paths, eg. releasePartition).
>
> Are there other problematic parts besides releasePartition that you have
> already analysed? Maybe it would be better to just try moving those calls out
> of the `ResultPartitionManager` call stack somehow?
>
>>> Let me think about how to get a relevant cpu graph from the TM, it's kind
>>> of hard to target a "defective node".
>
> Thanks, I know it's non-trivial, but I would guess you do not have to target a
> "defective node". If the defective node is blocking for ~2 minutes during the
> failure, I'm pretty sure other nodes are being blocked constantly for seconds
> at a time, and profiler results from such nodes would allow us to confirm the
> issue and better understand what's exactly happening.
>
>>> Anyway attached are some graphs from such a busy node in time of failure.
>
> I didn't get/see any graphs?
>
>>> Is there any special reason for the synchronization I don't see? I have a
>>> feeling it's only for synchronizing `registeredPartitions` map access and
>>> that it's perfectly ok not to synchronize `createSubpartitionView` and
>>> `releasePartition` calls.
>
> I'm not sure. Removing this lock definitely increases concurrency and so the
> potential for race conditions, especially on the resource-releasing paths.
> After briefly looking at the code, I didn't find any obvious issue, but there
> are some callbacks/notifications happening, and generally speaking resource-
> releasing paths are pretty hard to reason about.
>
> Zhijiang might spot something, as he has had a good eye for catching such
> problems in the past.
>
> Besides that, you could just run a couple of (10? 20?) Travis runs. All of the
> ITCases from various modules (connectors, flink-tests, …) are pretty good at
> catching race conditions in the network stack.
> Piotrek
|
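Piotr's "~1/16 of the channels" point is easiest to see with a toy model. The sketch below is not Netty code; it is a hypothetical simulation (the class name PinnedChannelModel and the numbers are made up) that pins each "channel" to one of 16 single-threaded executors, the way a Netty event-loop group pins channels to event-loop threads. With a global lock one slow request stalls everything; without it, only the channels assigned to the blocked thread are affected.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Toy model of Netty's channel-to-thread pinning (not Netty itself).
 * Each channel id is mapped to one of NUM_THREADS executors; a request that
 * blocks on "IO" only delays channels mapped to the same executor, unless a
 * global lock is held, in which case every request queues behind it.
 */
public final class PinnedChannelModel {

    private static final int NUM_THREADS = 16;
    private static final Object GLOBAL_LOCK = new Object();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService[] eventLoops = new ExecutorService[NUM_THREADS];
        for (int i = 0; i < NUM_THREADS; i++) {
            eventLoops[i] = Executors.newSingleThreadExecutor();
        }

        boolean useGlobalLock = args.length > 0 && args[0].equals("--global-lock");

        for (int channelId = 0; channelId < 64; channelId++) {
            final int id = channelId;
            // fixed assignment: the same channel always lands on the same thread
            eventLoops[id % NUM_THREADS].execute(() -> handleRequest(id, useGlobalLock));
        }

        for (ExecutorService loop : eventLoops) {
            loop.shutdown();
            loop.awaitTermination(1, TimeUnit.MINUTES);
        }
    }

    private static void handleRequest(int channelId, boolean useGlobalLock) {
        if (useGlobalLock) {
            synchronized (GLOBAL_LOCK) {        // models the shared monitor
                blockingPartitionSetup(channelId);
            }
        } else {
            blockingPartitionSetup(channelId);  // only this event loop is stalled
        }
    }

    private static void blockingPartitionSetup(int channelId) {
        try {
            // channel 0 hits "slow disk"; everyone else is fast
            Thread.sleep(channelId == 0 ? 5_000 : 10);
            System.out.println("served channel " + channelId + " on "
                    + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Running it with and without --global-lock shows roughly the behaviour measured above: with the lock every request waits out the slow one, without it only the requests hashed to the blocked thread do.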
Sorry for picking this issue up late, I have just come back from the Chinese Spring Festival.
Actually we have not encountered this problem in production before. Connection timeouts were mainly caused by the netty server starting late on the upstream side (a big jar taking long to load, as Piotr mentioned) or by the TaskManager exiting early (as Stephan mentioned).

As we know, one connection channel is bound to exactly one netty thread by design, and one netty thread may be responsible for multiple channels. So if the respective netty thread is blocked in a heavy IO operation, it will not respond to the connection request in time, causing the timeout. Even if we remove the global lock from ResultPartitionManager, I guess it cannot fully solve this issue; the connection setup itself does not actually touch the global lock.

The global lock in ResultPartitionManager mainly guards registering/releasing partitions and the global state of `isShutDown` and `registeredPartitions`. It is technically feasible to remove it, which might bring some benefit by not delaying the creation of other subpartition views when one view is blocked in an IO operation. But from another angle, it is also worth trying hard not to block the netty threads for a long time at all, which would solve the connection timeout completely. Our previous assumption/suggestion was that netty threads should only be involved in light-weight operations if possible.

Let's move the discussion of further solutions to the JIRA ticket, as Piotr suggested. :)

Best,
Zhijiang

------------------------------------------------------------------
From: Piotr Nowojski <[hidden email]>
Send Time: 2020 Jan. 30 (Thu.) 19:29
To: dev <[hidden email]>; zhijiang <[hidden email]>
Subject: Re: connection timeout during shuffle initialization

One more thing. Could you create a JIRA ticket for this issue? We could also move the discussion there.

Piotrek
|
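Zhijiang's last point, keeping the netty threads limited to light-weight work, is a general pattern that can be sketched independently of Flink's actual code. The following is a hypothetical illustration only (not Flink's implementation; AsyncViewCreator, View and the pool size are invented for the example) of handing the blocking view creation to a dedicated IO executor, so the event-loop thread merely schedules the work and reacts to its completion.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Placeholder for whatever a subpartition view would be in this sketch.
final class View {
    final int subpartition;
    View(int subpartition) { this.subpartition = subpartition; }
}

/**
 * Sketch: keep the event-loop (netty) thread light-weight by pushing the
 * blocking disk IO onto a dedicated executor and completing the request
 * asynchronously once the view is ready.
 */
final class AsyncViewCreator {

    // Bounded pool reserved for blocking partition IO, separate from the event loops.
    private final ExecutorService blockingIoExecutor = Executors.newFixedThreadPool(8);

    CompletableFuture<View> createSubpartitionViewAsync(int subpartition) {
        return CompletableFuture.supplyAsync(() -> {
            // Simulates the expensive part: mmap slicing, file open, prefetch, ...
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return new View(subpartition);
        }, blockingIoExecutor);
    }

    void shutdown() {
        blockingIoExecutor.shutdown();
    }

    public static void main(String[] args) {
        AsyncViewCreator creator = new AsyncViewCreator();
        // The "netty thread" only wires up the callback and returns immediately.
        creator.createSubpartitionViewAsync(3)
               .thenAccept(v -> System.out.println("view ready for subpartition " + v.subpartition))
               .join();
        creator.shutdown();
    }
}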
Hi,
Coming back to this idea:

>> Removing synchronization *did solve* the problem for me, because it
>> allows flink to leverage the whole netty event loop pool and it's ok to
>> have a single thread blocked for a little while (we still can accept
>> connections with other threads).

Assignment between channels and threads is not done on an "assign to some idle
thread" basis, but via a trivial hash function (`PowerOfTwoEventExecutorChooser`
or `GenericEventExecutorChooser`). Without the global ResultPartitionManager
lock (assuming 16 threads):

1. If a thread A blocks for 2+ minutes,
2. a new incoming connection has at best a 1-in-16 chance of being blocked
(instead of 100% as it is now).

So removing the lock at best decreases the magnitude of the problem by a factor
of the number of Netty threads. At worst it might not help much: if one thread
is blocked on IO, it usually doesn't mean another thread can perform IO without
blocking itself and being enqueued right behind it by the kernel, or even
completely overloading the IO system. This also ignores the issue that a thread
currently blocked on IO outside of the lock will still prevent 1/16th of
incoming connections from being accepted, both now and after a theoretical
removal of the ResultPartitionManager's lock.

In short, I'm +1 for trying to remove/limit the scope of this lock, but I
wouldn't expect wonders :(

Piotrek
> > Piotrek > > > On 30 Jan 2020, at 12:14, Piotr Nowojski <[hidden email]> wrote: > > > > Hi, > > > >>> I think it's perfectly ok to perform IO ops in netty threads, > > (…) > >>> Removing synchronization *did solve* the problem for me, because it > >>> allows flink to leverage the whole netty event loop pool and it's ok to > >>> have a single thread blocked for a little while (we still can accept > >>> connections with other threads). > > > > > > It’s discouraged pattern, as Netty have a thread pool for processing multiple channels, but a single channel is always handled by the same pre-defined thread (to the best of my knowledge). In Flink we are lucky that Netty threads are not doing anything critical besides registering partitions (heartbeats are handled independently) that could fail the job if blocked. And I guess you are right, if some threads are blocked on the IO, new (sub)partition registration should be handled by the non blocked threads, if not for the global lock. > > > > It sounds very hacky though. Also that's ignoring performance implication - one thread blocked on the disks IO, wastes CPU/network potential of ~1/16 channels (due to this fix pre determined assignment between channels <-> threads). In some scenarios that might be acceptable, with uniform tasks without data skew. But if there are simultaneously running multiple tasks with different work load patterns and/or a data skew, this can cause visible performance issues. > > > > Having said that, any rework to fully address this issue and make the IO non blocking, could be very substantial, so I would be more than happy to just kick the can down the road for now ;) > > > >>> removal of buffer prefetch in BoundedBlockingSubpartitionReader did not > >>> help, I've already tried that (there are still other problematic code > >>> paths, eg. releasePartition). > > > > > > Are there other problematic parts besides releasePartition that you have already analysed? Maybe it would be better to just try moving out those calls out of the `ResultPartitionManager` somehow call stack? > > > >>> Let me think about how to get a relevant cpu graph from the TM, it's kind > >>> of hard to target a "defective node". > > > > Thanks, I know it’s non trivial, but I would guess you do not have to target a “defective node”. If defective node is blocking for ~2 minutes during the failure, I’m pretty sure other nodes are being blocked constantly for seconds at a time, and profiler results from such nodes would allow us to confirm the issue and better understand what’s exactly happening. > > > >>> Anyway attached are some graphs from such a busy node in time of failure. > > > > I didn’t get/see any graphs? > > > >>> Is there any special reason for the synchronization I don't see? I have a > >>> feeling it's only for sychronizing `registredPartitions` map access and > >>> that it's perfectly ok not to synchronize `createSubpartitionView` and > >>> `releasePartition` calls. > > > > I’m not sure. Definitely removing this lock increases concurrency and so the potential for race conditions, especially on the releasing resources paths. After briefly looking at the code, I didn’t find any obvious issue, but there are some callback/notifications happening, and generally speaking resource releasing paths are pretty hard to reason about. > > > > Zhijiang might spot something, as he had a good eye for catching such problems in the past. > > > > Besides that, you could just run couple (10? 20?) travis runs. 
>> Besides that, you could just run a couple (10? 20?) of Travis runs. All of the ITCases from various modules (connectors, flink-tests, …) are pretty good at catching race conditions in the network stack.
>>
>> Piotrek
>>
>>> On 29 Jan 2020, at 17:05, David Morávek <[hidden email]> wrote:
>>>
>>> Just to clarify, these are bare-metal nodes (128G RAM, 16 CPUs + hyperthreading, 4x HDDs, 10G network), which run YARN, HDFS and HBase.
>>>
>>> D.
>>>
>>> On Wed, Jan 29, 2020 at 5:03 PM David Morávek <[hidden email]> wrote:
>>>
>>>> Hi Piotr,
>>>>
>>>> Removal of the buffer prefetch in BoundedBlockingSubpartitionReader did not help, I've already tried that (there are still other problematic code paths, e.g. releasePartition). I think it's perfectly ok to perform IO ops in netty threads, we just have to make sure we can leverage multiple threads at once. The synchronization in ResultPartitionManager effectively decreases parallelism to one, and "netty tasks / unprocessed messages" keep piling up.
>>>>
>>>> Removing the synchronization *did solve* the problem for me, because it allows Flink to leverage the whole netty event loop pool, and it's ok to have a single thread blocked for a little while (we can still accept connections with other threads).
>>>>
>>>> Let me think about how to get a relevant CPU graph from the TM; it's kind of hard to target a "defective node". Anyway, attached are some graphs from such a busy node at the time of failure.
>>>>
>>>> Is there any special reason for the synchronization that I don't see? I have a feeling it's only for synchronizing access to the `registeredPartitions` map and that it's perfectly ok not to synchronize the `createSubpartitionView` and `releasePartition` calls.
>>>>
>>>> Thanks,
>>>> D.
>>>>
>>>> On Wed, Jan 29, 2020 at 4:45 PM Piotr Nowojski <[hidden email]> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>>> with high load and io waits
>>>>>
>>>>> How high are the values we are talking about?
>>>>>
>>>>> Could you attach a CPU profiler and post the results somehow? Which threads are busy on what call trees?
>>>>>
>>>>> Regarding the idea of removing the locks in the `ResultPartitionManager`: I guess it could help a bit, but I wouldn't expect it to solve the problem fully.
>>>>>
>>>>> Doing blocking IO in the Netty threads is already asking for trouble, as even without the locks it can block and crash many things (like createSubpartitionView). But one improvement that we might take a look at: the `BoundedBlockingSubpartitionReader` constructor makes a blocking call to pre-fetch the buffer. If not for that, the code inside `ResultPartitionManager` would probably be non-blocking, or at least much less blocking.
>>>>>
>>>>> Piotrek
>>>>>
>>>>>> On 29 Jan 2020, at 14:05, Stephan Ewen <[hidden email]> wrote:
>>>>>>
>>>>>> /CC Piotr and Zhijiang
>>>>>>
>>>>>> Sounds reasonable at first glance. Would like to hear Piotr's and Zhijiang's take, though, they know that code better than me.
>>>>>>
>>>>>> On Wed, Jan 29, 2020 at 1:58 PM David Morávek <[hidden email]> wrote:
>>>>>> Hi Stephan,
>>>>>>
>>>>>> I've actually managed to narrow the problem down to blocked netty server threads. I'm using 1.9.1 with a few custom patches <https://github.com/dmvk/flink/commits/1.9.1-szn>, which are not relevant to this issue.
>>>>>>
>>>>>> To highlight the problem, I've added these checks to ResultPartitionManager <https://gist.github.com/dmvk/8be27e8574777222ebb7243dfceae67d>, which measure how long "netty tasks" take to execute (monitor acquisition + actual execution).
>>>>>>
>>>>>> We indeed have a pretty busy cluster, with high load and IO waits (mostly due to ongoing shuffles and computations). From the measurements I can see numbers like:
>>>>>>
>>>>>> createSubpartitionView: 129255ms
>>>>>> createSubpartitionView: 129333ms
>>>>>> createSubpartitionView: 129353ms
>>>>>> createSubpartitionView: 129354ms
>>>>>> createSubpartitionView: 144419ms
>>>>>> createSubpartitionView: 144653ms
>>>>>> createSubpartitionView: 144759ms
>>>>>> createSubpartitionView: 144905ms
>>>>>> releasePartition: 145218ms
>>>>>> releasePartition: 145250ms
>>>>>> releasePartition: 145256ms
>>>>>> releasePartition: 145263ms
>>>>>>
>>>>>> These vary a lot, depending on which other pipelines are being executed simultaneously. These numbers imply that for at least 145 seconds (which is greater than the connection timeout), the taskmanager was not able to accept any connection (because of netty internals <https://github.com/netty/netty/issues/240>).
>>>>>>
>>>>>> Switching to the *file*-backed BoundedData implementation didn't help, because heavy IO ops are still executed by netty threads while the monitor is held (e.g. deletion of the backing file).
>>>>>>
>>>>>> 1) I've tried more fine-grained locking (locking a single partition instead of the whole partitionManager). This helped a little, but the pipeline is still unable to finish in some cases.
>>>>>>
>>>>>> 2) Currently I'm trying to completely remove the locking from ResultPartitionManager, as it seems to be relevant only for internal data-structure access (it can be replaced with java.util.concurrent); a rough sketch of this idea follows below. I think this should also have an impact on overall job completion times in some cases.
>>>>>>
>>>>>> What do you think?
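For reference, here is a minimal sketch of what "replace the global lock with java.util.concurrent" could look like. This is not David's actual patch (his gist is linked above) and not the real ResultPartitionManager: the types, fields, and method names are simplified stand-ins, and the shutdown/release races that Piotr warns about are deliberately out of scope.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-ins for Flink's internal types,
// used only to illustrate the locking change.
interface PartitionId {}
interface ReadView {}
interface Partition {
    ReadView createReadView(int subpartition);   // may block on disk IO
    void release();                              // may block on disk IO (e.g. deleting the backing file)
}

/**
 * Sketch of a ResultPartitionManager-like registry without a global monitor:
 * the map itself is thread-safe, so a netty thread blocked inside
 * createReadView() for one partition no longer serializes requests for
 * unrelated partitions. Correct handling of shutdown/release races is
 * intentionally ignored here.
 */
final class LockFreeRegistrySketch {

    private final ConcurrentHashMap<PartitionId, Partition> registered = new ConcurrentHashMap<>();
    private volatile boolean isShutDown;

    void registerPartition(PartitionId id, Partition partition) {
        if (isShutDown) {
            throw new IllegalStateException("already shut down");
        }
        registered.put(id, partition);
    }

    ReadView createSubpartitionView(PartitionId id, int subpartition) {
        Partition partition = registered.get(id);
        if (partition == null) {
            throw new IllegalStateException("unknown partition " + id);
        }
        // The blocking IO still happens on the calling netty thread, but
        // only this request is affected, not every other partition request.
        return partition.createReadView(subpartition);
    }

    void releasePartition(PartitionId id) {
        Partition partition = registered.remove(id);
        if (partition != null) {
            partition.release();
        }
    }
}
```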
>>>>>>
>>>>>> On Tue, Jan 28, 2020 at 9:53 PM Stephan Ewen <[hidden email]> wrote:
>>>>>>
>>>>>>> Hi!
>>>>>>>
>>>>>>> Concerning JAR files: I think this has nothing to do with it, it is a batch shuffle after all. The previous stage must have completed already.
>>>>>>>
>>>>>>> A few things that come to my mind:
>>>>>>>   - What Flink version are you using? 1.9?
>>>>>>>   - Are you sure that the source TaskManager is still running? Earlier Flink versions had an issue with releasing TMs too early, sometimes before the result was fetched by a consumer.
>>>>>>>   - The buffer creation on the sender / netty server side is more expensive than necessary, but should be nowhere near expensive enough to cause a stall.
>>>>>>>
>>>>>>> Can you elaborate on what the shared lock is that all server threads are using?
>>>>>>>
>>>>>>> Best,
>>>>>>> Stephan
>>>>>>>
>>>>>>> On Tue, Jan 28, 2020 at 4:23 PM Piotr Nowojski <[hidden email]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>>> In case of large jar, wouldn't this happen in previous stages as well (if so this should not be the case)?
>>>>>>>>
>>>>>>>> I'm not exactly sure how jars are distributed, but if they are being sent/uploaded from one node (or some other static/fixed number of nodes, like uploading to and reading from a DFS) to all, this might not scale well. Also, your dev deployment might not be stressing the network/storage as much as the production deployment, which can also affect the time needed to deploy the job.
>>>>>>>>
>>>>>>>> What is your job size? (How large is the jar uploaded to Flink?)
>>>>>>>>
>>>>>>>> There might be other factors at play here as well. For example, if you are using Flink's job mode (not standalone), the time to start up a Flink node might be too long: some nodes are already up and running and they time out waiting for the others to start.
>>>>>>>>
>>>>>>>>> Also there shouldn't be any state involved (unless Beam IO's use it internally).
>>>>>>>>
>>>>>>>> My bad. Instead of
>>>>>>>>
>>>>>>>>> - data size per single job run ranging from 100G to 1TB
>>>>>>>>
>>>>>>>> I read state size 100G to 1TB.
>>>>>>>>
>>>>>>>> Piotrek
>>>>>>>>
>>>>>>>> On Tue, Jan 28, 2020 at 11:45 AM Piotr Nowojski <[hidden email]> wrote:
>>>>>>>>
>>>>>>>>> Hi David,
>>>>>>>>>
>>>>>>>>> The usual cause for a connection timeout is long deployment, for example if your job's jar is large and takes a long time to distribute across the cluster. I'm not sure whether large state could affect this as well or not. Are you sure that's not the case?
>>>>>>>>>
>>>>>>>>> The thing you are suggesting I haven't heard about before, but indeed it could theoretically happen. Reading from mmap'ed subpartitions could block the Netty threads if the kernel decides to drop an mmap'ed page and it has to be read back from disk. Could you check your CPU and disk IO usage? This should be visible as high IOWait CPU usage. Could you, for example, post a couple of sample results of
>>>>>>>>>
>>>>>>>>>   iostat -xm 2
>>>>>>>>>
>>>>>>>>> from some representative TaskManager? If the disks are indeed overloaded, changing Flink's config option
>>>>>>>>>
>>>>>>>>>   taskmanager.network.bounded-blocking-subpartition-type
>>>>>>>>>
>>>>>>>>> from the default to `file` could solve the problem. FYI, this option is renamed in 1.10 to
>>>>>>>>>
>>>>>>>>>   taskmanager.network.blocking-shuffle.type
>>>>>>>>>
>>>>>>>>> and its default value will be `file`.
>>>>>>>>>
>>>>>>>>> We would appreciate it if you could get back to us with the results!
>>>>>>>>>
>>>>>>>>> Piotrek
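As a small aside on the option Piotr mentions: it normally goes into flink-conf.yaml, but it can also be set through Flink's generic string-keyed Configuration, for example when spinning up a local test cluster. A hedged sketch; only the two key names are taken from the thread, everything else is illustrative.

```java
import org.apache.flink.configuration.Configuration;

public class BlockingShuffleConfigExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Flink 1.9.x key discussed in this thread: switch the blocking
        // subpartition storage from the mmap-backed default to plain files.
        conf.setString("taskmanager.network.bounded-blocking-subpartition-type", "file");

        // In Flink 1.10 the option is renamed (and `file` becomes the default):
        // conf.setString("taskmanager.network.blocking-shuffle.type", "file");

        System.out.println(conf);
    }
}
```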
>>>>>>>>>
>>>>>>>>>> On 28 Jan 2020, at 11:03, Till Rohrmann <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi David,
>>>>>>>>>>
>>>>>>>>>> I'm unfortunately not familiar with these parts of Flink, but I'm pulling in Piotr, who might be able to tell you more.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> On Mon, Jan 27, 2020 at 5:58 PM David Morávek <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>>> [David's original message of 27 Jan 2020, which opens this thread, was quoted here in full; omitted.]