Hey,

Along with the suggested changes to the streaming API structure, I think we should also rework the "iteration" API. Currently the iteration API tries to mimic the syntax of the batch API, while the runtime behaviour is quite different.

What we create instead of iterations is really just cyclic streams (loops in the streaming job), so the API should make this behaviour intuitive.

I suggest removing the explicit iterate call and instead adding a method to the StreamOperators that allows connecting feedback inputs (creating loops). It would look like this:

A mapper that does nothing but iterates over some filtered input:

*Current API:*
DataStream source = ..
IterativeDataStream it = source.iterate()
DataStream mapper = it.map(noOpMapper)
DataStream feedback = mapper.filter(...)
it.closeWith(feedback)

*Suggested API:*
DataStream source = ..
DataStream mapper = source.map(noOpMapper)
DataStream feedback = mapper.filter(...)
mapper.addInput(feedback)

The suggested approach would let us define inputs to operators after they are created and implicitly union them with the normal input. I think this is a much clearer approach than what we have now.

What do you think?

Gyula
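To make the "cyclic stream" behaviour described above concrete, here is a minimal Java sketch that simulates such a loop outside of Flink. It does not use the Flink API: the class name `FeedbackLoopSketch` and the `run` helper are hypothetical, and the example uses a decrementing map step instead of the thread's no-op mapper, purely so that the loop terminates.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Toy model of a streaming loop. NOT Flink code; all names are hypothetical.
final class FeedbackLoopSketch {

    // Runs source elements through `mapper`. Elements matching `feedbackFilter`
    // re-enter the mapper input (the loop edge); all others leave the loop.
    static <T> List<T> run(List<T> source, Function<T, T> mapper, Predicate<T> feedbackFilter) {
        // The mapper input is the union of the source and the feedback stream.
        Deque<T> mapperInput = new ArrayDeque<>(source);
        List<T> output = new ArrayList<>();
        while (!mapperInput.isEmpty()) {
            T mapped = mapper.apply(mapperInput.poll());
            if (feedbackFilter.test(mapped)) {
                mapperInput.add(mapped); // feedback edge: back to the mapper input
            } else {
                output.add(mapped);      // element leaves the loop
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Decrement each value; values that are still positive are fed back.
        List<Integer> result = run(List.of(3, 1, 2), x -> x - 1, x -> x > 0);
        System.out.println(result);
    }
}
```

The point of the sketch is that there is no notion of an "iteration" here at all: the feedback edge is just one more input that is unioned into the mapper's input queue.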
I see that the newly proposed API makes some things easier to define.

There is one source of confusion, though, in my opinion:

The new API suggests that the data stream actually refers to the operator that created it. The "DataStream mapper = source.map(noOpMapper)" here refers to the map operator, not to the result of the map function.

When adding the feedback input, you add it to the stream that precedes the one you call "addInput()" on. Here, the feedback actually gets unioned with the source data stream, not with the result of the mapper. This seems very weird to me.

What happens here:

DataStream source = env.createStream(myKafkaConnector);
DataStream mapper = source.map(noOpMapper)
source.addInput(feedback)

or here:

DataStream source1 = env.createStream(myKafkaConnector);
DataStream source2 = env.createStream(myKafkaConnector);

DataStream joined = source1.keyBy(...).join(source2.keyBy(...)).onWindow(...);
DataStream feedback = joined.map(someMapper);
joined.addInput(feedback);

On Tue, Jul 7, 2015 at 3:57 PM, Gyula Fóra <[hidden email]> wrote:
> Hey,
>
> Along with the suggested changes to the streaming API structure I think we
> should also rework the "iteration" api.
> [...]
In reply to this post by Gyula Fóra-2
I think this would be good, yes. I was just about to open an issue for changing the Streaming Iteration API. :D

Then we should also make the implementation very straightforward and simple. Right now, the implementation of the iterations is all over the place.

On Tue, 7 Jul 2015 at 15:57 Gyula Fóra <[hidden email]> wrote:
> Hey,
>
> Along with the suggested changes to the streaming API structure I think we
> should also rework the "iteration" api.
> [...]
In reply to this post by Stephan Ewen
+1 for rethinking the iterations in DataStream.

However, wouldn't this proposal allow the definition of arbitrary loops (e.g., nested loops), which are not well behaved, afaik?

On Tue, Jul 7, 2015 at 4:12 PM, Stephan Ewen <[hidden email]> wrote:
> I see that the newly proposed API makes some things easier to define.
>
> There is one source of confusion, though, in my opinion:
> [...]
In reply to this post by Aljoscha Krettek-2
Sorry Stephan, I meant it slightly differently, but I see your point:

DataStream source = ...
SingleInputOperator mapper = source.map(...)
mapper.addInput()

So addInput would be a method of the operator, not of the stream.

Aljoscha Krettek <[hidden email]> wrote (on Tue, Jul 7, 2015, 16:12):
> I think this would be good yes. I was just about to open an Issue for
> changing the Streaming Iteration API. :D
> [...]
@Kostas:
I believe this new API is equivalent in expressivity to the current one. We can already define nested loops today. And I also don't see nested loops as generally much worse than simple loops.

Gyula Fóra <[hidden email]> wrote (on Tue, Jul 7, 2015, 16:14):
> Sorry Stephan I meant it slightly differently, I see your point:
> [...]
In reply to this post by Aljoscha Krettek-2
I think it could work if we allowed a DataStream to be unioned after creation. For example:

DataStream source = ..
DataStream mapper = source.map(noOpMapper)
DataStream feedback = mapper.filter(...)
source.union(feedback)

This would basically mean that a DataStream is mutable and can be extended with more streams after creation.

On Tue, 7 Jul 2015 at 16:12 Aljoscha Krettek <[hidden email]> wrote:
> I think this would be good yes. I was just about to open an Issue for
> changing the Streaming Iteration API. :D
> [...]
@Aljoscha:
Yes, that's basically my point as well. This is what happens now too, but we give this mutable data stream a special name: IterativeDataStream.

This can be handled in very different ways through the API; the goal would be to make something easy to use. I am fine with what we have now because I know how it works, but it might confuse people to call it iterate.

Aljoscha Krettek <[hidden email]> wrote (on Tue, Jul 7, 2015, 16:18):
> I think it could work if we allowed a DataStream to be unioned after
> creation. For example:
> [...]
In Aljoscha's approach, we would need a special mutable stream. We could do it like this:

DataStream source = ...

FeedbackPoint pt = source.createFeedbackPoint();

DataStream mapper = pt.map(noOpMapper)
DataStream feedback = mapper.filter(...)
pt.addFeedback(feedback)

It is basically like the current approach, with different names.

I actually like the current approach, because it makes explicit where streams can be altered in hindsight (after their definition).

On Tue, Jul 7, 2015 at 4:20 PM, Gyula Fóra <[hidden email]> wrote:
> @Aljoscha:
> Yes, thats basically my point as well. This is what happens now too but we
> give this mutable datastream a special name : IterativeDataStream
> [...]
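The idea of a single, explicit point where a stream may be extended after its definition can be illustrated with a toy graph model. The sketch below is not the Flink API: `ToyStream`, the string operator labels, and the internal input list are assumptions made for illustration, while `createFeedbackPoint` and `addFeedback` simply mirror the names used in the message above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy graph model, NOT the Flink API. Every class here is hypothetical.
class ToyStream {
    final String name;
    private final List<ToyStream> inputs = new ArrayList<>();

    ToyStream(String name, ToyStream... upstream) {
        this.name = name;
        Collections.addAll(inputs, upstream);
    }

    // Callers only get a read-only view of the inputs.
    List<ToyStream> inputs() { return Collections.unmodifiableList(inputs); }

    // Deriving a new stream never changes the stream it is derived from.
    ToyStream map(String opName) { return new ToyStream(opName, this); }
    ToyStream filter(String opName) { return new ToyStream(opName, this); }

    FeedbackPoint createFeedbackPoint() { return new FeedbackPoint(this); }

    // The only stream type that exposes a way to add an input after creation.
    static final class FeedbackPoint extends ToyStream {
        private FeedbackPoint(ToyStream upstream) { super("feedbackPoint", upstream); }

        void addFeedback(ToyStream feedback) {
            // The private list is reachable here because this class is nested in ToyStream.
            ((ToyStream) this).inputs.add(feedback);
        }
    }
}

final class FeedbackPointDemo {
    public static void main(String[] args) {
        ToyStream source = new ToyStream("source");
        ToyStream.FeedbackPoint pt = source.createFeedbackPoint();
        ToyStream mapper = pt.map("noOpMapper");
        ToyStream feedback = mapper.filter("filter");
        pt.addFeedback(feedback); // closes the loop at the explicit feedback point

        for (ToyStream in : pt.inputs()) {
            System.out.println("feedbackPoint <- " + in.name);
        }
    }
}
```

In this model an ordinary stream offers no way to gain inputs once constructed, so a reader of the job definition only has to look for feedback points to find where cycles can be closed.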
Okay, I am fine with this approach as well; I see the advantages. Then we just need to find a suitable name for marking a "FeedbackPoint" :)

Stephan Ewen <[hidden email]> wrote (on Tue, Jul 7, 2015, 16:28):
> In Aljoscha's approach, we would need a special mutable stream. We could do
> it like this:
> [...]
I see. Perhaps more important, IMO, is defining the semantics of stream loops with event time.

The reason I asked about nested loops is that Naiad and other designs used a multidimensional timestamp to capture loops: (outer loop counter, inner loop counter, timestamp). I assume that, currently, making sense of which iteration an element comes from is left to the user. Should we aim to change that with the API redesign?

On Tue, Jul 7, 2015 at 4:30 PM, Gyula Fóra <[hidden email]> wrote:
> Okay, I am fine with this approach as well I see the advantages. Then we
> just need to find a suitable name for marking a "FeedbackPoint" :)
> [...]
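As a rough illustration of the (outer loop counter, inner loop counter, timestamp) triple mentioned above, the following Java sketch shows how such a value could advance as an element travels around two nested loops. The class `LoopTimestamp` and its methods are hypothetical, and the rule that the inner counter restarts when the outer loop advances is an assumption of this sketch, not something stated in the thread or taken from Naiad.

```java
// Hypothetical value type; the thread only names the three components.
final class LoopTimestamp {
    final int outerIteration;
    final int innerIteration;
    final long eventTime;

    LoopTimestamp(int outerIteration, int innerIteration, long eventTime) {
        this.outerIteration = outerIteration;
        this.innerIteration = innerIteration;
        this.eventTime = eventTime;
    }

    // Timestamp of an element that has not yet gone around any loop.
    static LoopTimestamp atEntry(long eventTime) {
        return new LoopTimestamp(0, 0, eventTime);
    }

    // One more pass around the inner loop; the event time is unchanged.
    LoopTimestamp nextInner() {
        return new LoopTimestamp(outerIteration, innerIteration + 1, eventTime);
    }

    // One more pass around the outer loop; the inner counter restarts
    // at zero (an assumption of this sketch).
    LoopTimestamp nextOuter() {
        return new LoopTimestamp(outerIteration + 1, 0, eventTime);
    }

    @Override
    public String toString() {
        return "(" + outerIteration + ", " + innerIteration + ", " + eventTime + ")";
    }

    public static void main(String[] args) {
        LoopTimestamp t = LoopTimestamp.atEntry(42L)
                .nextInner()   // second pass of the inner loop
                .nextInner()   // third pass of the inner loop
                .nextOuter()   // back around the outer loop
                .nextInner();  // inner loop again
        System.out.println(t);
    }
}
```

With counters attached by the system like this, the user would no longer have to work out which iteration an element belongs to.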
Good points. If we want structured loops on streams, we will need to inject iteration counters. The question is whether we really need structured iterations on plain data streams. Window iterations, on the other hand, are a must-have...

Paris

> On 07 Jul 2015, at 16:43, Kostas Tzoumas <[hidden email]> wrote:
>
> I see. Perhaps more important IMO is defining the semantics of stream loops
> with event time.
> [...]
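One way to picture "injecting iteration counters" is to have the feedback edge tag each element with the number of times it has gone around the loop. The Java sketch below simulates this outside of Flink; `CountedLoopSketch`, `Tagged`, and `run` are hypothetical names, and the integer-only element type is chosen just to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;

// Toy simulation of a loop with system-injected iteration counters.
// NOT Flink code; all names are hypothetical.
final class CountedLoopSketch {

    // An element paired with the number of times it crossed the feedback edge.
    static final class Tagged {
        final int value;
        final int iteration;

        Tagged(int value, int iteration) {
            this.value = value;
            this.iteration = iteration;
        }
    }

    static List<Tagged> run(List<Integer> source, IntUnaryOperator step, IntPredicate feedBack) {
        Deque<Tagged> input = new ArrayDeque<>();
        for (int v : source) {
            input.add(new Tagged(v, 0)); // fresh elements start at iteration 0
        }
        List<Tagged> out = new ArrayList<>();
        while (!input.isEmpty()) {
            Tagged in = input.poll();
            int result = step.applyAsInt(in.value);
            if (feedBack.test(result)) {
                // The counter is incremented on the loop edge, not by user code.
                input.add(new Tagged(result, in.iteration + 1));
            } else {
                out.add(new Tagged(result, in.iteration));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        for (Tagged t : run(List.of(3, 1), x -> x - 1, x -> x > 0)) {
            System.out.println("value=" + t.value + " iteration=" + t.iteration);
        }
    }
}
```

The user functions (`step` and `feedBack`) never see or modify the counter, which is the property a structured-iteration API would need to guarantee.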