To extend the feature set of the streaming project, today we experimented with adding an IterativeDataStream class, which resulted in the following dummy application: https://github.com/stratosphere/stratosphere-streaming/blob/iterate/stratosphere-streaming-core/src/test/java/eu/stratosphere/streaming/api/IterateTest.java#L80-89
This approach constructs a cyclic JobGraph, for which the JobManager throws an exception: https://github.com/apache/incubator-flink/blob/master/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java#L419-424
However, when we removed this check, the JobGraph was created and worked as expected. Is the check really needed, or should we take another approach when constructing iterative jobs?
Thanks,
Marton
Cyclic graphs are a problem for both the scheduler and the recovery logic that is currently being worked on. General cyclic graphs are a bit of a design problem. I think what we need in the future is something like a special feedback edge that closes the cycle and is synchronized (in batches / generations), so that the cycle is transparent to the recovery and scheduling logic.
Stephan
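The following minimal Java sketch makes the idea of a feedback edge synchronized in generations more concrete. GenerationalFeedbackEdge and GenerationalFeedbackDemo are hypothetical names and are not Stratosphere classes. The sketch assumes a single iteration head and a single tail in one JVM, plus a fixed number of generations. The tail buffers the records it feeds back and passes them to the head only as a complete batch, once it closes a generation. Each generation therefore becomes a well-defined unit that scheduling or recovery logic could reason about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical feedback edge synchronized in generations: records fed back
 * during one generation reach the iteration head only as a complete batch,
 * after the tail has closed that generation.
 */
final class GenerationalFeedbackEdge<T> {
    private final BlockingQueue<List<T>> closedGenerations = new LinkedBlockingQueue<>();
    private List<T> current = new ArrayList<>();

    /** Called by the iteration tail for every record it feeds back. */
    synchronized void send(T record) {
        current.add(record);
    }

    /** Called by the iteration tail once it has emitted everything for this generation. */
    synchronized void endGeneration() {
        closedGenerations.add(current); // unbounded queue, so this never blocks
        current = new ArrayList<>();
    }

    /** Called by the iteration head; blocks until the tail closes the next generation. */
    List<T> takeGeneration() throws InterruptedException {
        return closedGenerations.take();
    }
}

final class GenerationalFeedbackDemo {
    /** Runs `generations` rounds of "add one" around the cycle, with the tail on its own thread. */
    static List<Integer> run(List<Integer> seed, int generations) {
        GenerationalFeedbackEdge<Integer> feedback = new GenerationalFeedbackEdge<>();
        BlockingQueue<List<Integer>> headToTail = new LinkedBlockingQueue<>(); // the ordinary forward edge
        Thread tail = new Thread(() -> {
            try {
                for (int g = 0; g < generations; g++) {
                    for (int x : headToTail.take()) {
                        feedback.send(x + 1); // the step function of this toy loop
                    }
                    feedback.endGeneration();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        tail.start();
        try {
            List<Integer> batch = seed;
            for (int g = 0; g < generations; g++) {
                headToTail.put(batch);
                batch = feedback.takeGeneration(); // generation boundary: wait for the whole batch
            }
            tail.join();
            return batch;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

For example, GenerationalFeedbackDemo.run(List.of(1, 5), 3) sends the batch around the cycle three times and returns [4, 8].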
Hey,
So we started digging through IterativeDataSet, and it seems that you are using the BlockingBackChannel as this special feedback edge. Does it work like a synchronization barrier in BSP? If so, it is probably not suitable for processing continuous dataflows, is it?
Gyula
Yes. The thing is that this is not even visible to the scheduler. It is something we build up in the local runtime, and it is invisible to the network stack and the scheduler. You can do something similar as a temporary workaround.
Hey Gyula,
the BlockingBackChannel is only used for data feedback. The synchronization happens through a separate barrier called SuperstepBarrier (delta iterations also have an additional SolutionSetUpdateBarrier). But these are more like "runtime hacks" (the DAG still looks like a DAG to the execution engine), and you would have to think about how to model this for streams.
Best,
Ufuk
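The following hypothetical Java sketch illustrates the split between data feedback and synchronization. Each worker feeds its own value back through a BlockingQueue, which plays the role of the back channel. A java.util.concurrent.CyclicBarrier stands in for the superstep barrier. It is not Stratosphere's SuperstepBarrier. BspSketch, the per-worker start values and the increment step are all illustrative assumptions.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch: data feedback goes through a per-worker queue (the back
 * channel), while superstep synchronization is a separate barrier.
 * CyclicBarrier only stands in for a superstep barrier here.
 */
final class BspSketch {
    /** Returns {sum of all workers' final values, number of completed supersteps}. */
    static int[] run(int workers, int supersteps) {
        AtomicInteger completedSupersteps = new AtomicInteger();
        // The barrier action runs once per superstep, after all workers have arrived.
        CyclicBarrier superstepBarrier =
                new CyclicBarrier(workers, completedSupersteps::incrementAndGet);
        int[] finalValues = new int[workers];
        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            final int id = w;
            threads[w] = new Thread(() -> {
                BlockingQueue<Integer> backChannel = new LinkedBlockingQueue<>();
                backChannel.add(id); // initial partial solution of this worker
                try {
                    for (int s = 0; s < supersteps; s++) {
                        int value = backChannel.take(); // feedback from the previous superstep
                        backChannel.add(value + 1);     // step function: increment
                        superstepBarrier.await();       // no worker starts the next superstep early
                    }
                    finalValues[id] = backChannel.take();
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            threads[w].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        int sum = 0;
        for (int v : finalValues) {
            sum += v;
        }
        return new int[] {sum, completedSupersteps.get()};
    }
}
```

With three workers starting from 0, 1 and 2, run(3, 4) returns {15, 4}. Each worker ends at its start value plus four, and the barrier trips once per superstep. For continuous streams, the barrier is the part that is hard to carry over, because every worker waits for the slowest one before the next step begins.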
Hey,
I implemented a simple streaming version of what you do for IterativeDataSets, using BlockingQueues to pass tuples between in-memory tasks. It seems to work locally for constructing cycles in the graph. Is there a reason to use BlockingBackChannel instead of a single BlockingQueue when we don't need to serialize the tuples anyway (since we stay in memory)?
Regards,
Gyula
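Here is a rough Java sketch of the BlockingQueue approach. It assumes that the iteration head and tail run as tasks in the same JVM. A hypothetical FeedbackQueueBroker gives both tasks the same queue for a given iteration ID, so fed-back records are passed by reference without serialization. The broker, the iteration ID string and the fixed termination value `limit` are illustrative assumptions. They are not the implementation on the iterate branch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical in-JVM registry that lets an iteration head and tail share one queue. */
final class FeedbackQueueBroker {
    private static final ConcurrentHashMap<String, BlockingQueue<Object>> QUEUES =
            new ConcurrentHashMap<>();

    static BlockingQueue<Object> get(String iterationId) {
        // Whichever task asks first creates the queue; the other receives the same instance.
        return QUEUES.computeIfAbsent(iterationId, id -> new LinkedBlockingQueue<>());
    }

    static void release(String iterationId) {
        QUEUES.remove(iterationId);
    }
}

final class HeadTailDemo {
    /** Loops values through head -> tail -> head until the head sees `limit`. */
    static List<Integer> run(int start, int limit) {
        String iterationId = "iteration-1"; // hypothetical ID known to both tasks
        BlockingQueue<Integer> forward = new LinkedBlockingQueue<>(); // stands in for the regular head -> tail edge
        List<Integer> seenByHead = new ArrayList<>();

        Thread tail = new Thread(() -> {
            BlockingQueue<Object> tailFeedback = FeedbackQueueBroker.get(iterationId);
            try {
                while (true) {
                    int received = forward.take();
                    if (received >= limit) break;    // hypothetical termination criterion
                    tailFeedback.put(received + 1);  // Increment step; the boxed Integer is passed by reference
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        tail.start();

        BlockingQueue<Object> headFeedback = FeedbackQueueBroker.get(iterationId);
        try {
            int value = start;
            while (true) {
                seenByHead.add(value);
                forward.put(value);
                if (value >= limit) break;
                value = (Integer) headFeedback.take(); // blocks until the tail feeds a record back
            }
            tail.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            FeedbackQueueBroker.release(iterationId);
        }
        return seenByHead;
    }
}
```

HeadTailDemo.run(1, 4) returns [1, 2, 3, 4], which are the values the head sees as they come around the loop. A shared queue like this only works when the head and tail are co-located. Like the batch runtime's back channel, it stays invisible to the scheduler.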
After a recent Skype meeting with Stephan, we came up with the following API, based on the ideas Gyula described in his last message: https://github.com/stratosphere/stratosphere-streaming/blob/iterate/stratosphere-streaming-core/src/test/java/eu/stratosphere/streaming/api/IterateTest.java#L80-85

    IterativeDataStream<Tuple1<Integer>> source = env.fromElements(1).flatMap(new Forward()).iterate();
    DataStream<Tuple1<Integer>> increment = source.flatMap(new Increment());
    source.closeWith(increment).print();

This way the JobGraph stays acyclic, but iteration is still available as a feature.
Cheers,
Marton
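A toy graph model can illustrate the key property of this API, namely that the JobGraph stays acyclic. Everything in the following sketch is a hypothetical simplification and does not reflect how stratosphere-streaming builds its JobGraph. This includes ToyJobGraph, the vertex names, and the decision to keep the tail-to-head pairing in a separate feedback map. The print() sink is left out because the sketch does not model what closeWith(...) returns. In the model, iterate() becomes an iteration-head vertex and closeWith(...) adds an iteration-tail vertex. The tail-to-head connection is recorded outside the edge list, so a topological sort (Kahn's algorithm) still succeeds.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy job graph: vertices plus forward edges only. */
final class ToyJobGraph {
    final List<String> vertices = new ArrayList<>();
    final Map<String, List<String>> edges = new HashMap<>();
    /** Tail -> head pairs kept out of the edge list; a runtime would wire these in memory. */
    final Map<String, String> feedback = new HashMap<>();

    String addVertex(String name, String... inputs) {
        vertices.add(name);
        edges.put(name, new ArrayList<>());
        for (String input : inputs) {
            edges.get(input).add(name);
        }
        return name;
    }

    /** Kahn's algorithm: true if every vertex can be ordered, i.e. the edges form a DAG. */
    boolean isAcyclic() {
        Map<String, Integer> inDegree = new HashMap<>();
        for (String v : vertices) inDegree.put(v, 0);
        for (List<String> targets : edges.values()) {
            for (String t : targets) inDegree.merge(t, 1, Integer::sum);
        }
        Deque<String> ready = new ArrayDeque<>();
        for (String v : vertices) {
            if (inDegree.get(v) == 0) ready.add(v);
        }
        int ordered = 0;
        while (!ready.isEmpty()) {
            String v = ready.poll();
            ordered++;
            for (String t : edges.get(v)) {
                if (inDegree.merge(t, -1, Integer::sum) == 0) ready.add(t);
            }
        }
        return ordered == vertices.size();
    }
}

final class IterateApiSketch {
    /** Mirrors the IterateTest example: source -> Forward -> iterate() -> Increment -> closeWith(...). */
    static ToyJobGraph build() {
        ToyJobGraph g = new ToyJobGraph();
        String source = g.addVertex("source");
        String forward = g.addVertex("flatMap(Forward)", source);
        String head = g.addVertex("iterationHead", forward);        // created by iterate()
        String increment = g.addVertex("flatMap(Increment)", head);
        String tail = g.addVertex("iterationTail", increment);     // created by closeWith(increment)
        g.feedback.put(tail, head);                                 // feedback pairing, not an edge
        return g;
    }
}
```

build().isAcyclic() returns true. If the feedback pair is turned into a regular edge (iterationTail -> iterationHead), isAcyclic() returns false. That case corresponds to the cyclic JobGraph the JobManager rejected.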
By the way: we are thinking about adding to or changing the way iterations are specified. Rather than a set.iterate(...) -> closeWith(...) sequence, we could go for a proper step function, similar to how the Scala API does it. That is somewhat more elegant.
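Here is a minimal illustration of the step-function style. It uses a hypothetical iterate(initial, maxIterations, stepFunction) helper on plain Java lists, not any real Stratosphere or Scala API. The user supplies a single function that maps the iteration's input to its next input, and the helper closes the loop itself. As a result, there is no open handle between iterate() and closeWith() that could be left unclosed.

```java
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

/** Hypothetical step-function style: the caller supplies one function, the helper closes the loop. */
final class StepFunctionIteration {
    static <T> T iterate(T initial, int maxIterations, UnaryOperator<T> stepFunction) {
        T current = initial;
        for (int i = 0; i < maxIterations; i++) {
            current = stepFunction.apply(current); // output of one step becomes input of the next
        }
        return current;
    }

    static List<Integer> demo() {
        // Mirrors the Increment example from the thread on a plain list instead of a DataStream.
        return iterate(List.of(1, 10), 3,
                xs -> xs.stream().map(x -> x + 1).collect(Collectors.toList()));
    }
}
```

StepFunctionIteration.demo() applies the increment step three times to [1, 10] and returns [4, 13].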