Hey,

Until now, after every emit to the outputs, we flushed them using the .flush() method of the RecordWriter. We have now removed this flush() call, and we have two interesting observations:

First, if we don't send enough records, the source finishes but the output buffer never gets flushed.

Second, if we generate a simple data stream from, say, the first 1500 numbers
<https://github.com/stratosphere/stratosphere-streaming/blob/output-flush/stratosphere-streaming-core/src/test/java/eu/stratosphere/streaming/api/PrintTest.java#L48>,
we get an exception in the InputGates (after roughly a hundred records):

java.lang.IllegalStateException: Channel received an event before completing the current partial record.
    at eu.stratosphere.runtime.io.channels.InputChannel.readRecord(InputChannel.java:177)
    at eu.stratosphere.runtime.io.gates.InputGate.readRecord(InputGate.java:173)
    at eu.stratosphere.streaming.api.streamcomponent.StreamRecordReader.hasNext(StreamRecordReader.java:96)
    at eu.stratosphere.streaming.api.streamcomponent.AbstractStreamComponent.invokeRecords(AbstractStreamComponent.java:255)
    at eu.stratosphere.streaming.api.streamcomponent.StreamSink.invoke(StreamSink.java:74)
    at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:260)
    at java.lang.Thread.run(Unknown Source)

This works perfectly if we flush the outputs after the emits.

Any ideas what might cause this problem?

Regards,
Gyula
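
To make the first observation concrete, here is a minimal toy sketch of a writer that only ships records when its buffer fills up. It is not the Stratosphere RecordWriter: the class BufferedEmitter, its methods, and the capacity of 1000 records are all hypothetical and chosen only for illustration. It shows how records can stay stuck in the buffer if nothing flushes it when the source finishes.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a buffered output; NOT the real Stratosphere RecordWriter.
class BufferedEmitter {
    private final int capacity;                               // hypothetical buffer size, in records
    private final List<Integer> buffer = new ArrayList<>();   // records waiting to be shipped
    private final List<Integer> shipped = new ArrayList<>();  // records that reached the receiver

    BufferedEmitter(int capacity) {
        this.capacity = capacity;
    }

    // Buffer the record and ship only when the buffer is full.
    void emit(int record) {
        buffer.add(record);
        if (buffer.size() >= capacity) {
            flush();
        }
    }

    // Ship whatever is currently buffered, even if the buffer is not full.
    void flush() {
        shipped.addAll(buffer);
        buffer.clear();
    }

    List<Integer> shipped() {
        return shipped;
    }

    int pending() {
        return buffer.size();
    }
}

class FlushDemo {
    public static void main(String[] args) {
        BufferedEmitter out = new BufferedEmitter(1000);
        for (int i = 0; i < 1500; i++) {
            out.emit(i);
        }
        // Without a final flush, the tail of the stream never leaves the buffer.
        System.out.println("shipped before final flush: " + out.shipped().size());
        System.out.println("still buffered: " + out.pending());

        out.flush(); // what a source would need to do when it finishes
        System.out.println("shipped after final flush: " + out.shipped().size());
    }
}
```

In this toy model a single flush when the source finishes is enough to deliver the tail; whether the same holds for the real RecordWriter is exactly the open question here.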

Interesting. The only event I can picture is this SenderHintEvent. Maybe there is a corner case. Can you open a JIRA? I'll take a look.

On Thu, Jul 10, 2014 at 2:59 PM, Gyula Fóra <[hidden email]> wrote:

> Hey,
>
> Until now, after every emit to the outputs we flushed them using the
> .flush() method of the recordwriter. Now we removed this flush() call and
> we have two interesting observations:
>
> First of all we dont send enough records the source finishes but the output
> buffer never gets flushed.
>
> Secondly if we generate a simple datastream from lets say the first 1500
> <https://github.com/stratosphere/stratosphere-streaming/blob/output-flush/stratosphere-streaming-core/src/test/java/eu/stratosphere/streaming/api/PrintTest.java#L48>
> numbers we get an exception in the InputGates (after lets say a hundred
> records): java.lang.IllegalStateException: Channel received an event before
> completing the current partial record.
>
> java.lang.IllegalStateException: Channel received an event before
> completing the current partial record.
>     at eu.stratosphere.runtime.io.channels.InputChannel.readRecord(InputChannel.java:177)
>     at eu.stratosphere.runtime.io.gates.InputGate.readRecord(InputGate.java:173)
>     at eu.stratosphere.streaming.api.streamcomponent.StreamRecordReader.hasNext(StreamRecordReader.java:96)
>     at eu.stratosphere.streaming.api.streamcomponent.AbstractStreamComponent.invokeRecords(AbstractStreamComponent.java:255)
>     at eu.stratosphere.streaming.api.streamcomponent.StreamSink.invoke(StreamSink.java:74)
>     at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:260)
>     at java.lang.Thread.run(Unknown Source)
>
> This works perfectly if we flush the outputs after the emits.
>
> Any ideas what might cause this problem?
>
> Regards,
> Gyula
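
For readers following the thread, here is a rough toy sketch of what the exception message describes: a record whose bytes are split across buffers, with an event arriving before the remaining bytes. This is not the actual InputChannel code and makes no claim about the real cause. The class ToyInputChannel, its methods, and the fixed 4-byte record size are hypothetical and only illustrate the failure mode.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Toy model of a receiving channel; NOT the real Stratosphere InputChannel.
class ToyInputChannel {
    private static final int RECORD_SIZE = 4;             // assumption: every record is one 4-byte int
    private final byte[] partial = new byte[RECORD_SIZE]; // bytes of the record being assembled
    private int filled = 0;                                // how many bytes of the current record have arrived
    private final List<Integer> records = new ArrayList<>();

    // A data buffer may end in the middle of a record; the rest comes with the next buffer.
    void onBuffer(byte[] data) {
        for (byte b : data) {
            partial[filled++] = b;
            if (filled == RECORD_SIZE) {
                records.add(ByteBuffer.wrap(partial).getInt());
                filled = 0;
            }
        }
    }

    // An event is only acceptable on a record boundary.
    void onEvent(String eventName) {
        if (filled != 0) {
            throw new IllegalStateException(
                "Channel received an event before completing the current partial record.");
        }
    }

    List<Integer> records() {
        return records;
    }
}

class PartialRecordDemo {
    public static void main(String[] args) {
        ToyInputChannel channel = new ToyInputChannel();
        // One full record (the int 7) followed by the first two bytes of the next record.
        channel.onBuffer(new byte[] {0, 0, 0, 7, 0, 0});
        System.out.println("complete records: " + channel.records());
        try {
            channel.onEvent("hypothetical event"); // arrives mid-record
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this toy model, an event that arrives on a record boundary passes the check, while one that arrives after only part of a record has been received is rejected.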