Exception in Inputgate

Gyula Fóra
Hey,

Until now, we flushed the outputs after every emit using the .flush()
method of the RecordWriter. We have now removed this flush() call and made
two interesting observations:

First, if we don't send enough records to fill the output buffer, the
source finishes but the buffer never gets flushed.
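
The first observation is what a buffering writer does by design. Here is a minimal sketch, assuming a writer that hands a buffer to the network only once it is full. `BufferedWriterSketch`, `emit` and `flush` are hypothetical names for illustration and are not Stratosphere's `RecordWriter` API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not Stratosphere's RecordWriter.
public class BufferedWriterSketch {
    private final int capacity;                                 // records per buffer (assumption)
    private final List<Integer> buffer = new ArrayList<>();
    private final List<List<Integer>> sent = new ArrayList<>(); // buffers "on the wire"

    public BufferedWriterSketch(int capacity) {
        this.capacity = capacity;
    }

    // Appends a record; ships the buffer only when it is full.
    public void emit(int record) {
        buffer.add(record);
        if (buffer.size() == capacity) {
            flush();
        }
    }

    // Ships whatever is buffered, even a partially filled buffer.
    public void flush() {
        if (!buffer.isEmpty()) {
            sent.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public List<List<Integer>> sent() {
        return sent;
    }

    public static void main(String[] args) {
        BufferedWriterSketch writer = new BufferedWriterSketch(4);
        for (int i = 0; i < 3; i++) {
            writer.emit(i);
        }
        // The source is done, but its 3 records are still sitting in the buffer.
        System.out.println("sent buffers before flush: " + writer.sent().size());
        writer.flush(); // what has to happen when the source finishes
        System.out.println("sent buffers after flush: " + writer.sent().size());
    }
}
```

Running it prints `sent buffers before flush: 0` and then `sent buffers after flush: 1`: unless something flushes when the source finishes, records that never fill a buffer are never sent.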

Second, if we generate a simple data stream from, say, the first 1500
numbers
<https://github.com/stratosphere/stratosphere-streaming/blob/output-flush/stratosphere-streaming-core/src/test/java/eu/stratosphere/streaming/api/PrintTest.java#L48>,
we get an exception in the InputGates after roughly a hundred records:

java.lang.IllegalStateException: Channel received an event before completing the current partial record.
    at eu.stratosphere.runtime.io.channels.InputChannel.readRecord(InputChannel.java:177)
    at eu.stratosphere.runtime.io.gates.InputGate.readRecord(InputGate.java:173)
    at eu.stratosphere.streaming.api.streamcomponent.StreamRecordReader.hasNext(StreamRecordReader.java:96)
    at eu.stratosphere.streaming.api.streamcomponent.AbstractStreamComponent.invokeRecords(AbstractStreamComponent.java:255)
    at eu.stratosphere.streaming.api.streamcomponent.StreamSink.invoke(StreamSink.java:74)
    at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:260)
    at java.lang.Thread.run(Unknown Source)

Everything works fine if we flush the outputs after each emit.

Any ideas what might cause this problem?

Regards,
Gyula

Re: Exception in Inputgate

Stephan Ewen
Interesting. The only event I can think of is the SenderHintEvent. Maybe
there is a corner case there.

Can you open a JIRA issue? I'll take a look.
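
One plausible mechanism that fits both the exception and the SenderHintEvent guess (an assumption, not a confirmed diagnosis): without a flush per emit, a serialized record can be split across two buffers, and if the sender slips an event in between those buffers, the receiver sees the event while it still holds half a record. The sketch below models this with hypothetical classes (`PartialRecordSketch`, `Receiver`, `send`, `deliver`); they are not Stratosphere's `InputChannel` or `RecordWriter`. It assumes fixed 4-byte records, 6-byte buffers, and an event after every buffer.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a channel, not Stratosphere's InputChannel/RecordWriter.
public class PartialRecordSketch {
    static final int RECORD_SIZE = 4; // each record is one serialized int

    // Reassembles records that may be split across buffers.
    static class Receiver {
        private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
        final List<Integer> records = new ArrayList<>();

        void onData(byte[] bytes) {
            pending.write(bytes, 0, bytes.length);
            byte[] all = pending.toByteArray();
            int complete = all.length / RECORD_SIZE * RECORD_SIZE;
            ByteBuffer view = ByteBuffer.wrap(all, 0, complete);
            while (view.remaining() >= RECORD_SIZE) {
                records.add(view.getInt());
            }
            // Keep the bytes of a record that is not complete yet.
            pending.reset();
            pending.write(all, complete, all.length - complete);
        }

        void onEvent() {
            // Events are only legal on a record boundary.
            if (pending.size() > 0) {
                throw new IllegalStateException(
                        "Channel received an event before completing the current partial record.");
            }
        }
    }

    // Serializes records back to back and cuts the byte stream into buffers.
    // flushEachRecord = true mimics calling flush() after every emit
    // (assumes bufferSize >= RECORD_SIZE, so a flushed record never spans buffers).
    static List<byte[]> send(int[] values, int bufferSize, boolean flushEachRecord) {
        List<byte[]> buffers = new ArrayList<>();
        ByteArrayOutputStream current = new ByteArrayOutputStream();
        for (int value : values) {
            for (byte b : ByteBuffer.allocate(RECORD_SIZE).putInt(value).array()) {
                current.write(b);
                if (current.size() == bufferSize) { // buffer full: ship it
                    buffers.add(current.toByteArray());
                    current.reset();
                }
            }
            if (flushEachRecord && current.size() > 0) {
                buffers.add(current.toByteArray());
                current.reset();
            }
        }
        if (current.size() > 0) { // final flush when the source finishes
            buffers.add(current.toByteArray());
        }
        return buffers;
    }

    // Delivers the buffers with an event slipped in after each one.
    static List<Integer> deliver(List<byte[]> buffers) {
        Receiver receiver = new Receiver();
        for (byte[] buffer : buffers) {
            receiver.onData(buffer);
            receiver.onEvent();
        }
        return receiver.records;
    }

    public static void main(String[] args) {
        int[] values = {1, 2, 3};
        System.out.println("with flush: " + deliver(send(values, 6, true)));
        try {
            deliver(send(values, 6, false));
        } catch (IllegalStateException e) {
            System.out.println("without flush: " + e.getMessage());
        }
    }
}
```

With a flush per record, every buffer ends on a record boundary, so the interleaved events are harmless and `with flush: [1, 2, 3]` is printed. Without it, the first 6-byte buffer carries record 1 plus half of record 2, and the event that follows triggers the same `IllegalStateException` message as in the trace above.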


On Thu, Jul 10, 2014 at 2:59 PM, Gyula Fóra <[hidden email]> wrote:

> Hey,
>
> Until now, after every emit to the outputs we flushed them using the
> .flush() method of the recordwriter. Now we removed this flush() call and
> we have two interesting observations:
>
> First of all we dont send enough records the source finishes but the output
> buffer never gets flushed.
>
> Secondly if we generate a simple datastream from lets say the first 1500
> <
> https://github.com/stratosphere/stratosphere-streaming/blob/output-flush/stratosphere-streaming-core/src/test/java/eu/stratosphere/streaming/api/PrintTest.java#L48
> >
> numbers we get an exception in the InputGates (after lets say a hundred
> records): java.lang.IllegalStateException: Channel received an event before
> completing the current partial record.
>
> java.lang.IllegalStateException: Channel received an event before
> completing the current partial record.
> at
>
> eu.stratosphere.runtime.io.channels.InputChannel.readRecord(InputChannel.java:177)
> at
> eu.stratosphere.runtime.io.gates.InputGate.readRecord(InputGate.java:173)
> at
>
> eu.stratosphere.streaming.api.streamcomponent.StreamRecordReader.hasNext(StreamRecordReader.java:96)
> at
>
> eu.stratosphere.streaming.api.streamcomponent.AbstractStreamComponent.invokeRecords(AbstractStreamComponent.java:255)
> at
>
> eu.stratosphere.streaming.api.streamcomponent.StreamSink.invoke(StreamSink.java:74)
> at
>
> eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:260)
> at java.lang.Thread.run(Unknown Source)
>
>
>
> This works perfectly if we flush the outputs after the emits.
>
> Any ideas what might cause this problem?
>
> Regards,
> Gyula
>