Hey All,
I would like to bring up a discussion regarding the different reduce functionalities in the streaming API, because there are several ways they can (and cannot) be implemented efficiently, and these work in totally different ways. I will go through the different reduce types and their problems:

*Simple reduce:*
dataStream.reduce(ReduceFunction)

The current working mechanism is that it combines the incoming elements using the ReduceFunction and emits the current reduced value (so after every incoming value there is one emit with the current reduced value). Currently, if the reducer has higher parallelism than 1, the reduced value is only a 'local' reduce on the given vertex (there is no global reduce step). The problem here is that introducing a global reduce step would be equivalent to setting the reducer parallelism to 1, since we want to emit the current reduced value after each incoming data point. So the question here is whether the current mechanism makes sense without a global aggregate, or whether we should force parallelism 1. This is also the case for the aggregation operators.

*Simple batch/window reduce:*
dataStream.window(1000).reduce(ReduceFunction)

The current working mechanism is that it combines incoming elements with a ReduceFunction in windows/batches, currently also 'locally' on each parallel vertex, emitting one reduced output after each window. Here the same issue of global aggregation can be solved by introducing a global aggregator vertex with parallelism 1, which wouldn't cause serious overhead if the windows are not too small.

Another issue here is the assumptions we can make about the user-defined ReduceFunction. If we assume that the function is associative (we currently assume this), then the window reduce operators can be implemented to be almost as fast as simple reduces by storing pre-reduced groups of values (a rough sketch of this is at the end of this mail). Do you think it is okay to make this assumption?

*Batch/window groupreduce:*
dataStream.window(1000).reduceGroup(GroupReduceFunction)

The difference between .reduce and .groupReduce is that the user gets the window/batch as an iterable, which can be quite useful in some cases. The problem here is the same as with the simple reduce: we couldn't figure out how to make global aggregations efficient. Unlike with window reduce, where we can create a global aggregator vertex, here that is impossible because of the different working mechanics of the GroupReduce function (iterable input with custom output type).

So even if we make the window reduce global, the window groupreduce will have to remain local if we don't want to enforce a parallelism=1 bottleneck. This would make the API ambiguous.

*Grouped reduces:*
dataStream.groupBy(keyPos).reduce(ReduceFunction)
dataStream.groupBy(keyPos).window(1000).reduce/groupreduce

Here we don't have the previous problems, since local aggregations work as globals.

So any ideas regarding this global/local reduce issue and reduce function associativity are appreciated :)

Regards,
Gyula
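To make the associativity question concrete, here is a minimal plain-Java sketch (the ReduceFunction interface below is a hypothetical stand-in with the same shape as ours, kept local so the example is self-contained). An associative function like a sum lets the operator fold each arriving element into one running value per window instead of buffering all elements, because regrouping the reduce calls cannot change the result:

    // Hypothetical stand-in for a reduce function: combines two values
    // of the same type into one.
    interface ReduceFunction<T> {
        T reduce(T value1, T value2);
    }

    public class AssociativeReduceSketch {
        public static void main(String[] args) {
            ReduceFunction<Integer> sum = (a, b) -> a + b;

            // A count window of five elements.
            int[] window = {3, 1, 4, 1, 5};

            // Pre-reduction: fold each element into a single running value.
            Integer running = null;
            for (int v : window) {
                running = (running == null) ? v : sum.reduce(running, v);
            }
            System.out.println("pre-reduced: " + running); // 14

            // Regrouping the reduce calls gives the same result, which is
            // exactly what storing pre-reduced groups of values relies on.
            int left = sum.reduce(sum.reduce(3, 1), 4); // (3+1)+4 = 8
            int right = sum.reduce(1, 5);               // 1+5 = 6
            System.out.println("regrouped:   " + sum.reduce(left, right)); // 14
        }
    }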
Hi, thanks for starting this discussion!
I added some comments inline.

2014-09-24 11:43 GMT+02:00 Gyula Fóra <[hidden email]>:

> *Simple reduce:*
> dataStream.reduce(ReduceFunction)
>
> [...] So the question here is whether the current mechanism makes sense
> without a global aggregate, or whether we should force parallelism 1.

Not sure if there are use cases for multiple local reducers. I think I'd enforce DOP = 1 in this case.

> *Simple batch/window reduce:*
> dataStream.window(1000).reduce(ReduceFunction)
>
> [...] If we assume that the function is associative (we currently assume
> this), then the window reduce operators can be implemented to be almost
> as fast as simple reduces by storing pre-reduced groups of values. Do you
> think it is okay to make this assumption?

The associativity assumption for reduce functions holds. This is also leveraged by the batch API, which uses reduce functions for local aggregates (combiners) as well as for global aggregates (reduce). For windowed streaming you can do the same thing: do local preaggregations on windows, emit the partial result once a window is complete, and reinitialize the partial aggregator (start with no initial state). The global aggregator eagerly reduces a preaggregate with the last full aggregate (no windowing). Since preaggregators are reinitialized, the global aggregator does not need to hold individual state for the preaggregates and only keeps the last full aggregate. (A small sketch of this scheme is at the end of this mail.)

> *Batch/window groupreduce:*
> dataStream.window(1000).reduceGroup(GroupReduceFunction)
>
> [...] So even if we make the window reduce global, the window groupreduce
> will have to remain local if we don't want to enforce a parallelism=1
> bottleneck. This would make the API ambiguous.

You can do the same as for window.reduce if the GroupReduce function implements the combine interface (combine functions must be associative!). In addition to the local preaggregates, the combine can also be used on the global aggregate to further reduce the state by combining the preaggregates. The reduce function is only called with a single value (the combined preaggregates) to make sure that the result is correct. If the GroupReduce function does not implement a combine interface, I don't think that this can be done in a practical way (i.e., without caching the full stream).

> *Grouped reduces:*
> dataStream.groupBy(keyPos).reduce(ReduceFunction)
> dataStream.groupBy(keyPos).window(1000).reduce/groupreduce
>
> Here we don't have the previous problems, since local aggregations work
> as globals.

I think you can play the preaggregation/combine tricks for windows here as well. So even in case of high data skew, you could do preaggregations for a single group with multiple combiners in parallel.

I hope I got everything right ;-)

Cheers, Fabian
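To make the preaggregation scheme above concrete, here is a minimal plain-Java sketch (all class names are made up; this shows the idea, not actual operator code). Each parallel preaggregator folds its window locally, emits the partial when the window closes and resets to empty state; the global aggregator, with DOP = 1, eagerly merges each incoming partial into the last full aggregate and keeps nothing else:

    import java.util.function.BinaryOperator;

    // Local preaggregator: one instance per parallel vertex.
    class PreAggregator<T> {
        private final BinaryOperator<T> reduce;
        private final int windowSize; // count-based window, for simplicity
        private T partial = null;
        private int count = 0;

        PreAggregator(BinaryOperator<T> reduce, int windowSize) {
            this.reduce = reduce;
            this.windowSize = windowSize;
        }

        // Folds one element in; returns the partial result when the window
        // closes (and reinitializes to no state), null otherwise.
        T onElement(T value) {
            partial = (partial == null) ? value : reduce.apply(partial, value);
            if (++count == windowSize) {
                T out = partial;
                partial = null;
                count = 0;
                return out;
            }
            return null;
        }
    }

    // Global aggregator with parallelism 1: keeps only the last full aggregate.
    class GlobalAggregator<T> {
        private final BinaryOperator<T> reduce;
        private T full = null;

        GlobalAggregator(BinaryOperator<T> reduce) { this.reduce = reduce; }

        // Eagerly merges a preaggregate into the running full aggregate.
        T onPartial(T partial) {
            full = (full == null) ? partial : reduce.apply(full, partial);
            return full;
        }
    }

    public class PreAggregationSketch {
        public static void main(String[] args) {
            BinaryOperator<Integer> sum = Integer::sum;
            PreAggregator<Integer> local = new PreAggregator<>(sum, 3);
            GlobalAggregator<Integer> global = new GlobalAggregator<>(sum);

            for (int v : new int[]{1, 2, 3, 4, 5, 6}) {
                Integer partial = local.onElement(v);
                if (partial != null) {
                    System.out.println("full: " + global.onPartial(partial));
                }
            }
            // prints: full: 6 (=1+2+3), then full: 21 (=6+4+5+6)
        }
    }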
Hey Fabian,
I have been thinking a bit about your comments, let me give you my thoughts on them:

Simple reduce:

> Not sure if there are use cases for multiple local reducers. I think I'd
> enforce DOP = 1 in this case.

When I started the discussion yesterday I thought pretty much the same, but then I realized that since we have custom partitioners between the task vertices, the user can control which tuples go to which vertex. So local reducers actually make a lot of sense. For instance, I can implement a range partitioner by some field and then perform the reduce locally (rough sketch at the end of this mail). And actually this argument goes for all the local stream reducers, even on windows. So maybe there should be an option for local/global.

Simple batch/window reduce:

> The associativity assumption for reduce functions holds. [...] Since
> preaggregators are reinitialized, the global aggregator does not need to
> hold individual state for the preaggregates and only keeps the last full
> aggregate.

Yes, I agree with the preaggregation approach. On second thought, this will only make sense on windowed data streams where the window is defined by system time. Anything else, for example batching by the number of incoming records, seems impossible to align if the data stream is not 100% balanced, which will not happen if the partitioning is not shuffle. (So this is another issue.)

Batch/window groupreduce:

> You can do the same as for window.reduce if the GroupReduce function
> implements the combine interface (combine functions must be
> associative!). [...]

Good call with the combine interface, I hadn't thought about that! The same issue with non-time windows as in the previous point applies, though.

Grouped reduces:

> I think you can play the preaggregation/combine tricks for windows here
> as well. So even in case of high data skew, you could do preaggregations
> for a single group with multiple combiners in parallel.

Good point, I hadn't thought about this; we assumed that the grouping would give a balanced distribution, but you are right.

I suggest that we don't include these global reducers in the upcoming release; it seems there are a lot of ways the API can be implemented. I think we should give it more thought and experiment with the different approaches before we release it. What do you think about this?

Regards,
Gyula
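Here is the range-partitioner point as a small hypothetical sketch (plain Java, names made up). If the partitioner assigns disjoint key ranges to the parallel reducer instances, then each 'local' reduce is already the complete result for its own range, so no global step is needed:

    import java.util.HashMap;
    import java.util.Map;

    public class RangePartitionSketch {

        // Assign keys 0..99 to numPartitions disjoint, contiguous ranges.
        static int rangePartition(int key, int numPartitions) {
            return Math.min(key / (100 / numPartitions), numPartitions - 1);
        }

        public static void main(String[] args) {
            int parallelism = 4;
            // One running sum per parallel reducer vertex.
            Map<Integer, Integer> localSums = new HashMap<>();

            int[] keys = {5, 42, 77, 12, 63, 99};
            for (int key : keys) {
                int vertex = rangePartition(key, parallelism);
                // Each vertex only ever sees keys from its own range, so its
                // local sum is also the global sum for that range.
                localSums.merge(vertex, key, Integer::sum);
            }
            System.out.println(localSums); // {0=17, 1=42, 2=63, 3=176}
        }
    }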
2014-09-25 10:01 GMT+02:00 Gyula Fóra <[hidden email]>:
> Simple reduce:
>
> [...] So local reducers actually make a lot of sense. For instance, I can
> implement a range partitioner by some field and then perform the reduce
> locally. [...] So maybe there should be an option for local/global.
>
> Simple batch/window reduce:
>
> [...] Yes, I agree with the preaggregation approach. On second thought,
> this will only make sense on windowed data streams where the window is
> defined by system time. Anything else, for example batching by the number
> of incoming records, seems impossible to align if the data stream is not
> 100% balanced, which will not happen if the partitioning is not shuffle.

For count-based windows, you are right; this would be hard to implement. If windows are an upper bound, it might work if n preaggregators each preaggregate (windowSize/n) elements (sketch at the end of this mail). I think the question is whether the size of a window is essential for the semantics of a job or "only" there to control performance and accuracy. The former case is hard; the latter might be much easier to solve. But this definitely needs more thought. I guess a bit of research on how other systems solve this might be useful; we're probably not the first ones to encounter this challenge ;-)

> Batch/window groupreduce:
>
> [...] Good call with the combine interface, I hadn't thought about that!
>
> Grouped reduces:
>
> [...] Good point, I hadn't thought about this; we assumed that the
> grouping would give a balanced distribution, but you are right.
> I suggest that we don't include these global reducers in the upcoming
> release; it seems there are a lot of ways the API can be implemented. I
> think we should give it more thought and experiment with the different
> approaches before we release it. What do you think about this?

We should also think carefully about the semantics of the windows, so that they don't change later (which still might happen...).

Cheers, Fabian
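A rough sketch of the "windows as an upper bound" idea (hypothetical, count-based, plain Java): n parallel preaggregators each close after windowSize/n elements, and the window is considered complete once n partials have been merged, so an emitted window covers at most windowSize elements even though the partials are not exactly aligned:

    import java.util.function.BinaryOperator;

    public class SplitCountWindowSketch {

        // One of n preaggregators; closes after (windowSize / n) elements.
        static class Splitter {
            final BinaryOperator<Integer> reduce;
            final int quota;
            Integer partial = null;
            int count = 0;

            Splitter(BinaryOperator<Integer> reduce, int quota) {
                this.reduce = reduce;
                this.quota = quota;
            }

            Integer onElement(int v) {
                partial = (partial == null) ? v : reduce.apply(partial, v);
                if (++count == quota) {
                    Integer out = partial;
                    partial = null; // emit and reset
                    count = 0;
                    return out;
                }
                return null;
            }
        }

        public static void main(String[] args) {
            int windowSize = 6, n = 2;
            BinaryOperator<Integer> sum = Integer::sum;
            Splitter[] pre = {new Splitter(sum, windowSize / n),
                              new Splitter(sum, windowSize / n)};

            Integer windowResult = null;
            int partialsMerged = 0;
            int[] stream = {1, 2, 3, 4, 5, 6};
            for (int i = 0; i < stream.length; i++) {
                Integer p = pre[i % n].onElement(stream[i]); // round-robin
                if (p == null) continue;
                windowResult = (windowResult == null) ? p : sum.apply(windowResult, p);
                if (++partialsMerged == n) { // n partials = one full window
                    System.out.println("window: " + windowResult); // 21
                    windowResult = null;
                    partialsMerged = 0;
                }
            }
        }
    }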