Hi everyone,
I had some jobs running overnight, and in two of them the following exception occurred after about half an hour. Do you know why this happens?

Thanks,
Sebastian

tenem16.hpi.uni-potsdam.de
Error: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56)) -> Filter (Filter at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68)) -> FlatMap (FlatMap at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46)) -> Map (Map at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))', caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
    at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
Caused by: org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel$BufferReorderingException: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:253)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeBufferOrEvent(PartitionRequestClientHandler.java:279)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:214)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)

---
Sebastian Kruse
PhD Candidate, Information Systems Group
Hasso-Plattner-Institut an der Universität Potsdam
Prof.-Dr.-Helmert-Str. 2-3, D-14482 Potsdam
Tel +49 331 5509 240
Amtsgericht Potsdam, HRB 12184
Managing Director: Prof. Dr. Christoph Meinel
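For context on the innermost exception: it comes from a per-channel in-order delivery check, where the receiver counts the buffers it gets on each remote input channel and fails on any gap in the numbering. Below is a minimal hypothetical sketch of such a guard; it is not Flink's actual RemoteInputChannel implementation, and the class and method names are only chosen to echo the trace.

```java
// Hypothetical sketch of a sequence-number guard like the one behind
// BufferReorderingException. Assumption: buffer numbering starts at 0 and
// increments by one per buffer on each channel, so any gap (a dropped or
// overtaken buffer on the wire) trips the check immediately.
final class SequenceCheckedChannel {
    private int expectedSequenceNumber = 0;

    /** Accepts the next buffer's sequence number; fails on out-of-order arrival. */
    void onBuffer(int receivedSequenceNumber) {
        if (receivedSequenceNumber != expectedSequenceNumber) {
            throw new IllegalStateException(String.format(
                "Buffer re-ordering: expected buffer with sequence number %d, but received %d.",
                expectedSequenceNumber, receivedSequenceNumber));
        }
        expectedSequenceNumber++;
    }
}
```

Under this scheme a single skipped buffer is enough to fail, which is why "expected 17841, but received 17842" points at exactly one buffer missing (or reordered) on that channel.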
This is a critical bug.
- Which version are you using? If a snapshot, which commit?
- What is your setup? Number of machines, dataset, etc.?
- Is it reproducible?

On Wednesday, June 3, 2015, Kruse, Sebastian <[hidden email]> wrote:
> [...]
I am currently using 0.9-SNAPSHOT. All the non-jar files are from an older build, but I recently manually updated the flink-dist.jar with commit d163a817fa2e330e86384d0bbcd104f051a6fb48.
Our setup consists of 10 workers and a master, all interconnected via a switch (100 Mbit, I think). The data set is an NTriple file of about 8 GB; intermediate datasets, however, might be much larger. For smaller datasets, I have not been able to observe this problem yet.

Also, during the failure there are a lot of concurrent shuffles ongoing [1, 2]. Additionally, it might be interesting that in the affected jobs either this exception occurs or another one that looks like a network disruption [3]. So it might well be that our setup suffers from occasional network errors, especially during high network load, but that is just a plain guess.

Regarding reproducibility, I can only say right now that this error has occurred twice. I will re-run the jobs, see if the errors can be observed again, and let you know.

Cheers,
Sebastian

[1] https://owncloud.hpi.de/public.php?service=files&t=89dcba7ce63ed053331f5099cad64704
[2] https://owncloud.hpi.de/public.php?service=files&t=1b16d66d6b9e41c68bf598309d57b7b1
[3] Stack trace:

tenem18.hpi.uni-potsdam.de
Error: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56)) -> Filter (Filter at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68)) -> FlatMap (FlatMap at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46)) -> Map (Map at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))', caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
    at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
Caused by: java.io.EOFException
    at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:290)
    at org.apache.flink.types.StringValue.readString(StringValue.java:741)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:68)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
    at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:123)
    at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:33)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:95)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:133)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:64)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1020)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)

-----Original Message-----
From: Ufuk Celebi [mailto:[hidden email]]
Sent: Wednesday, June 3, 2015 10:33
To: [hidden email]
Subject: Re: Buffer re-ordering problem

> [...]
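The EOFException in the second trace [3] is what a corrupted or truncated byte stream typically looks like to a deserializer: string fields are length-prefixed, so a damaged length or a short payload makes the reader run past the end of its data. The following self-contained illustration is hypothetical code, not Flink's StringSerializer, but it reproduces the failure mode.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of why stream corruption surfaces as an
// EOFException during deserialization: a length-prefixed string whose
// payload is cut short makes the reader run off the end of the data.
final class LengthPrefixedString {
    static byte[] serialize(String s) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(utf8.length);  // length prefix
        out.write(utf8);            // payload
        return bos.toByteArray();
    }

    static String deserialize(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int length = in.readInt();
        byte[] utf8 = new byte[length];
        in.readFully(utf8);  // throws EOFException if the stream is truncated
        return new String(utf8, StandardCharsets.UTF_8);
    }
}
```

Note that the reader cannot tell corruption from truncation here; it just sees fewer bytes than the prefix promised, which matches the `StringValue.readString` frame in the trace.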
Hey Sebastian,
would you mind sharing the code and dataset with me (privately would work fine)? I want to try to reproduce this as well.

– Ufuk

On Wed, Jun 3, 2015 at 11:07 AM, Kruse, Sebastian <[hidden email]> wrote:
> [...]
Hi Sebastian!
The first error that you report looks could also be a lost buffer, rather than a reordering. The second error that you post seems a serialization issue. Both can be consequences of a network stream corruption. Would be good to figure out why the netty event loop does not encode/decode this properly in your case. BTW: I would assume we can virtually rule out TCP/kernel bugs as sources for the corruption, unless you have this setup here: http://arstechnica.com/information-technology/2015/05/the-discovery-of-apache-zookeepers-poison-packet/ Stephan On Wed, Jun 3, 2015 at 7:51 PM, Ufuk Celebi <[hidden email]> wrote: > Hey Sebastian, > > would you mind sharing the code and dataset with me (privately would work > fine). I want to try to reproduce this as well. > > – Ufuk > > On Wed, Jun 3, 2015 at 11:07 AM, Kruse, Sebastian <[hidden email]> > wrote: > > > I am currently using 0.9-SNAPSHOT. All the non-jar files are from an > older > > build, but I recently manually updated the flink-dist.jar with commit > > d163a817fa2e330e86384d0bbcd104f051a6fb48. > > > > Our setup consists of 10 workers and a master, all interconnected via a > > switch (100 Mbit, I think). The data set is an NTriple file of about 8 > GB, > > however, intermediate datasets might be much larger. However, for smaller > > datasets, I could not observe this problem, yet. > > > > Also, during the failure there are a lot of concurrent shuffles ongoing > > [1,2]. Additionally, it might be interesting that in the affected jobs > > either this exception occurs or another one that looks like a network > > disruption. [3] So, it might well be, that our setup suffers from > > occasional network errors, especially during high network load - but > that’s > > just a plain guess. > > > > Regarding the reproducibility, I can only say right now, that this error > > occurred twice. I will re-run the jobs and see, if the errors can be > > observed again and let you know. 
> > Cheers,
> > Sebastian
> >
> > [1] https://owncloud.hpi.de/public.php?service=files&t=89dcba7ce63ed053331f5099cad64704
> > [2] https://owncloud.hpi.de/public.php?service=files&t=1b16d66d6b9e41c68bf598309d57b7b1
> > [3] Stack trace: tenem18.hpi.uni-potsdam.de
> > Error: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56)) -> Filter (Filter at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68)) -> FlatMap (FlatMap at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46)) -> Map (Map at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))', caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
> >   at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> >   at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >   at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
> >   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
> >   at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
> >   at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> >   at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> >   ... 3 more
> > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
> >   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> > Caused by: java.io.EOFException
> >   at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:290)
> >   at org.apache.flink.types.StringValue.readString(StringValue.java:741)
> >   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:68)
> >   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> >   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
> >   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
> >   at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:123)
> >   at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:33)
> >   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
> >   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:95)
> >   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
> >   at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> >   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:133)
> >   at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:64)
> >   at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> >   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> >   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1020)
> >   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
> >
> > -----Original Message-----
> > From: Ufuk Celebi [mailto:[hidden email]]
> > Sent: Mittwoch, 3. Juni 2015 10:33
> > To: [hidden email]
> > Subject: Re: Buffer re-ordering problem
> >
> > This is a critical bug.
> >
> > - Which version are you using? If a snapshot, which commit?
> > - What is your setup? Number of machines, dataset, etc.?
> > - Is it reproducible?
Thanks for your feedback. I am neither running IPSec nor the aesni-intel module.
So far, I could not reproduce the reordering issue. I have also discovered that my code might have created String objects with invalid UTF-16 content in exactly those jobs that suffered from the reordering. I wanted to use binary data as a key, so I put it into a char array and wrapped it in a String, hoping that would do the trick. Maybe this caused some error in the String (de-)serialization, with further consequences, e.g., the skipping of a buffer. Since I started encoding the binary data in Base64, the reordering error has not popped up again. I will do some more runs to verify this.

However, the other problem, with the serialization exceptions, should not be affected by this. To see whether network stream corruption is an issue, I am also performing some runs now on a larger single machine with a single task manager, so that no network communication is involved. Serialization should take place anyway, right? Is there a way to run a second task manager on the same machine within the same OS?

Cheers,
Sebastian
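Sebastian's hypothesis above, that wrapping raw binary data in a char array and then in a String produces invalid UTF-16 which misbehaves during (de-)serialization, can be illustrated with plain JDK code. The sketch below is hypothetical and unrelated to the actual job code; it only demonstrates the general hazard: a String containing an unpaired surrogate does not survive a charset round trip, whereas a Base64-encoded key does.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

public class BinaryKeyDemo {
    public static void main(String[] args) {
        // Hypothetical binary payload; the bytes 0xD8 0x00 become the
        // unpaired UTF-16 high surrogate U+D800 when packed into a char.
        byte[] binary = {(byte) 0xD8, (byte) 0x00, 0x41, 0x42};

        // Unsafe: pack two bytes into each char. The resulting String is
        // not valid UTF-16, so a text-encoding round trip silently
        // replaces the surrogate and corrupts the key.
        char[] chars = new char[binary.length / 2];
        for (int i = 0; i < chars.length; i++) {
            chars[i] = (char) (((binary[2 * i] & 0xFF) << 8) | (binary[2 * i + 1] & 0xFF));
        }
        String unsafeKey = new String(chars);
        String roundTripped = new String(
                unsafeKey.getBytes(StandardCharsets.UTF_8), StandardCharsets.UTF_8);
        System.out.println(unsafeKey.equals(roundTripped)); // false: surrogate replaced

        // Safe: Base64 keeps the key in plain ASCII and round-trips losslessly.
        String safeKey = Base64.getEncoder().encodeToString(binary);
        System.out.println(Arrays.equals(binary, Base64.getDecoder().decode(safeKey))); // true
    }
}
```

A Base64 key is about a third larger than the raw bytes, but it stays valid under any text encoding. Whether invalid UTF-16 is really what triggered the buffer problem in the jobs above is unconfirmed in the thread; the sketch only shows why such Strings are unsafe as keys.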
Thanks for helping us debug this.
You can start many TaskManagers in one JVM by using the LocalMiniCluster. Have a look at this (manually triggered) test, which runs 100 TaskManagers in one JVM:
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java

The data type serialization should have no impact on the network stream, as that layer is buffer-oriented and has no understanding or notion of the types inside; that transition happens later, in the readers/writers. Still, it is not impossible that there is a bug in there such that the serialization affects the buffer transport. Thanks for the pointer, we will definitely look into that.
> > Cheers, > Sebastian > > -----Original Message----- > From: [hidden email] [mailto:[hidden email]] On Behalf Of > Stephan Ewen > Sent: Mittwoch, 3. Juni 2015 20:14 > To: [hidden email] > Subject: Re: Buffer re-ordering problem > > Hi Sebastian! > > The first error that you report looks could also be a lost buffer, rather > than a reordering. > The second error that you post seems a serialization issue. > Both can be consequences of a network stream corruption. > > Would be good to figure out why the netty event loop does not > encode/decode this properly in your case. > > BTW: I would assume we can virtually rule out TCP/kernel bugs as sources > for the corruption, unless you have this setup here: > > http://arstechnica.com/information-technology/2015/05/the-discovery-of-apache-zookeepers-poison-packet/ > > Stephan > > > On Wed, Jun 3, 2015 at 7:51 PM, Ufuk Celebi <[hidden email]> wrote: > > > Hey Sebastian, > > > > would you mind sharing the code and dataset with me (privately would > > work fine). I want to try to reproduce this as well. > > > > – Ufuk > > > > On Wed, Jun 3, 2015 at 11:07 AM, Kruse, Sebastian > > <[hidden email]> > > wrote: > > > > > I am currently using 0.9-SNAPSHOT. All the non-jar files are from an > > older > > > build, but I recently manually updated the flink-dist.jar with > > > commit d163a817fa2e330e86384d0bbcd104f051a6fb48. > > > > > > Our setup consists of 10 workers and a master, all interconnected > > > via a switch (100 Mbit, I think). The data set is an NTriple file of > > > about 8 > > GB, > > > however, intermediate datasets might be much larger. However, for > > > smaller datasets, I could not observe this problem, yet. > > > > > > Also, during the failure there are a lot of concurrent shuffles > > > ongoing [1,2]. Additionally, it might be interesting that in the > > > affected jobs either this exception occurs or another one that looks > > > like a network disruption. 
[3] So, it might well be, that our setup > > > suffers from occasional network errors, especially during high > > > network load - but > > that’s > > > just a plain guess. > > > > > > Regarding the reproducibility, I can only say right now, that this > > > error occurred twice. I will re-run the jobs and see, if the errors > > > can be observed again and let you know. > > > > > > Cheers, > > > Sebastian > > > > > > [1] > > > > > https://owncloud.hpi.de/public.php?service=files&t=89dcba7ce63ed053331 > > f5099cad64704 > > > [2] > > > > > https://owncloud.hpi.de/public.php?service=files&t=1b16d66d6b9e41c68bf > > 598309d57b7b1 > > > [3] Stack trace: tenem18.hpi.uni-potsdam.de > > > Error: java.lang.Exception: The data preparation for task 'CHAIN > > > GroupReduce (GroupReduce at > > > > > de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPl > > an(AllAtOnceTraversalStrategy.scala:56)) > > > -> Filter (Filter at > > > > > de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPl > > an(AllAtOnceTraversalStrategy.scala:68)) > > > -> FlatMap (FlatMap at > > > > > de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(Traver > > salStrategy.scala:46)) > > > -> Map (Map at > > > > > > de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))' > > > , caused an error: Error obtaining the sorted input: Thread > > > 'SortMerger Reading Thread' terminated due to an exception: null at > > > > > org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask > > .java:471) > > > at > > > > > org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactT > > ask.java:362) > > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > > > at java.lang.Thread.run(Thread.java:745) > > > Caused by: java.lang.RuntimeException: Error obtaining the sorted > input: > > > Thread 'SortMerger Reading Thread' terminated due to an exception: > > > null at > > > > > 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterat > > or(UnilateralSortMerger.java:607) > > > at > > > > > org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPac > > tTask.java:1145) > > > at > > > > > org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupRedu > > ceDriver.java:94) > > > at > > > > > org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask > > .java:466) > > > ... 3 more > > > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' > > > terminated due to an exception: null at > > > > > org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBas > > e.run(UnilateralSortMerger.java:784) > > > Caused by: java.io.EOFException > > > at > > > > > org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(D > > ataInputDeserializer.java:290) > > > at > > > org.apache.flink.types.StringValue.readString(StringValue.java:741) > > > at > > > > > org.apache.flink.api.common.typeutils.base.StringSerializer.deserializ > > e(StringSerializer.java:68) > > > at > > > > > org.apache.flink.api.common.typeutils.base.StringSerializer.deserializ > > e(StringSerializer.java:28) > > > at > > > > > org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(C > > aseClassSerializer.scala:102) > > > at > > > > > org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(C > > aseClassSerializer.scala:29) > > > at > > > > > org.apache.flink.api.common.typeutils.base.GenericArraySerializer.dese > > rialize(GenericArraySerializer.java:123) > > > at > > > > > org.apache.flink.api.common.typeutils.base.GenericArraySerializer.dese > > rialize(GenericArraySerializer.java:33) > > > at > > > > > org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(C > > aseClassSerializer.scala:102) > > > at > > > > > org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(C > > aseClassSerializer.scala:95) > > > at > > > > > 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(C > > aseClassSerializer.scala:29) > > > at > > > > > org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read( > > ReusingDeserializationDelegate.java:57) > > > at > > > > > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptive > > SpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecor > > dDeserializer.java:133) > > > at > > > > > org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.ge > > tNextRecord(AbstractRecordReader.java:64) > > > at > > > > > org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.nex > > t(MutableRecordReader.java:34) > > > at > > > > > org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIter > > ator.java:59) > > > at > > > > > org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingTh > > read.go(UnilateralSortMerger.java:1020) > > > at > > > > > org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBas > > e.run(UnilateralSortMerger.java:781) > > > > > > > > > -----Original Message----- > > > From: Ufuk Celebi [mailto:[hidden email]] > > > Sent: Mittwoch, 3. Juni 2015 10:33 > > > To: [hidden email] > > > Subject: Re: Buffer re-ordering problem > > > > > > This is a critical bug. > > > > > > - which version are you using? If snapshot, which commit? > > > - what is your setup? Number of machines, datset etc? > > > - is it reproducible? > > > > > > On Wednesday, June 3, 2015, Kruse, Sebastian > > > <[hidden email]> > > > wrote: > > > > > > > Hi everyone, > > > > > > > > I had some jobs running over the night and in two of them after > > > > about half an hour the following exception occurred. Do you know > > > > why this > > > happens? 
> > > >
> > > > Thanks,
> > > > Sebastian
> > > >
> > > > tenem16.hpi.uni-potsdam.de
> > > > Error: java.lang.Exception: The data preparation for task 'CHAIN
> > > > GroupReduce -> Filter -> FlatMap -> Map [...]', caused an error:
> > > > Error obtaining the sorted input: Thread 'SortMerger Reading
> > > > Thread' terminated due to an exception: Buffer re-ordering:
> > > > expected buffer with sequence number 17841, but received 17842.
> > > > [full stack trace as quoted in the first message of this thread]
> > > >
> > > > ---
> > > > Sebastian Kruse
> > > > Doktorand am Fachbereich Information Systems Group
> > > > Hasso-Plattner-Institut an der Universität Potsdam
> > > > Prof.-Dr.-Helmert-Str. 2-3, D-14482 Potsdam
> > > > Tel +49 331 5509 240
> > > > Amtsgericht Potsdam, HRB 12184
> > > > Geschäftsführung: Prof. Dr. Christoph Meinel
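As background on the exception being discussed: Flink's network stack numbers the buffers of each channel consecutively, and the receiving side rejects any buffer that arrives out of order. The following is a hypothetical, stripped-down illustration of such a sequence check; the class and method names are invented for this sketch and this is not Flink's actual RemoteInputChannel code:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of a receiver-side sequence check, mirroring the
// error message in the trace above (not Flink's actual implementation).
class SequencedReceiver {
    private int expectedSequenceNumber = 0;
    private final Queue<byte[]> buffers = new ArrayDeque<>();

    /** Accepts a buffer only if it carries the next expected sequence number. */
    void onBuffer(byte[] buffer, int sequenceNumber) {
        if (sequenceNumber != expectedSequenceNumber) {
            // This is the condition behind "Buffer re-ordering: expected
            // buffer with sequence number X, but received Y".
            throw new IllegalStateException("Buffer re-ordering: expected buffer with sequence number "
                    + expectedSequenceNumber + ", but received " + sequenceNumber);
        }
        expectedSequenceNumber++;
        buffers.add(buffer);
    }
}
```

Under such a check, expecting 17841 but receiving 17842 means one buffer was skipped, lost, or overtaken in transit, which is why the discussion below turns to lost buffers and network stream corruption.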
Hi everyone,
I just wanted to let you know that after quite a few more runs on different machines, the buffer re-ordering problem did not appear again. I don't know what caused it; maybe it really was due to the potentially illegal UTF-16 code within strings. If the error should ever happen again, I will let you know, but otherwise it might just have been some strange edge-case side effect. :)

Cheers,
Sebastian

-----Original Message-----
From: [hidden email] [mailto:[hidden email]] On Behalf Of Stephan Ewen
Sent: Donnerstag, 4. Juni 2015 13:13
To: [hidden email]
Subject: Re: Buffer re-ordering problem

Thanks for helping us debug this.

You can start many TaskManagers in one JVM by using the LocalMiniCluster. Have a look at this (manually triggered) test, which runs 100 TaskManagers in one JVM:
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java

The data type serialization should have no impact on the network stream, as that is buffer-oriented and has no understanding or notion of the types inside. That transition comes later, in the readers/writers. Still, it is not impossible for a bug to be in there such that the serialization affects the buffer transport. Thanks for the pointer, we will definitely look into that.

On Thu, Jun 4, 2015 at 1:07 PM, Kruse, Sebastian <[hidden email]> wrote:

> Thanks for your feedback. I am neither running IPSec nor the
> aesni-intel module.
>
> So far, I could not reproduce the reordering issue. I have also
> discovered that my code might have created String objects with
> invalid UTF-16 content in exactly those jobs that suffered from the
> reordering. I wanted to use binary data as a key, so I put it into a
> char array and wrapped it in a String, hoping that would do the
> trick. Maybe this has caused some error in the String
> (de-)serialization with further consequences, e.g., skipping of a
> buffer.
> Ever since I have encoded the binary data in Base64, the reordering
> error did not pop up again. I will do some more runs to verify this.
>
> However, the other problem with the serialization exceptions should
> not be affected by this. To see if network stream corruption is an
> issue, I am also performing some runs now on a larger single machine
> with a single task manager, so that no network communication is
> involved. Serialization should take place anyway, right? Is there a
> way to run a second task manager on the same machine within the same
> OS?
>
> Cheers,
> Sebastian
>
> -----Original Message-----
> From: [hidden email] [mailto:[hidden email]] On Behalf Of Stephan Ewen
> Sent: Mittwoch, 3. Juni 2015 20:14
> To: [hidden email]
> Subject: Re: Buffer re-ordering problem
>
> Hi Sebastian!
>
> The first error that you report looks like it could also be a lost
> buffer rather than a reordering. The second error that you post looks
> like a serialization issue. Both can be consequences of a network
> stream corruption.
>
> It would be good to figure out why the netty event loop does not
> encode/decode this properly in your case.
>
> BTW: I would assume we can virtually rule out TCP/kernel bugs as
> sources for the corruption, unless you have this setup here:
> http://arstechnica.com/information-technology/2015/05/the-discovery-of-apache-zookeepers-poison-packet/
>
> Stephan
>
> On Wed, Jun 3, 2015 at 7:51 PM, Ufuk Celebi <[hidden email]> wrote:
>
> > Hey Sebastian,
> >
> > would you mind sharing the code and dataset with me (privately
> > would work fine)? I want to try to reproduce this as well.
> >
> > – Ufuk
> >
> > On Wed, Jun 3, 2015 at 11:07 AM, Kruse, Sebastian <[hidden email]> wrote:
> >
> > > I am currently using 0.9-SNAPSHOT. All the non-jar files are from
> > > an older build, but I recently manually updated the flink-dist.jar
> > > with commit d163a817fa2e330e86384d0bbcd104f051a6fb48.
> > > Our setup consists of 10 workers and a master, all interconnected
> > > via a switch (100 Mbit, I think). The data set is an NTriple file
> > > of about 8 GB; however, intermediate datasets might be much
> > > larger. For smaller datasets, I could not observe this problem
> > > yet.
> > >
> > > Also, during the failure there are a lot of concurrent shuffles
> > > ongoing [1,2]. Additionally, it might be interesting that in the
> > > affected jobs either this exception occurs or another one that
> > > looks like a network disruption [3]. So, it might well be that our
> > > setup suffers from occasional network errors, especially during
> > > high network load - but that's just a plain guess.
> > >
> > > Regarding the reproducibility, I can only say right now that this
> > > error occurred twice. I will re-run the jobs, see if the errors
> > > can be observed again, and let you know.
> > >
> > > Cheers,
> > > Sebastian
> > >
> > > [1] https://owncloud.hpi.de/public.php?service=files&t=89dcba7ce63ed053331f5099cad64704
> > > [2] https://owncloud.hpi.de/public.php?service=files&t=1b16d66d6b9e41c68bf598309d57b7b1
> > > [3] Stack trace:
> > > tenem18.hpi.uni-potsdam.de
> > > Error: java.lang.Exception: The data preparation for task 'CHAIN
> > > GroupReduce (GroupReduce at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56))
> > > -> Filter (Filter at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68))
> > > -> FlatMap (FlatMap at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46))
> > > -> Map (Map at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))',
> > > caused an error: Error obtaining the sorted input: Thread
> > > 'SortMerger Reading Thread' terminated due to an exception: null
> > > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > > [...]
> > > Caused by: java.io.EOFException
> > > at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:290)
> > > at org.apache.flink.types.StringValue.readString(StringValue.java:741)
> > > [remaining frames identical to the deserialization trace quoted earlier in this thread]
> > >
> > > -----Original Message-----
> > > From: Ufuk Celebi [mailto:[hidden email]]
> > > Sent: Mittwoch, 3. Juni 2015 10:33
> > > To: [hidden email]
> > > Subject: Re: Buffer re-ordering problem
> > >
> > > This is a critical bug.
> > >
> > > - which version are you using? If snapshot, which commit?
> > > - what is your setup? Number of machines, dataset etc.?
> > > - is it reproducible?
> > >
> > > On Wednesday, June 3, 2015, Kruse, Sebastian <[hidden email]> wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I had some jobs running over the night and in two of them after
> > > > about half an hour the following exception occurred. Do you know
> > > > why this happens?
> > > >
> > > > Thanks,
> > > > Sebastian
> > > >
> > > > tenem16.hpi.uni-potsdam.de
> > > > Error: java.lang.Exception: The data preparation for task 'CHAIN
> > > > GroupReduce -> Filter -> FlatMap -> Map [...]', caused an error:
> > > > Error obtaining the sorted input: Thread 'SortMerger Reading
> > > > Thread' terminated due to an exception: Buffer re-ordering:
> > > > expected buffer with sequence number 17841, but received 17842.
> > > > [full stack trace as quoted in the first message of this thread]
> > > >
> > > > ---
> > > > Sebastian Kruse
> > > > Doktorand am Fachbereich Information Systems Group
> > > > Hasso-Plattner-Institut an der Universität Potsdam
> > > > Prof.-Dr.-Helmert-Str. 2-3, D-14482 Potsdam
> > > > Tel +49 331 5509 240
> > > > Amtsgericht Potsdam, HRB 12184
> > > > Geschäftsführung: Prof. Dr. Christoph Meinel
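On the Base64 point discussed above: this is not code from the thread, just a minimal, self-contained sketch of the two approaches Sebastian describes. Stuffing raw bytes into a char[]-backed String can produce invalid UTF-16 (e.g. a lone surrogate), which a string encoder cannot serialize losslessly, whereas a Base64-encoded key is plain ASCII and round-trips exactly:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

public class BinaryKeyDemo {
    public static void main(String[] args) {
        // Raw binary data that, reinterpreted as UTF-16 code units,
        // happens to contain a lone high surrogate.
        char[] raw = {0xD800, 0x0041};      // unpaired surrogate + 'A'
        String badKey = new String(raw);    // a legal Java String, but invalid UTF-16

        // Encoding such a String (as a serializer eventually must) is lossy:
        // the unpaired surrogate gets replaced, so the key does not round-trip.
        byte[] utf8 = badKey.getBytes(StandardCharsets.UTF_8);
        String roundTripped = new String(utf8, StandardCharsets.UTF_8);
        System.out.println(badKey.equals(roundTripped));   // prints false

        // With Base64 the key stays pure ASCII and round-trips exactly.
        byte[] binary = {(byte) 0xD8, 0x00, 0x41};
        String safeKey = Base64.getEncoder().encodeToString(binary);
        byte[] decoded = Base64.getDecoder().decode(safeKey);
        System.out.println(Arrays.equals(binary, decoded)); // prints true
    }
}
```

Whether such an invalid string could really corrupt Flink's buffer sequencing is left open in the thread itself; the sketch only shows why Base64 is the safer encoding for binary keys.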