Moving streaming to 0.6

Hermann Gábor
Hey,

We are trying to move the streaming code from the 0.5 release to 0.6, and
we've run into a problem. We extended AbstractInputTask, AbstractTask and
AbstractOutputTask classes to implement our components (StreamSource,
StreamTask, StreamSink), and it seems they have been replaced by
DataSourceTask, RegularPactTask and DataSinkTask, respectively, so we
replaced them in our code too. The problem is that the RecordWriter's
numChannels is not set because the RecordWriter's initializeSerializers()
method does not get called. Should we call this manually somewhere? Also, I
don't know whether we should use DataSourceTask and the others or extend
AbstractInvokable and implement our own classes similar to
AbstractInputTask, AbstractTask and AbstractOutputTask. Can you please help
us with this? Thanks!

Regards,
Gábor

Re: Moving streaming to 0.6

Ufuk Celebi
On 01 Jul 2014, at 16:49, Hermann Gábor <[hidden email]> wrote:

> Hey,
>
> We are trying to move the streaming code from the 0.5 release to 0.6, and
> we've run into a problem. We extended AbstractInputTask, AbstractTask and
> AbstractOutputTask classes to implement our components (StreamSource,
> StreamTask, StreamSink), and it seems they have been replaced by
> DataSourceTask, RegularPactTask and DataSinkTask, respectively, so we
> replaced them in our code too.

The execution engine's task hierarchy was simplified in commit 8c1d82a8ec674de6525319501c6be2674e3143f1 [1]; it no longer differentiates between input and output tasks. I think the respective input and output task logic (e.g., for input splits) has moved to the classes you mentioned, but I'm not sure you should subclass them if you only need a small subset of their functionality.

At least for the previous AbstractTask, you could just extend AbstractInvokable directly, because AbstractTask did nothing more than subclass AbstractInvokable.
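To make this concrete, here is a minimal, self-contained Java sketch. `Invokable` is a hypothetical, simplified stand-in for AbstractInvokable that keeps only the two lifecycle hooks assumed here (registerInputOutput() and invoke()); `Channel` and the toy StreamSource/StreamTask/StreamSink classes are likewise hypothetical and use in-memory queues instead of real gates. The point is that source, intermediate task and sink all extend the same base class, and the runtime drives them through the same lifecycle.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified stand-in for AbstractInvokable: only the two
// lifecycle hooks assumed for this sketch are modeled.
abstract class Invokable {
    /** Set up readers and writers; called before invoke(). */
    public abstract void registerInputOutput();

    /** The task's main logic. */
    public abstract void invoke() throws Exception;
}

// Hypothetical in-memory channel standing in for the runtime's channels.
final class Channel {
    final Deque<String> queue = new ArrayDeque<>();
}

// Source: only produces output.
final class StreamSource extends Invokable {
    private final Channel out;
    StreamSource(Channel out) { this.out = out; }
    @Override public void registerInputOutput() { /* create writers here */ }
    @Override public void invoke() {
        for (String s : new String[] {"a", "b", "c"}) {
            out.queue.add(s);
        }
    }
}

// Intermediate task: consumes input and produces output.
final class StreamTask extends Invokable {
    private final Channel in;
    private final Channel out;
    StreamTask(Channel in, Channel out) { this.in = in; this.out = out; }
    @Override public void registerInputOutput() { /* create readers and writers here */ }
    @Override public void invoke() {
        while (!in.queue.isEmpty()) {
            out.queue.add(in.queue.poll().toUpperCase());
        }
    }
}

// Sink: only consumes input.
final class StreamSink extends Invokable {
    private final Channel in;
    final StringBuilder result = new StringBuilder();
    StreamSink(Channel in) { this.in = in; }
    @Override public void registerInputOutput() { /* create readers here */ }
    @Override public void invoke() {
        while (!in.queue.isEmpty()) {
            result.append(in.queue.poll());
        }
    }
}

final class UniformTaskDemo {
    // The "runtime" treats every task identically. Tasks run one after
    // another here for simplicity; a real runtime would run them concurrently.
    static void run(Invokable... tasks) throws Exception {
        for (Invokable t : tasks) t.registerInputOutput();
        for (Invokable t : tasks) t.invoke();
    }

    static String pipeline() {
        Channel sourceToTask = new Channel();
        Channel taskToSink = new Channel();
        StreamSink sink = new StreamSink(taskToSink);
        try {
            run(new StreamSource(sourceToTask), new StreamTask(sourceToTask, taskToSink), sink);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return sink.result.toString();
    }

    public static void main(String[] args) {
        System.out.println(pipeline()); // prints ABC
    }
}
```

Nothing in `run()` needs to know whether a task is an input, intermediate, or output task, which is the property the simplified hierarchy provides.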

I'd say wait for Stephan's take on this. ;-)

[1] https://github.com/apache/incubator-flink/commit/8c1d82a8ec674de6525319501c6be2674e3143f1


> The problem is that the RecordWriter's
> numChannels is not set because the RecordWriter's initializeSerializers()
> method does not get called. Should we call this manually somewhere?

Yes. This is needed to work around a flaw in the way that the runtime is instantiated, which is going to be refactored soon.

The RecordWriter creates the OutputGate, but the RuntimeEnvironment initializes the output gates' channels at a later point. That's why it is not known at construction time how many channels are attached to a gate, and consequently how many serializers the RecordWriter needs. The initializeSerializers() method is a workaround for that.
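The ordering problem can be illustrated with a small, self-contained Java sketch. `Gate` and `Writer` are hypothetical stand-ins for the OutputGate and RecordWriter, not their real APIs, and the per-channel "serializers" are just placeholder buffers. The writer is constructed while the gate has no channels yet, so it can only size its serializer array in a later initializeSerializers() call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an output gate. Channels are attached by the
// environment after the writer has already been constructed.
final class Gate {
    final List<String> channels = new ArrayList<>();
}

// Hypothetical stand-in for RecordWriter with one "serializer" per channel.
final class Writer {
    private final Gate gate;
    private StringBuilder[] serializers; // placeholder buffers; null until initialized

    Writer(Gate gate) {
        this.gate = gate;
        // The gate has no channels yet, so the serializers cannot be sized here.
    }

    /** The workaround: size the serializers once the channels are known. */
    void initializeSerializers() {
        serializers = new StringBuilder[gate.channels.size()];
        for (int i = 0; i < serializers.length; i++) {
            serializers[i] = new StringBuilder();
        }
    }

    int numChannels() {
        return serializers == null ? 0 : serializers.length;
    }
}

final class DeferredInitDemo {
    static int numChannelsSeenByWriter(boolean callInitialize) {
        Gate gate = new Gate();
        Writer writer = new Writer(gate);   // 1. the task creates the writer
        gate.channels.add("channel-0");     // 2. the environment attaches channels later
        gate.channels.add("channel-1");
        if (callInitialize) {
            writer.initializeSerializers(); // 3. the workaround step
        }
        return writer.numChannels();
    }

    public static void main(String[] args) {
        System.out.println(numChannelsSeenByWriter(false)); // prints 0
        System.out.println(numChannelsSeenByWriter(true));  // prints 2
    }
}
```

Without step 3 the writer still believes it has zero channels, which matches the symptom of numChannels not being set.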

This could also be refactored on its own, but it will be subsumed by the upcoming runtime changes for the intermediate data set partitions (FLINK-986). If it is important to you, I could do a quick fix for it now.

Re: Moving streaming to 0.6

Gyula Fóra
Hey,

Thank you!
We were able to fix it by calling initializeSerializers() in the invoke()
method.
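A minimal sketch of that fix, assuming the runtime calls registerInputOutput(), then attaches the output channels, then calls invoke() (the order implied by Ufuk's explanation). All classes here are hypothetical stand-ins, not the real Flink API: the writer is created in registerInputOutput(), and initializeSerializers() is called at the top of invoke(), when the channel count is known.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an output gate: each channel is a list of records.
final class StubGate {
    final List<List<String>> channels = new ArrayList<>();
}

// Hypothetical stand-in for RecordWriter.
final class StubRecordWriter {
    private final StubGate gate;
    private int numChannels = -1; // unknown until initializeSerializers() runs

    StubRecordWriter(StubGate gate) { this.gate = gate; }

    void initializeSerializers() { numChannels = gate.channels.size(); }

    void emit(String record) {
        if (numChannels < 0) {
            throw new IllegalStateException("initializeSerializers() was not called");
        }
        // Broadcast the record to every channel known to the writer.
        for (int i = 0; i < numChannels; i++) {
            gate.channels.get(i).add(record);
        }
    }
}

// Hypothetical stand-in for AbstractInvokable.
abstract class StubInvokable {
    public abstract void registerInputOutput();
    public abstract void invoke() throws Exception;
}

final class StubStreamSource extends StubInvokable {
    final StubGate gate = new StubGate();
    private StubRecordWriter writer;

    @Override
    public void registerInputOutput() {
        // The output channels are not attached yet at this point.
        writer = new StubRecordWriter(gate);
    }

    @Override
    public void invoke() {
        // The fix: initialize the serializers first thing in invoke(),
        // after the environment has attached the channels.
        writer.initializeSerializers();
        writer.emit("hello");
        writer.emit("world");
    }
}

final class InvokeFixDemo {
    static int totalRecordsDelivered(int channelCount) {
        StubStreamSource task = new StubStreamSource();
        task.registerInputOutput();                 // 1. writer created
        for (int i = 0; i < channelCount; i++) {    // 2. channels attached
            task.gate.channels.add(new ArrayList<>());
        }
        task.invoke();                              // 3. serializers initialized, records emitted
        int total = 0;
        for (List<String> channel : task.gate.channels) {
            total += channel.size();
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(totalRecordsDelivered(3)); // prints 6
    }
}
```

Calling initializeSerializers() in registerInputOutput() instead would size the writer for zero channels, so no records would reach any channel.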

Regards,
Gyula

On Wed, Jul 2, 2014 at 12:20 AM, Ufuk Celebi <[hidden email]> wrote:
> [...]

Re: Moving streaming to 0.6

Stephan Ewen
Hi!

Thanks, Ufuk, for answering this. That's correct: you can now use
"AbstractInvokable" everywhere; there is no need to distinguish the task
classes for inputs, intermediates, and outputs.

Let me add the following: I am reworking the abstraction for Job Graphs and
scheduling right now. It will make all vertices the same (no distinction
between input / output / intermediate), which is also important for the
upcoming incremental roll-out feature.
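The idea of making all vertices the same can be illustrated with a hypothetical sketch; this is not the actual Job Graph API. A single `Vertex` type is used throughout, and whether a vertex acts as an input, intermediate, or output vertex is derived from its connections rather than from its class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single vertex type: no separate input/output vertex classes.
final class Vertex {
    final String name;
    final List<Vertex> inputs = new ArrayList<>();
    final List<Vertex> outputs = new ArrayList<>();

    Vertex(String name) { this.name = name; }

    /** Connects this vertex to a downstream vertex. */
    void connectTo(Vertex downstream) {
        outputs.add(downstream);
        downstream.inputs.add(this);
    }

    /** The role follows from the graph structure alone. */
    String role() {
        if (inputs.isEmpty()) return "input";
        if (outputs.isEmpty()) return "output";
        return "intermediate";
    }
}

final class UniformVertexDemo {
    static String roles() {
        Vertex source = new Vertex("StreamSource");
        Vertex task = new Vertex("StreamTask");
        Vertex sink = new Vertex("StreamSink");
        source.connectTo(task);
        task.connectTo(sink);
        return source.role() + "," + task.role() + "," + sink.role();
    }

    public static void main(String[] args) {
        System.out.println(roles()); // prints input,intermediate,output
    }
}
```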

Stephan