Hi,
I'm currently working on making the StreamGraph generation more centralized (i.e. not spread across the different API classes). The question now is why we need to switch to preserve partitioning. Could we not make "preserve" partitioning the default, so that users who want shuffle partitioning or anything else have to specify it manually when adding the feedback edge?

This would make for a very simple scheme where the iteration sources are always connected to the heads using "forward" and the tails are connected to the iteration sinks using whatever partitioner was set by the user. This would be more transparent than the current default of "shuffle" between tails and iteration sinks.

Cheers,
Aljoscha

P.S. I know we had quite some discussion about introducing "preserve partitioning", but now that I think of it, it should be the default... :D
Hey,
I am not sure what the intuitive behaviour is here. As you are not applying a transformation on the feedback stream but passing it to a closeWith method, I thought it was somehow natural that it gets the partitioning of the iteration input, but maybe it's not intuitive.

If others also think that preserving feedback partitioning should be the default, I am not against it :)

Btw, this still won't make it very simple. We still need as many source/sink pairs as we have different parallelisms among the head operators. Otherwise the forwarding logic won't work.

Cheers,
Gyula

(In reply to Aljoscha Krettek <[hidden email]>, Fri, 31 Jul 2015, 11:52.)
I thought about having some tighter restrictions here. My idea was to enforce that the feedback edges must have the same parallelism as the original input stream; otherwise shipping strategies such as "keyBy", "shuffle" and "rebalance" don't seem to make sense because they would differ from the distribution of the original elements (at least IMHO). Maybe I'm wrong there, though.

To me it seems intuitive that I get the feedback at the head the way I specify it at the tail. But maybe that's also just me... :D

(In reply to Gyula Fóra <[hidden email]>, Fri, 31 Jul 2015, 14:00.)
I mean that the head operators have different parallelism:

    IterativeDataStream ids = ...

    ids.map().setParallelism(2)
    ids.map().setParallelism(4)

    //...

    ids.closeWith(feedback)

(In reply to Aljoscha Krettek <[hidden email]>, Fri, 31 Jul 2015, 14:23.)
Yes, this would still work. For example, I have this crazy graph: http://postimg.org/image/xtv8ay8hv/full/

That results from this program: https://gist.github.com/aljoscha/45aaf62b2a7957cfafd5

It works, and the implementation is very simple, actually.

(In reply to Gyula Fóra <[hidden email]>, Fri, 31 Jul 2015, 14:30.)
I still don't get how it could possibly work. Let me tell you how I see it, and correct me if my logic is wrong :)

You have this program:

    ids.map1().setParallelism(2)
    ids.map2().setParallelism(4)

    //...

    ids.closeWith(feedback.groupBy(0))

You are suggesting that we only have one iteration source/sink pair with parallelism of either 2 or 4. I will assume that the parallelism is 2 for the sake of the argument.

The iteration source is connected to map1 and map2 with forward partitioning and the sink is connected with groupBy(0). Each sink instance will receive all tuples of a given key, which also means that each of the 2 iteration source instances will too.

Now here comes the problem: the source will forward the tuples to map1, and since we have a forward connection we maintain the groupBy semantics (this is perfect). The sources will also forward to map2, which has higher parallelism, so the tuple sending turns into round robin, which screws up the groupBy.

What did I miss?
Gyula

(In reply to Aljoscha Krettek <[hidden email]>, Fri, 31 Jul 2015, 14:59.)
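The routing argument in this message can be checked with a small toy simulation. The sketch below is not Flink code: the class and method names are hypothetical, key partitioning is modelled as a plain modulo, and the assumption (taken from the message) is that a forward connection between operators of different parallelism degrades to round robin.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class FeedbackRoutingSketch {

    /** Toy key partitioning: every record with the same key goes to the same channel. */
    static int keyChannel(int key, int parallelism) {
        return Math.floorMod(Integer.hashCode(key), parallelism);
    }

    /**
     * Sends keyed feedback records to the iteration sources (partitioned by key) and then
     * on to a head operator. Returns, per key, the set of head instances that saw the key.
     */
    static Map<Integer, Set<Integer>> route(int[] keys, int sourceParallelism, int headParallelism) {
        Map<Integer, Set<Integer>> seenAt = new HashMap<>();
        int[] roundRobin = new int[sourceParallelism]; // one counter per source instance
        for (int key : keys) {
            int source = keyChannel(key, sourceParallelism);
            int head;
            if (headParallelism == sourceParallelism) {
                head = source; // true forward: instance i feeds instance i
            } else {
                // Assumption from the thread: mismatched parallelism turns into round robin.
                head = roundRobin[source]++ % headParallelism;
            }
            seenAt.computeIfAbsent(key, k -> new HashSet<>()).add(head);
        }
        return seenAt;
    }

    /** True if every key was only ever seen by a single head instance. */
    static boolean keysStayTogether(Map<Integer, Set<Integer>> seenAt) {
        for (Set<Integer> instances : seenAt.values()) {
            if (instances.size() > 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] keys = {0, 1, 0, 1, 0, 1, 0, 1};
        System.out.println("map1 (parallelism 2): " + keysStayTogether(route(keys, 2, 2)));
        System.out.println("map2 (parallelism 4): " + keysStayTogether(route(keys, 2, 4)));
    }
}
```

With two source instances, the head with parallelism 2 keeps each key on one instance, while the head with parallelism 4 sees the same key on several instances, which is the broken groupBy described above.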
Yes, I'm not saying that it makes sense to do it, I'm just saying that it does translate and run. Your observation is true. :D

I'm wondering whether it makes sense to allow users to have iteration heads with differing parallelism, in fact.

(In reply to Gyula Fóra <[hidden email]>, Fri, 31 Jul 2015, 16:40.)
There might be reasons why a user would want different parallelism at the head operators (depending on what else that head operator might process), so restricting them to the same parallelism is a little bit weird, don't you think? It kind of goes against the whole operator-parallelism idea.

I don't think it's a huge complexity to group head operators together by parallelism and add a source/sink pair for each group like we do now. What do you say?

(In reply to Aljoscha Krettek <[hidden email]>, Fri, 31 Jul 2015, 17:10.)
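The grouping suggested in this message could look roughly like the following sketch. It is only an illustration under assumed names (HeadGroupingSketch and groupByParallelism are hypothetical, and heads are reduced to a name plus a parallelism); it is not how the StreamGraph actually stores its operators.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class HeadGroupingSketch {

    /**
     * Groups head operators by parallelism. Each resulting group would get its own
     * iteration source/sink pair with exactly that parallelism, so that the
     * source-to-head connection can always be a true forward connection.
     */
    static Map<Integer, List<String>> groupByParallelism(Map<String, Integer> heads) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> head : heads.entrySet()) {
            groups.computeIfAbsent(head.getValue(), p -> new ArrayList<>()).add(head.getKey());
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<String, Integer> heads = new LinkedHashMap<>();
        heads.put("map1", 2);
        heads.put("map2", 4);
        heads.put("map3", 2);

        // One source/sink pair per distinct parallelism among the heads.
        for (Map.Entry<Integer, List<String>> group : groupByParallelism(heads).entrySet()) {
            System.out.println("source/sink pair with parallelism " + group.getKey()
                    + " feeds " + group.getValue());
        }
    }
}
```

In this toy input there are three heads but only two distinct parallelisms, so two source/sink pairs would be needed.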
Sure it can be done, it's just more complex if you try to do it in a sane way without having the code that builds the StreamGraph all over the place. :D

I'll try to come up with something. This is my current work in progress, by the way: https://github.com/aljoscha/flink/tree/stream-api-rework

I managed to ban the StreamGraph from StreamExecutionEnvironment and the API classes such as DataStream. The API methods construct a graph of transformation nodes and don't contain any information themselves. Then there is a StreamGraphGenerator that builds a StreamGraph from the transformations. The abstraction is very nice and simple; the only problem that remains is the differing-parallelism iterations, but I'll figure them out.

P.S. The code is not well documented yet, but the base class for transformations is StreamTransformation. From there, anyone who wants to check it out can find the other transformations.

(In reply to Gyula Fóra <[hidden email]>, Fri, 31 Jul 2015, 17:17.)
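The split described in this message (API calls only record transformation nodes, and a separate generator turns them into a graph) can be sketched in a few lines. Everything below is a hypothetical stand-in: Node and Generator are illustrative names, not the StreamTransformation or StreamGraphGenerator classes from the linked branch, and feedback edges are deliberately left out because they are the open problem in the thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class TransformationSketch {

    /** Hypothetical transformation node: it only records what the user asked for. */
    static final class Node {
        final String name;
        final int parallelism;
        final List<Node> inputs;

        Node(String name, int parallelism, Node... inputs) {
            this.name = name;
            this.parallelism = parallelism;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** Hypothetical generator: visits every node once and collects vertices and edges. */
    static final class Generator {
        final Set<String> vertices = new LinkedHashSet<>();
        final List<String> edges = new ArrayList<>();
        private final Set<Node> visited = new HashSet<>(); // identity-based, Node has no equals()

        void translate(Node node) {
            if (!visited.add(node)) {
                return; // already translated, e.g. a source shared by two heads
            }
            for (Node input : node.inputs) {
                translate(input); // inputs are translated before the node itself
                edges.add(input.name + " -> " + node.name);
            }
            vertices.add(node.name);
        }
    }

    public static void main(String[] args) {
        Node source = new Node("source", 2);
        Node map1 = new Node("map1", 2, source);
        Node map2 = new Node("map2", 4, source);
        Node sink = new Node("sink", 2, map1, map2);

        Generator generator = new Generator();
        generator.translate(sink);
        System.out.println(generator.vertices);
        System.out.println(generator.edges);
    }
}
```

The design point is that all graph-building logic lives in one place, the generator, while the nodes stay passive. A feedback edge would introduce a cycle that this simple recursive walk does not handle.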
Maybe you can reuse some of the logic that is currently in the StreamGraph: build the StreamLoops first, and use them to generate the sources and sinks right before building the JobGraph. This avoids the need to know everything beforehand.

I actually added this to avoid the complexities that you are probably facing now.

In reply to Aljoscha Krettek <[hidden email]> (31 Jul 2015, Fri, 17:28) |
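The "collect the loops first, generate sources and sinks last" idea, combined with the earlier suggestion in this thread to group head operators by parallelism, can be sketched roughly as below. This is only an illustration: LoopSketch, addHead and materialize are made-up names, not Flink's actual StreamLoop API, and the sketch only computes which heads would share a source/sink pair.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the loop bookkeeping; not Flink's actual StreamLoop class.
final class LoopSketch {
    // Head operator name -> parallelism, recorded while the program is assembled.
    private final Map<String, Integer> heads = new TreeMap<>();

    // Only records the head; no source or sink is created at this point.
    void addHead(String name, int parallelism) {
        heads.put(name, parallelism);
    }

    // Called once, right before the job graph would be built:
    // one source/sink pair per distinct head parallelism.
    Map<Integer, List<String>> materialize() {
        Map<Integer, List<String>> pairs = new TreeMap<>();
        for (Map.Entry<String, Integer> head : heads.entrySet()) {
            pairs.computeIfAbsent(head.getValue(), p -> new ArrayList<>()).add(head.getKey());
        }
        return pairs;
    }

    public static void main(String[] args) {
        LoopSketch loop = new LoopSketch();
        loop.addHead("map1", 2);
        loop.addHead("map2", 4);
        loop.addHead("map3", 2);
        System.out.println(loop.materialize()); // {2=[map1, map3], 4=[map2]}
    }
}
```

Because nothing is generated until materialize() runs, heads can be added in any order and the number of source/sink pairs does not have to be known up front.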
I don't follow the discussion here. Can you help me understand what you mean by "different iteration heads and tails"?

Doesn't an iteration have one parallel head and one parallel tail?

In reply to Gyula Fóra <[hidden email]> (Fri, 31 Jul 2015, 6:52 PM) |
In a streaming program, when we create an IterativeDataStream, we practically mark the union point of some later feedback stream (the one passed to closeWith(..)).

The operators applied to this IterativeDataStream will receive the feedback input as well. We call the operators applied to the iterative data stream head operators, and the operators that produce the streams passed into closeWith tail operators. With this terminology we can have many heads and tails with varying parallelism.

In reply to Stephan Ewen <[hidden email]> (2 Aug 2015, Sun, 20:16) |
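Why heads with varying parallelism matter for the feedback edge can be shown with a toy simulation of the concern raised earlier in this thread: a keyed feedback stream arrives at a source with parallelism 2, and forwarding it to a head with parallelism 4 degrades into round robin. The sketch below is not Flink code. The modulo partitioning, the round-robin cursor starting at 0, and the names FeedbackRoutingSketch and route are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model of feedback routing; partitioning and redistribution rules are assumed.
final class FeedbackRoutingSketch {
    // Returns, for every key, the set of head instances that received that key.
    static Map<Integer, Set<Integer>> route(int[] keys, int sourceParallelism, int headParallelism) {
        Map<Integer, Set<Integer>> seenAt = new TreeMap<>();
        int[] nextTarget = new int[sourceParallelism]; // round-robin cursor per source instance
        for (int key : keys) {
            // Keyed edge from the tail: each key always reaches the same source instance.
            int source = Math.floorMod(key, sourceParallelism);
            int target;
            if (headParallelism == sourceParallelism) {
                target = source; // true forward connection, grouping is preserved
            } else {
                target = nextTarget[source]; // differing parallelism: round robin
                nextTarget[source] = (target + 1) % headParallelism;
            }
            seenAt.computeIfAbsent(key, k -> new TreeSet<>()).add(target);
        }
        return seenAt;
    }

    public static void main(String[] args) {
        int[] keys = {7, 7, 7, 7};
        System.out.println(route(keys, 2, 2)); // {7=[1]}
        System.out.println(route(keys, 2, 4)); // {7=[0, 1, 2, 3]}
    }
}
```

With matching parallelism every record for key 7 ends up on one head instance. With parallelism 4 the same key is spread over all four instances, which is the reason given in the thread for one source/sink pair per distinct head parallelism.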
To answer the question plainly and simply: no, there are several different parallel heads and tails. For example, in this:

    val iter = ds.iteration()
    val head_tail1 = iter.map().parallelism(2)
    val head_tail2 = iter.map().parallelism(4)
    iter.closeWith(head_tail1.union(head_tail2))

we have one head/tail pair with parallelism 2 and one with parallelism 4.

Off the top of my head, I don't know what happens in this case, though:

    val iter = ds.iteration()
    val head1 = iter.map().parallelism(2)
    val head2 = iter.map().parallelism(4)
    val tail1 = head1.map().parallelism(6)
    val tail2 = head2.map().parallelism(8)
    iter.closeWith(tail1.union(tail2))

(This is also tricky with respect to the parallelism of the input stream.)

In reply to Gyula Fóra <[hidden email]> (Sun, 2 Aug 2015, 21:22)
> The > > > > > > question > > > > > > > is > > > > > > > > > now > > > > > > > > > > > why > > > > > > > > > > > > we need to switch to preserve partitioning? Could we > > not > > > > make > > > > > > > > > > "preserve" > > > > > > > > > > > > partitioning the default and if users want to have > > > shuffle > > > > > > > > > partitioning > > > > > > > > > > > or > > > > > > > > > > > > anything they have to specify it manually when adding > > the > > > > > > > feedback > > > > > > > > > > edge? > > > > > > > > > > > > > > > > > > > > > > > > This would make for a very simple scheme where the > > > > iteration > > > > > > > > sources > > > > > > > > > > are > > > > > > > > > > > > always connected to the heads using "forward" and the > > > tails > > > > > are > > > > > > > > > > connected > > > > > > > > > > > > to the iteration sinks using whatever partitioner was > > set > > > > by > > > > > > the > > > > > > > > > user. > > > > > > > > > > > This > > > > > > > > > > > > would make it more transparent than the current > default > > > of > > > > > the > > > > > > > > > > "shuffle" > > > > > > > > > > > > betweens tails and iteration sinks. > > > > > > > > > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > > > Aljoscha > > > > > > > > > > > > > > > > > > > > > > > > P.S. I now we had quite some discussion about > > introducing > > > > > > > "preserve > > > > > > > > > > > > partitioning" but now, when I think of it it should > be > > > the > > > > > > > > default... > > > > > > > > > > :D > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > |
This model strikes me as pretty complicated. Imagine the extra logic and
code paths necessary for proper checkpointing as well.

Why not take a simple approach:
- There is one parallel head and one parallel tail, both with the same parallelism.
- Any computation in between may have its own parallelism, no special cases.
- If the tail does not have the same parallelism as the head, it will not be the tail; instead, the flow will attach an additional tail operator. Between the original tail and the additional tail, the streams are redistributed to achieve the required parallelism.

Wouldn't that give us the same and make things much easier? The batch iterations work that way, by the way. |
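The three rules in the proposal above can be sketched in plain Java. This is only an illustration of the decision being described, not code from Flink or from the stream-api-rework branch; the class name `IterationTailPlanner`, the method `planFeedbackPath`, and the operator labels are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only; not Flink code. All names here are hypothetical. */
final class IterationTailPlanner {

    /**
     * Returns the chain of operators that closes the loop, from the
     * user-defined tail back to the single iteration head.
     */
    static List<String> planFeedbackPath(int headParallelism, int tailParallelism) {
        if (headParallelism < 1 || tailParallelism < 1) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<String> path = new ArrayList<>();
        path.add("userTail(p=" + tailParallelism + ")");
        if (tailParallelism != headParallelism) {
            // The user's operator is no longer the real tail: redistribute
            // its output and attach an extra tail with the head's parallelism.
            path.add("redistribute");
            path.add("extraTail(p=" + headParallelism + ")");
        }
        path.add("head(p=" + headParallelism + ")");
        return path;
    }

    public static void main(String[] args) {
        System.out.println(planFeedbackPath(4, 4));
        System.out.println(planFeedbackPath(4, 6));
    }
}
```

With this scheme the loop itself always sees exactly one head and one tail of equal parallelism; any mismatch is absorbed by the redistribution step in front of the extra tail.
|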
Yes, that's what I was proposing in my second mail:

I thought about having some tighter restrictions here. My idea was to enforce that the feedback edges must have the same parallelism as the original input stream; otherwise shipping strategies such as "keyBy", "shuffle", and "rebalance" don't seem to make sense, because they would differ from the distribution of the original elements (at least IMHO). Maybe I'm wrong there, though.

To me it seems intuitive that I get the feedback at the head the way I specify it at the tail. But maybe that's also just me... :D |
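To make the argument for equal parallelism concrete, here is a small stand-alone simulation in plain Java. It does not use any Flink API. It assumes, as described earlier in this thread, that a connection between operators of different parallelism degrades to round-robin, and it models "keyBy" as a simple hash modulo the parallelism. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative simulation only; not Flink code. All names are hypothetical. */
final class FeedbackDistributionSketch {

    /** Stand-in for keyBy: picks one of `parallelism` instances for a key. */
    static int keyedChannel(int key, int parallelism) {
        return Math.floorMod(Integer.hashCode(key), parallelism);
    }

    /**
     * Routes keyed feedback records to the head instances and records,
     * for every key, which head instances received it.
     */
    static Map<Integer, Set<Integer>> headsPerKey(int[] keys, int feedbackParallelism, int headParallelism) {
        Map<Integer, Set<Integer>> seen = new HashMap<>();
        int[] nextHead = new int[feedbackParallelism]; // round-robin cursor per feedback instance
        for (int key : keys) {
            int feedbackInstance = keyedChannel(key, feedbackParallelism);
            int head;
            if (feedbackParallelism == headParallelism) {
                head = feedbackInstance; // true forward: instance i feeds head i
            } else {
                // Assumed fallback: spread records round-robin over the heads.
                head = nextHead[feedbackInstance];
                nextHead[feedbackInstance] = (head + 1) % headParallelism;
            }
            seen.computeIfAbsent(key, k -> new HashSet<>()).add(head);
        }
        return seen;
    }

    /** True if every key was delivered to exactly one head instance. */
    static boolean groupingPreserved(int[] keys, int feedbackParallelism, int headParallelism) {
        for (Set<Integer> heads : headsPerKey(keys, feedbackParallelism, headParallelism).values()) {
            if (heads.size() > 1) {
                return false;
            }
        }
        return true;
    }
}
```

Under these assumptions, the head sees the feedback "the way it was specified at the tail" only when both sides have the same parallelism; with a mismatch the same key ends up on several head instances.
|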
It is critical for many applications (such as SAMOA or Storm compatibility) to build arbitrary cyclic flows. If your suggestion covers all cases (for instance, nested iterations), then I am not against it.

The current implementation is just one way to do it, but it allows arbitrary cycles. From the checkpointing perspective, I don't think this will make much of a difference, as that will probably have to be handled on the receiver side anyway if you think about the cyclic algorithm.

Gyula

Aljoscha Krettek <[hidden email]> wrote (Mon, Aug 3, 2015, 9:41):
> Yes, that's what I was proposing in my second mail:
>
> I thought about having some tighter restrictions here. My idea was to enforce that the feedback edges must have the same parallelism as the original input stream; otherwise shipping strategies such as "keyBy", "shuffle", "rebalance" don't seem to make sense because they would differ from the distribution of the original elements (at least IMHO). Maybe I'm wrong there, though.
>
> To me it seems intuitive that I get the feedback at the head the way I specify it at the tail. But maybe that's also just me... :D
|
I don't think there is a fundamental limitation to the simpler approach. The only real difference is that DOPs (degrees of parallelism) are adjusted before the tail, so only one head/tail pair is needed.

Nested iterations should still be possible...

On Mon, Aug 3, 2015 at 10:21 AM, Gyula Fóra <[hidden email]> wrote:
> It is critical for many applications (such as SAMOA or Storm compatibility) to build arbitrary cyclic flows. If your suggestion covers all cases (for instance nested iterations) then I am not against it.
>
> The current implementation is just one way to do it, but it allows arbitrary cycles. From the checkpointing perspective, I don't think this will make too much of a difference as that will probably have to be handled on the receiver side anyways if you think about the cyclic algorithm.
>
> Gyula
|
Okay, sounds reasonable :)
Stephan Ewen <[hidden email]> wrote (Mon, 3 Aug 2015, 10:24):

> I don't think there is a fundamental limitation to the simpler approach.
> The only real difference is that DOPs are adjusted before the tail, so only
> one head/tail pair is needed.
>
> Nested iterations should still be possible...
>
> On Mon, Aug 3, 2015 at 10:21 AM, Gyula Fóra <[hidden email]> wrote:
>
> It is critical for many applications (such as SAMOA or Storm compatibility)
> to build arbitrary cyclic flows. If your suggestion covers all cases (for
> instance nested iterations) then I am not against it.
>
> The current implementation is just one way to do it, but it allows
> arbitrary cycles. From the checkpointing perspective, I don't think this
> will make too much of a difference as that will probably have to be handled
> on the receiver side anyways if you think about the cyclic algorithm.
>
> Gyula
>
> Aljoscha Krettek <[hidden email]> wrote (Mon, 3 Aug 2015, 9:41):
>
> Yes, that's what I was proposing in my second mail:
>
> I thought about having some tighter restrictions here. My idea was to
> enforce that the feedback edges must have the same parallelism as the
> original input stream, otherwise shipping strategies such as "keyBy",
> "shuffle", "rebalance" don't seem to make sense because they would differ
> from the distribution of the original elements (at least IMHO). Maybe I'm
> wrong there, though.
>
> To me it seems intuitive that I get the feedback at the head the way I
> specify it at the tail. But maybe that's also just me... :D
>
> On Mon, 3 Aug 2015 at 00:14 Stephan Ewen <[hidden email]> wrote:
>
> This model strikes me as pretty complicated. Imagine the extra logic and
> code path necessary for proper checkpointing as well.
>
> Why not do a simple approach:
> - There is one parallel head, one parallel tail, both with the same
>   parallelism
> - Any computation in between may have its own parallelism, no special
>   cases
> - If the tail does not have the same parallelism as the head, it will not
>   be the tail, but the flow will attach an additional tail operator.
>   Between the original tail and the additional tail, the streams are
>   redistributed to achieve the required parallelism.
>
> Wouldn't that give us the same and make things much easier? The batch
> iterations work that way, by the way.
>
> On Sun, Aug 2, 2015 at 10:03 PM, Aljoscha Krettek <[hidden email]> wrote:
>
> To answer the question plain and simple: No, there are several different
> parallel heads and tails.
>
> For example in this:
>
>     val iter = ds.iteration()
>
>     val head_tail1 = iter.map().parallelism(2)
>     val head_tail2 = iter.map().parallelism(4)
>
>     iter.closeWith(head_tail1.union(head_tail2))
>
> We have one head/tail pair with parallelism 2 and one with parallelism 4.
>
> Off the top of my head, I don't know what happens in this case though:
>
>     val iter = ds.iteration()
>
>     val head1 = iter.map().parallelism(2)
>     val head2 = iter.map().parallelism(4)
>
>     val tail1 = head1.map().parallelism(6)
>     val tail2 = head2.map().parallelism(8)
>
>     iter.closeWith(tail1.union(tail2))
>
> (Which is also tricky with the parallelism of the input stream)
>
> On Sun, 2 Aug 2015 at 21:22 Gyula Fóra <[hidden email]> wrote:
>
> In a streaming program when we create an IterativeDataStream, we
> practically mark the union point of some later feedback stream (the one
> passed in to closeWith(..)).
>
> The operators applied on this IterativeDataStream will receive the feedback
> input as well. We call the operators applied on the iterative DataStream
> head operators. We call the operators that produce the streams passed into
> closeWith tail operators. With this terminology we can have many heads and
> tails with varying parallelism.
>
> Stephan Ewen <[hidden email]> wrote (Sun, 2 Aug 2015, 20:16):
>
> I don't get the discussion here, can you help me with what you mean by
> "different iteration heads and tails"?
>
> An iteration does not have one parallel head and one parallel tail?
>
> On Fri, Jul 31, 2015 at 6:52 PM, Gyula Fóra <[hidden email]> wrote:
>
> Maybe you can reuse some of the logic that is currently there on the
> StreamGraph, with building StreamLoops first which will be used to generate
> the sources and sinks right before building the JobGraph. This avoids the
> need of knowing everything beforehand.
>
> I actually added this to avoid the complexities that you are probably
> facing now.
>
> Aljoscha Krettek <[hidden email]> wrote (Fri, 31 Jul 2015, 17:28):
>
> Sure it can be done, it's just more complex if you try to do it in a sane
> way without having the code that builds the StreamGraph all over the place.
> :D
>
> I'll try to come up with something. This is my current work in progress, by
> the way: https://github.com/aljoscha/flink/tree/stream-api-rework
>
> I managed to ban the StreamGraph from StreamExecutionEnvironment and the
> API classes such as DataStream. The API methods construct a graph of
> transformation nodes and don't contain any information themselves. Then
> there is a StreamGraphGenerator that builds a StreamGraph from the
> transformations. The abstraction is very nice and simple; the only problem
> that remains is the differing-parallelism iterations, but I'll figure them
> out.
>
> P.S. The code is not well documented yet, but the base class for
> transformations is StreamTransformation. From there anyone who wants to
> check it out can find the other transformations.
>
> On Fri, 31 Jul 2015 at 17:17 Gyula Fóra <[hidden email]> wrote:
>
> There might be reasons why a user would want different parallelism at the
> head operators (depending on what else that head operator might process) so
> restricting them to the same parallelism is a little bit weird, don't you
> think? It kind of goes against the whole operators-parallelism idea.
>
> I don't think it's a huge complexity to group head operators together by
> parallelism and add a source/sink per each group like we do now. What do
> you say?
>
> Aljoscha Krettek <[hidden email]> wrote (Fri, 31 Jul 2015, 17:10):
>
> Yes, I'm not saying that it makes sense to do it, I'm just saying that it
> does translate and run. Your observation is true. :D
>
> I'm wondering whether it makes sense to allow users to have iteration heads
> with differing parallelism, in fact.
>
> On Fri, 31 Jul 2015 at 16:40 Gyula Fóra <[hidden email]> wrote:
>
> I still don't get how it could possibly work, let me tell you how I see it
> and correct me in my logic :)
>
> You have this program:
>
>     ids.map1().setParallelism(2)
>     ids.map2().setParallelism(4)
>
>     //...
>
>     ids.closeWith(feedback.groupBy(0))
>
> You are suggesting that we only have one iteration source/sink pair with
> parallelism of either 2 or 4. I will assume that the parallelism is 2 for
> the sake of the argument.
>
> The iteration source is connected to map1 and map2 with Forward
> partitioning and the sink is connected with groupBy(0).
> Each sink instance will receive all tuples of a given key, which also means
> that each iteration source instance (2) will too.
>
> Now here comes the problem: the source will forward the tuples to map1, and
> since we have a forward connection we maintain the groupBy semantics (this
> is perfect). The sources will also forward to map2, which has higher
> parallelism, so the tuple sending turns into round robin, which screws up
> the groupBy.
>
> What did I miss?
> Gyula
>
> Aljoscha Krettek <[hidden email]> wrote (Fri, 31 Jul 2015, 14:59):
>
> Yes, this would still work. For example, I have this crazy graph:
> http://postimg.org/image/xtv8ay8hv/full/ That results from this program:
> https://gist.github.com/aljoscha/45aaf62b2a7957cfafd5
>
> It works, and the implementation is very simple, actually.
>
> On Fri, 31 Jul 2015 at 14:30 Gyula Fóra <[hidden email]> wrote:
>
> I mean that the head operators have different parallelism:
>
>     IterativeDataStream ids = ...
>
>     ids.map().setParallelism(2)
>     ids.map().setParallelism(4)
>
>     //...
>
>     ids.closeWith(feedback)
>
> Aljoscha Krettek <[hidden email]> wrote (Fri, 31 Jul 2015, 14:23):
>
> I thought about having some tighter restrictions here. My idea was to
> enforce that the feedback edges must have the same parallelism as the
> original input stream, otherwise shipping strategies such as "keyBy",
> "shuffle", "rebalance" don't seem to make sense because they would differ
> from the distribution of the original elements (at least IMHO). Maybe I'm
> wrong there, though.
>
> To me it seems intuitive that I get the feedback at the head the way I
> specify it at the tail. But maybe that's also just me... :D
>
> On Fri, 31 Jul 2015 at 14:00 Gyula Fóra <[hidden email]> wrote:
>
> Hey,
>
> I am not sure what is the intuitive behaviour here. As you are not applying
> a transformation on the feedback stream but pass it to a closeWith method,
> I thought it was somehow natural that it gets the partitioning of the
> iteration input, but maybe it's not intuitive.
>
> If others also think that preserving feedback partitioning should be the
> default I am not against it :)
>
> Btw, this still won't make it very simple. We still need as many
> source/sink pairs as we have different parallelism among the head
> operators. Otherwise the forwarding logic won't work.
>
> Cheers,
> Gyula
>
> Aljoscha Krettek <[hidden email]> wrote (Fri, 31 Jul 2015, 11:52):
>
> Hi,
> I'm currently working on making the StreamGraph generation more centralized
> (i.e. not spread across the different API classes). The question is now why
> we need to switch to preserve partitioning? Could we not make "preserve"
> partitioning the default and if users want to have shuffle partitioning or
> anything they have to specify it manually when adding the feedback edge?
>
> This would make for a very simple scheme where the iteration sources are
> always connected to the heads using "forward" and the tails are connected
> to the iteration sinks using whatever partitioner was set by the user. This
> would make it more transparent than the current default of the "shuffle"
> between tails and iteration sinks.
>
> Cheers,
> Aljoscha
>
> P.S. I know we had quite some discussion about introducing "preserve
> partitioning" but now, when I think of it, it should be the default... :D
|