Iterative stream processing - cyclic JobGraph?

Márton Balassi
To extend the feature set of the streaming project, we experimented today with
adding an IterativeDataStream class, which resulted in the following dummy
application:

https://github.com/stratosphere/stratosphere-streaming/blob/iterate/stratosphere-streaming-core/src/test/java/eu/stratosphere/streaming/api/IterateTest.java#L80-89

This approach constructs a cyclic JobGraph, and the JobManager throws an
exception for it:

https://github.com/apache/incubator-flink/blob/master/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java#L419-424

However, after we removed this check, the JobGraph was created and worked as
expected.

Is the check really needed or should we take another approach when
constructing the iterative jobs?

Thanks,

Marton
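
For readers following along, the kind of acyclicity check discussed here can be sketched as a depth-first search over the graph. This is only an illustration under assumptions: the class CycleCheckSketch, the method isAcyclic, and the vertex names are hypothetical and are not taken from the JobManager code linked above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration; not the actual JobManager check.
class CycleCheckSketch {

    // Returns true if the directed graph (vertex -> successors) contains no cycle.
    static boolean isAcyclic(Map<String, List<String>> successors) {
        Set<String> finished = new HashSet<>(); // vertices that are fully explored
        Set<String> onPath = new HashSet<>();   // vertices on the current DFS path
        for (String vertex : successors.keySet()) {
            if (hasCycleFrom(vertex, successors, finished, onPath)) {
                return false;
            }
        }
        return true;
    }

    private static boolean hasCycleFrom(String vertex,
                                        Map<String, List<String>> successors,
                                        Set<String> finished,
                                        Set<String> onPath) {
        if (finished.contains(vertex)) {
            return false;
        }
        if (!onPath.add(vertex)) {
            return true; // the vertex is already on the current path: back edge
        }
        for (String next : successors.getOrDefault(vertex, Collections.emptyList())) {
            if (hasCycleFrom(next, successors, finished, onPath)) {
                return true;
            }
        }
        onPath.remove(vertex);
        finished.add(vertex);
        return false;
    }

    public static void main(String[] args) {
        Map<String, List<String>> graph = new HashMap<>();
        graph.put("source", Arrays.asList("increment"));
        graph.put("increment", Arrays.asList("sink"));
        System.out.println(isAcyclic(graph)); // plain pipeline

        // Adding a feedback edge from "increment" back to "source" closes a cycle.
        graph.put("increment", Arrays.asList("sink", "source"));
        System.out.println(isAcyclic(graph));
    }
}
```

A job with an explicit feedback edge fails a check of this kind, which is why the iterate test above trips the exception.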

Re: Iterative stream processing - cyclic JobGraph?

Stephan Ewen
Cyclic graphs are a problem for both the scheduler and the recovery logic
that is currently in the works.

General cyclic graphs are a bit of a design problem.

I think what we need in the future is something like a special feedback
edge that closes the cycle and that is synchronized (in batches /
generations) to make it transparent to the recovery and scheduling logic.

Stephan

Re: Iterative stream processing - cyclic JobGraph?

Gyula Fóra
Hey,

So we started digging through the IterativeDataSet, and it seems like you
are using the BlockingBackChannel as this special feedback edge. Does it
work the way a synchronization barrier would in BSP? If so, it is probably not
suitable for processing continuous dataflows, is it?

Gyula




On Tue, Jul 8, 2014 at 4:33 PM, Stephan Ewen <[hidden email]> wrote:

> Cyclic graphs are a problem for both the scheduler and the recovery logic
> that is currently in the works.
>
> General cyclic graphs are a bit of a design problem.
>
> I think what we need in the future is something like a special feedback
> edge that closes the cycle and that is synchronized (in batches /
> generations) to make it transparent to the recovery and scheduling logic.
>
> Stephan
>

Re: Iterative stream processing - cyclic JobGraph?

Stephan Ewen
Yes. The thing is that this is not even visible to the scheduler. It is
something we build up in the local runtime, and it is hidden from the
network stack and the scheduler.

You can do something similar as a temporary workaround.

Re: Iterative stream processing - cyclic JobGraph?

Ufuk Celebi
In reply to this post by Gyula Fóra
Hey Gyula,

The BlockingBackChannel is only used for data feedback. The synchronization happens with a separate barrier called SuperstepBarrier. (For delta iterations there is an additional SolutionSetUpdateBarrier.) But these are more like "runtime hacks" (the DAG still looks like a DAG to the execution engine), and you would have to think about how to model this stuff for streams.

Best,

Ufuk
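
To illustrate the separation described above (data moves on one path, synchronization happens at a separate barrier), here is a minimal sketch. It uses java.util.concurrent.CyclicBarrier as a stand-in and is not the actual SuperstepBarrier; the class SuperstepSketch and its run method are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; CyclicBarrier stands in for a superstep barrier.
class SuperstepSketch {

    // Each worker contributes 1 per superstep; the barrier action records the
    // running total once all workers have finished the current superstep.
    static List<Integer> run(int workers, int supersteps) {
        AtomicInteger total = new AtomicInteger();
        List<Integer> totalsPerSuperstep = Collections.synchronizedList(new ArrayList<>());
        CyclicBarrier barrier =
                new CyclicBarrier(workers, () -> totalsPerSuperstep.add(total.get()));

        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            Thread t = new Thread(() -> {
                try {
                    for (int s = 0; s < supersteps; s++) {
                        total.incrementAndGet(); // the "work" of this superstep
                        barrier.await();         // wait until every worker is done
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return totalsPerSuperstep;
    }

    public static void main(String[] args) {
        System.out.println(run(3, 4)); // prints [3, 6, 9, 12]
    }
}
```

No worker can start superstep n+1 before all workers have completed superstep n, which is the BSP-style behavior asked about. A continuous stream has no natural superstep boundary, so this model does not carry over directly.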

On 09 Jul 2014, at 10:14, Gyula Fóra <[hidden email]> wrote:

> Hey,
>
> So we started digging through the IterativeDataSet, and it seems like you
> are using the BlockingBackChannel as this special feedback edge. Does it
> work the way a synchronization barrier would in BSP? If so, it is probably not
> suitable for processing continuous dataflows, is it?
>
> Gyula

Re: Iterative stream processing - cyclic JobGraph?

Gyula Fóra
Hey,
I implemented a simple version of what you do for IterativeDataSets for
streaming, using BlockingQueues to pass tuples between in-memory tasks.
It seems to work locally for constructing cycles in the graph.
Is there a reason why we should use BlockingBackChannel instead of a
single BlockingQueue when we don't need to serialize the tuples anyway (we
are staying in-memory)?

Regards,
Gyula
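
A rough sketch of the idea, for readers who want to see it concretely: two in-memory tasks connected by a forward queue, plus a second queue that carries results back and closes the loop. Everything here is an assumption made for illustration (the class FeedbackQueueSketch, the runLoop method, and the stop condition); it is not the code from the streaming branch.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration; not the stratosphere-streaming implementation.
class FeedbackQueueSketch {

    // The "head" task forwards values, the "tail" task increments them and sends
    // them back through an in-memory queue until the limit is reached.
    static int runLoop(int start, int limit) {
        BlockingQueue<Integer> forward = new LinkedBlockingQueue<>();  // head -> tail
        BlockingQueue<Integer> feedback = new LinkedBlockingQueue<>(); // tail -> head
        BlockingQueue<Integer> output = new LinkedBlockingQueue<>();   // final result

        Thread head = new Thread(() -> {
            try {
                while (true) {
                    forward.put(feedback.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop once the loop is shut down
            }
        });

        Thread tail = new Thread(() -> {
            try {
                while (true) {
                    int next = forward.take() + 1;
                    if (next >= limit) {
                        output.put(next); // leave the loop
                        return;
                    }
                    feedback.put(next);   // feed the value back to the head
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        try {
            feedback.put(start); // seed the loop with the initial element
            head.start();
            tail.start();
            int result = output.take();
            head.interrupt();
            head.join();
            tail.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the loop", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runLoop(1, 5)); // prints 5
    }
}
```

Because the tuples never leave the JVM, no serialization is involved, and the scheduler never sees the feedback queue as an edge.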


On Wed, Jul 9, 2014 at 10:26 AM, Ufuk Celebi <[hidden email]> wrote:

> Hey Gyula,
>
> The BlockingBackChannel is only used for data feedback. The
> synchronization happens with a separate barrier called SuperstepBarrier.
> (For delta iterations there is an additional SolutionSetUpdateBarrier.)
> But these are more like "runtime hacks" (the DAG still looks like a DAG to
> the execution engine), and you would have to think about how to model this
> stuff for streams.
>
> Best,
>
> Ufuk

Re: Iterative stream processing - cyclic JobGraph?

Márton Balassi
After a recent Skype meeting with Stefan, we came up with the following
API based on the ideas Gyula described in the last message:
https://github.com/stratosphere/stratosphere-streaming/blob/iterate/stratosphere-streaming-core/src/test/java/eu/stratosphere/streaming/api/IterateTest.java#L80-85

IterativeDataStream<Tuple1<Integer>> source =
    env.fromElements(1).flatMap(new Forward()).iterate();

DataStream<Tuple1<Integer>> increment = source.flatMap(new Increment());

source.closeWith(increment).print();

This way the JobGraph stays acyclic, but iteration is enabled as a feature.

Cheers,

Marton

On Wed, Jul 9, 2014 at 4:59 PM, Gyula Fóra <[hidden email]> wrote:

>
> Hey,
> I implemented a simple version of what you do for IterativeDataSets for
> streaming, using BlockingQueues to pass tuples between in-memory tasks.
> It seems to work locally for constructing cycles in the graph.
> Is there a reason why we should use BlockingBackChannel instead of a
> single BlockingQueue when we don't need to serialize the tuples anyway (we
> are staying in-memory)?
>
> Regards,
> Gyula

Re: Iterative stream processing - cyclic JobGraph?

Stephan Ewen
By the way: we are thinking about changing the way that iterations are
specified. Rather than having a set.iterate(...) -> closeWith(...) sequence,
we could go for a proper step function, similar to how the Scala API does it.
That is somewhat more elegant.
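
As a rough illustration of what "a proper step function" could mean, the sketch below passes the loop body as a function instead of opening and closing the iteration in separate calls. The helper StepFunctionSketch.iterate and its signature are purely hypothetical; they are not the API of this project or of the Scala API mentioned above.

```java
import java.util.function.UnaryOperator;

// Hypothetical illustration of a step-function style iteration.
class StepFunctionSketch {

    // Applies the step function to the running value a fixed number of times.
    static <T> T iterate(T initial, int maxIterations, UnaryOperator<T> step) {
        T current = initial;
        for (int i = 0; i < maxIterations; i++) {
            current = step.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        // The whole loop body is a single function, so the loop cannot be left unclosed.
        int result = iterate(1, 10, x -> x + 1);
        System.out.println(result); // prints 11
    }
}
```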