Hi all,
As we mentioned at some conferences, such as Flink Forward SF 2019 and QCon Beijing 2019, our team has implemented "local aggregation" in our internal Flink fork. This feature can effectively alleviate data skew.

Currently, keyed streams are widely used to perform aggregating operations (e.g., reduce, sum and window) on elements that have the same key. At runtime, the elements with the same key are sent to and aggregated by the same task.

The performance of these aggregating operations is very sensitive to the distribution of keys. When the distribution of keys follows a power law, performance degrades significantly. Worse, increasing the degree of parallelism does not help when a task is overloaded by a single key.

Local aggregation is a widely adopted method to mitigate the performance degradation caused by data skew. We can decompose the aggregating operations into two phases. In the first phase, we aggregate the elements of the same key at the sender side to obtain partial results. In the second phase, these partial results are sent to receivers according to their keys and are combined to obtain the final result. Since the number of partial results received by each receiver is limited by the number of senders, the imbalance among receivers can be reduced. Besides, by reducing the amount of transferred data, performance can be further improved.

The design document is here:

https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing

Any comments and feedback are welcome and appreciated.

Best,
Vino
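To make the two-phase decomposition concrete, here is a minimal, Flink-independent sketch of a two-phase count; the class and helper names are made up for illustration and are not part of the proposal.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Flink-independent sketch of the two-phase decomposition for a count.
public class TwoPhaseCount {

    // Phase 1: each sender pre-aggregates its own elements per key,
    // producing at most one partial result per key.
    static Map<String, Long> localAggregate(List<String> elements) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : elements) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // Phase 2: the receiver combines the partial results by key.
    static Map<String, Long> combine(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> result.merge(key, count, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // Key "a" is skewed, but the receiver only sees one partial result per sender.
        Map<String, Long> sender1 = localAggregate(Arrays.asList("a", "a", "a", "a"));
        Map<String, Long> sender2 = localAggregate(Arrays.asList("a", "a", "a", "b"));
        System.out.println(combine(Arrays.asList(sender1, sender2))); // {a=7, b=1}
    }
}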
Nice feature.
Looking forward to having it in Flink.

Regards,
Xiaogang
Excited, and a big +1 for this feature.
Hi,
+1 for the idea from my side. I've even attempted to add a similar feature quite some time ago, but it didn't get enough traction [1].

I've read through your document and I couldn't find any mention of when the pre-aggregated result should be emitted downstream. I think that's one of the most crucial decisions, since a wrong decision here can lead to a decrease in performance or to an explosion of memory/state consumption (with both bounded and unbounded data streams). For streaming it can also lead to increased latency.

Since this is also a decision that is impossible to make automatically in a perfectly reliable way, first and foremost I would expect this to be configurable via the API, perhaps with some predefined triggers: on watermark (for windowed operations), on checkpoint barrier (to decrease state size?), on element count, maybe on memory usage (much easier to estimate with known/predefined types, as in SQL)... and with some option to implement a custom trigger.

What would also work well would be some form of memory-consumption priority. For example, if we are running out of memory for the HashJoin/final aggregation, instead of spilling to disk or crashing the job with an OOM it would probably be better to prune/dump the pre/local aggregation state. But that's another story.

[1] https://github.com/apache/flink/pull/4626

Piotrek
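To make those trigger options concrete, a rough sketch of what such a configurable flush policy could look like follows; none of these types exist in Flink, and the names and signatures are purely illustrative.

// Illustrative sketch only: this interface does not exist in Flink; it just makes
// the flush policies discussed above concrete.
public interface FlushTrigger<T> {

    // Called for every buffered element; return true to flush the local buffer downstream.
    boolean onElement(T element, long bufferedCount);

    // Called when a checkpoint barrier arrives; flushing here keeps operator state small.
    default boolean onCheckpoint(long checkpointId) {
        return true;
    }

    // Called when the watermark advances; useful for windowed local aggregation.
    default boolean onWatermark(long watermark) {
        return false;
    }

    // A simple count-based trigger: flush whenever the buffer holds maxElements entries.
    static <T> FlushTrigger<T> countTrigger(long maxElements) {
        return (element, bufferedCount) -> bufferedCount >= maxElements;
    }
}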
Hi Piotr,
The localKeyBy API returns an instance of KeyedStream (we just added an inner flag to identify the local mode), which Flink has provided before. Users can call all the APIs (especially the *window* APIs) that KeyedStream provides.

So if users want to use local aggregation, they should call the window API to build a local window. That means users should (or rather, can) specify the window length and other information based on their needs.

I think you described an idea different from ours. We did not try to react after some predefined threshold is triggered. We tend to give users the discretion to make these decisions.

Our design tends to reuse the concepts and functions Flink already provides, like state and window (IMO, we do not need to worry about OOM and the other issues you mentioned).

Best,
Vino
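As an example of how a job could look with the proposed API, here is a short sketch; localKeyBy is the API proposed in the design document and does not yet exist in Flink, and the window sizes are arbitrary.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class LocalAggregationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Long>> source =
                env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 1L), Tuple2.of("b", 1L));

        source
                // proposed API: same contract as keyBy, but the grouping stays on the sender side
                .localKeyBy(0)
                // the user decides how long to pre-aggregate locally
                .timeWindow(Time.seconds(5))
                .sum(1)
                // global aggregation: partial results are shuffled by key and combined
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("local aggregation sketch");
    }
}

Compared with a plain keyBy pipeline, the extra localKeyBy stage only reduces the number of records shuffled per key; the final result stays the same.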
Hi all,
Cascading implemented this "map-side reduce" functionality with an LRU cache.

That worked well, as the skewed keys would then always be in the cache.

The API let you decide the size of the cache, in terms of number of entries.

Having a memory limit would have been better for many of our use cases, though as far as I recall there's no good way to estimate the in-memory size of objects.

— Ken

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
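For readers unfamiliar with that approach, here is a hypothetical sketch of a bounded LRU pre-aggregation buffer; it is not Cascading's actual code, only an illustration of the idea.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;

// Partial aggregates are kept per key in a bounded LRU map. When the entry limit
// is exceeded, the least-recently-used partial result is flushed downstream, so
// hot (skewed) keys stay resident and keep being combined locally.
public class LruPreAggregator<K, V> {

    private final BinaryOperator<V> combiner;
    private final BiConsumer<K, V> downstream;
    private final LinkedHashMap<K, V> cache;

    public LruPreAggregator(int maxEntries, BinaryOperator<V> combiner, BiConsumer<K, V> downstream) {
        this.combiner = combiner;
        this.downstream = downstream;
        // accessOrder = true gives LRU eviction semantics
        this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                if (size() > maxEntries) {
                    downstream.accept(eldest.getKey(), eldest.getValue()); // emit partial result
                    return true;
                }
                return false;
            }
        };
    }

    public void add(K key, V value) {
        cache.merge(key, value, combiner);
    }

    // Flush everything that is still buffered, e.g. at the end of the task.
    public void flush() {
        cache.forEach(downstream);
        cache.clear();
    }
}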
Hi Ken,
Thanks for your reply.

As I said before, we try to reuse Flink's state concept (for fault tolerance and to guarantee exactly-once semantics), so we did not consider a cache.

In addition, if we use Flink's state, OOM-related issues are not a key problem we need to consider.

Best,
Vino
Hi Vino,
Thanks a lot for starting this discussion. +1 to this feature, as I think it will be very useful.

Regarding using a window to buffer the input elements, personally I don't think it's a good solution, for the following reasons:
1) As we know, WindowOperator stores the accumulated results in state; this is not necessary for a local aggregate operator.
2) In WindowOperator, each input element is accumulated into state. This is also not necessary for a local aggregate operator; keeping the input elements in memory is enough.

Thanks,
Dian
Hi Vino,
> So if users want to use local aggregation, they should call the window API
> to build a local window. That means users should (or rather, can) specify the
> window length and other information based on their needs.

That sounds OK to me. It would have to be run past some of the API people in the community, though.

Piotrek
Hi Dian,
Thanks for your reply.

I know what you mean. However, if you think about it more deeply, you will find that your implementation needs to provide an operator that looks like a window operator: you need to use state, accept an aggregation function, and specify the trigger time. It looks like a lightweight window operator, right?

We try to reuse the functions Flink already provides and reduce complexity. IMO, it is more user-friendly because users are familiar with the window API.

Best,
Vino
Hi Vino,
It may seem similar to the window operator, but there are a few key differences. For example, the local aggregate operator can send out results at any time, while the window operator can only send out results at the end of the window (without early firing). This means that the local aggregate operator can send out results not only when the trigger time is reached, but also when memory is exhausted. This difference makes an optimization available: the local aggregate operator rarely needs to touch state.

I admit that the window operator can solve part of the problem (the data skew) and just wonder if we can do more. Using the window operator for now seems OK to me, as it can indeed solve part of the problem. We just need to think a little more in the design and make sure that the current solution is consistent with future optimizations.

Thanks,
Dian
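A rough sketch of that kind of buffer follows; it is not an actual Flink operator, only an illustration of keeping partial results purely in memory and flushing them on a size bound (and, in a real operator, on each checkpoint barrier).

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;

// Partial results live only on the heap and are emitted downstream when the buffer
// grows too large; a real operator would also flush on checkpoint so that it rarely
// needs to write anything into Flink state.
public class InMemoryLocalAggregator<K, V> {

    private final int maxKeys;
    private final BinaryOperator<V> combiner;
    private final BiConsumer<K, V> downstream;
    private final Map<K, V> buffer = new HashMap<>();

    public InMemoryLocalAggregator(int maxKeys, BinaryOperator<V> combiner, BiConsumer<K, V> downstream) {
        this.maxKeys = maxKeys;
        this.combiner = combiner;
        this.downstream = downstream;
    }

    public void processElement(K key, V value) {
        buffer.merge(key, value, combiner);
        if (buffer.size() >= maxKeys) {
            flush(); // bound memory usage by emitting all partial results
        }
    }

    // Called on the size bound above and, in a real operator, on each checkpoint barrier.
    public void flush() {
        buffer.forEach(downstream);
        buffer.clear();
    }
}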
Hi Vino,
I have read your design. Something I want to know is the usage of these new APIs. It looks like when I use localKeyBy, I must then use a window operator to return a DataStream, and then use keyBy and another window operator to get the final result?

Thanks,
Litree
Hi Dian,
The different opinion is fine for me. If there is a better solution, or there are obvious deficiencies in our design, we are very happy to accept it and improve. I agree with you that a customized local aggregate operator is more scalable with respect to the trigger mechanism. However, I have two questions about your reply:

1) When, why and how do we judge that the memory is exhausted? IMO, the operator sits at a high abstraction level; when implementing it we should not have to care about whether the memory is exhausted.

2) If the local aggregate operator rarely needs to operate on state, what do you think about fault tolerance? We reuse Flink's state concept because we get fault tolerance from it, and we need to guarantee correctness semantics.

Best,
Vino

Dian Fu <[hidden email]> wrote on Tue, Jun 4, 2019, 10:31 PM:

> Hi Vino,
>
> It may seem similar to the window operator but there are also a few key
> differences. For example, the local aggregate operator can send out the
> results at any time, while the window operator can only send out the results
> at the end of the window (without early fire). This means that the local
> aggregate operator can send out the results not only when the trigger time
> is reached, but also when the memory is exhausted. This difference makes
> optimization available, as it means that the local aggregate operator rarely
> needs to operate the state.
>
> I admit that the window operator can solve part of the problem (the data
> skew) and just wonder if we can do more. Using the window operator at
> present seems OK for me as it can indeed solve part of the problems. We
> just need to think a little more in the design and make sure that the
> current solution is consistent with future optimizations.
>
> Thanks,
> Dian |
Hi Litree,
At the implementation level, the localKeyBy API returns a general KeyedStream, so you can call all the APIs which KeyedStream provides. We did not restrict its usage, although we could (for example, by returning a new stream object named LocalKeyedStream).

However, to achieve the goal of local aggregation, it only makes sense to call the window API.

Best,
Vino |
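A minimal sketch of the usage pattern discussed above, assuming the localKeyBy API proposed in the design document (not yet in Flink, and assumed here to return a KeyedStream); the window, keyBy and sum calls are the existing DataStream API, and the job itself is only illustrative:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class LocalAggregationSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 1L), Tuple2.of("b", 1L))
                // phase 1: pre-aggregate on the sender side; localKeyBy (proposed) is
                // assumed to return a KeyedStream, so the normal window API decides
                // when the partial results are emitted downstream
                .localKeyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                // phase 2: shuffle the partial results by key and combine them
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

            env.execute("local aggregation sketch");
        }
    }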
Hi Vino,
+1 for this feature. It's useful for data skew, and it could also reduce the amount of shuffled data.

I have some concerns about the API part. From my side, this feature should be more like an improvement. I'm afraid the proposal is an overkill on the API side. Many other systems support pre-aggregation as an optimization of global aggregation; the optimization might be applied automatically or manually, but with a simple API. The proposal introduces a series of flexible local aggregation APIs that could be used independently of global aggregation. That doesn't look like an improvement but rather introduces a lot of new features. I'm not sure if there is a bigger picture later; as for now, the API part looks a little heavy to me. |
Hi Vino,
Thanks a lot for your reply.

> 1) When, why and how do we judge that the memory is exhausted?

My point here is that the local aggregate operator can buffer the inputs in memory and send out the results AT ANY TIME, i.e. when the element count or the time interval reaches a pre-configured value, when the memory usage of the buffered elements reaches a configured value (supposing we can estimate the object size efficiently), or even when a checkpoint is triggered.

> 2) If the local aggregate operator rarely needs to operate on state, what
> do you think about fault tolerance?

AbstractStreamOperator provides a method `prepareSnapshotPreBarrier` which can be used here to send the results to the downstream when a checkpoint is triggered. Then fault tolerance can work well.

Even if there were no such method available, we could still store the buffered elements or pre-aggregated results to state when a checkpoint is triggered. The state access will be much less than with the window operator, as only the elements not yet sent out when the checkpoint occurs have to be written to state. Suppose the checkpoint interval is 3 minutes and the trigger interval is 10 seconds; then only about "10/180" of the elements will be written to state.

Thanks,
Dian |
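A rough sketch of the kind of operator Dian describes, assuming an in-memory map of partial sums that is flushed downstream on a count threshold and in prepareSnapshotPreBarrier; the class name, the threshold and the hard-coded sum logic are illustrative only, not part of the proposal:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    // Buffers partial sums per key in plain memory and flushes them either when a
    // count threshold is reached or right before the checkpoint barrier is
    // forwarded, so (almost) nothing has to be written into operator state.
    public class LocalSumOperator extends AbstractStreamOperator<Tuple2<String, Long>>
            implements OneInputStreamOperator<Tuple2<String, Long>, Tuple2<String, Long>> {

        private static final int FLUSH_THRESHOLD = 10_000;  // illustrative value

        private transient Map<String, Long> buffer;

        @Override
        public void open() throws Exception {
            super.open();
            buffer = new HashMap<>();
        }

        @Override
        public void processElement(StreamRecord<Tuple2<String, Long>> element) throws Exception {
            Tuple2<String, Long> value = element.getValue();
            buffer.merge(value.f0, value.f1, Long::sum);
            if (buffer.size() >= FLUSH_THRESHOLD) {
                flush();
            }
        }

        @Override
        public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
            // Emit everything before the barrier is forwarded downstream, so fault
            // tolerance holds without writing the buffered elements to state.
            flush();
        }

        private void flush() {
            for (Map.Entry<String, Long> entry : buffer.entrySet()) {
                output.collect(new StreamRecord<>(Tuple2.of(entry.getKey(), entry.getValue())));
            }
            buffer.clear();
        }
    }

Such an operator would be wired in with DataStream#transform; the point is only that the buffered partial results do not need to reach state as long as they are flushed before the barrier.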
Hi Dian,
I still think your implementation is similar to the window operator. You mentioned the scalable trigger mechanism, but the window API can also use a custom trigger.

Moreover, IMO, the design should guarantee deterministic semantics; I think triggering based on memory availability is a non-deterministic design.

In addition, if the implementation depends on the timing of the checkpoint, I do not think that is reasonable; we should avoid affecting the checkpoint's progress.

Best,
Vino |
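To make the point about custom triggers concrete: the emission policy of the local phase can already be expressed with the existing window API, for example with a count-based purging trigger. In this sketch, localKeyBy is the proposed API; GlobalWindows, CountTrigger and PurgingTrigger are existing classes under org.apache.flink.streaming.api.windowing, and `input` stands for any DataStream<Tuple2<String, Long>>:

    input
        // local phase: emit and purge a partial sum every 1000 elements per key;
        // any custom Trigger implementation could be plugged in here instead
        .localKeyBy(0)
        .window(GlobalWindows.create())
        .trigger(PurgingTrigger.of(CountTrigger.of(1000)))
        .sum(1)
        // global phase: the usual keyed window produces the final result
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);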
Hi Aljoscha,
What do you think about this feature and the design document?

Best,
Vino |
>> >> >> >> Best, >> >> Vino >> >> >> >> litree <[hidden email]> 于2019年6月4日周二 下午10:41写道: >> >> >> >>> Hi Vino, >> >>> >> >>> >> >>> I have read your design,something I want to know is the usage of these >> >> new >> >>> APIs.It looks like when I use localByKey,i must then use a window >> >> operator >> >>> to return a datastream,and then use keyby and another window operator >> to >> >>> get the final result? >> >>> >> >>> >> >>> thanks, >> >>> Litree >> >>> >> >>> >> >>> On 06/04/2019 17:22, vino yang wrote: >> >>> Hi Dian, >> >>> >> >>> Thanks for your reply. >> >>> >> >>> I know what you mean. However, if you think deeply, you will find your >> >>> implementation need to provide an operator which looks like a window >> >>> operator. You need to use state and receive aggregation function and >> >>> specify the trigger time. It looks like a lightweight window operator. >> >>> Right? >> >>> >> >>> We try to reuse Flink provided functions and reduce complexity. IMO, >> It >> >> is >> >>> more user-friendly because users are familiar with the window API. >> >>> >> >>> Best, >> >>> Vino >> >>> >> >>> >> >>> Dian Fu <[hidden email]> 于2019年6月4日周二 下午4:19写道: >> >>> >> >>>> Hi Vino, >> >>>> >> >>>> Thanks a lot for starting this discussion. +1 to this feature as I >> >> think >> >>>> it will be very useful. >> >>>> >> >>>> Regarding to using window to buffer the input elements, personally I >> >>> don't >> >>>> think it's a good solution for the following reasons: >> >>>> 1) As we know that WindowOperator will store the accumulated results >> in >> >>>> states, this is not necessary for Local Aggregate operator. >> >>>> 2) For WindowOperator, each input element will be accumulated to >> >> states. >> >>>> This is also not necessary for Local Aggregate operator and storing >> the >> >>>> input elements in memory is enough. >> >>>> >> >>>> Thanks, >> >>>> Dian >> >>>> >> >>>>> 在 2019年6月4日,上午10:03,vino yang <[hidden email]> 写道: >> >>>>> >> >>>>> Hi Ken, >> >>>>> >> >>>>> Thanks for your reply. >> >>>>> >> >>>>> As I said before, we try to reuse Flink's state concept (fault >> >>> tolerance >> >>>>> and guarantee "Exactly-Once" semantics). So we did not consider >> >> cache. >> >>>>> >> >>>>> In addition, if we use Flink's state, the OOM related issue is not a >> >>> key >> >>>>> problem we need to consider. >> >>>>> >> >>>>> Best, >> >>>>> Vino >> >>>>> >> >>>>> Ken Krugler <[hidden email]> 于2019年6月4日周二 上午1:37写道: >> >>>>> >> >>>>>> Hi all, >> >>>>>> >> >>>>>> Cascading implemented this “map-side reduce” functionality with an >> >> LLR >> >>>>>> cache. >> >>>>>> >> >>>>>> That worked well, as then the skewed keys would always be in the >> >>> cache. >> >>>>>> >> >>>>>> The API let you decide the size of the cache, in terms of number of >> >>>>>> entries. >> >>>>>> >> >>>>>> Having a memory limit would have been better for many of our use >> >>> cases, >> >>>>>> though FWIR there’s no good way to estimate in-memory size for >> >>> objects. >> >>>>>> >> >>>>>> — Ken >> >>>>>> >> >>>>>>> On Jun 3, 2019, at 2:03 AM, vino yang <[hidden email]> >> >> wrote: >> >>>>>>> >> >>>>>>> Hi Piotr, >> >>>>>>> >> >>>>>>> The localKeyBy API returns an instance of KeyedStream (we just >> >> added >> >>> an >> >>>>>>> inner flag to identify the local mode) which is Flink has provided >> >>>>>> before. >> >>>>>>> Users can call all the APIs(especially *window* APIs) which >> >>> KeyedStream >> >>>>>>> provided. 
>> >>>>>>> >> >>>>>>> So if users want to use local aggregation, they should call the >> >>> window >> >>>>>> API >> >>>>>>> to build a local window that means users should (or say "can") >> >>> specify >> >>>>>> the >> >>>>>>> window length and other information based on their needs. >> >>>>>>> >> >>>>>>> I think you described another idea different from us. We did not >> >> try >> >>> to >> >>>>>>> react after triggering some predefined threshold. We tend to give >> >>> users >> >>>>>> the >> >>>>>>> discretion to make decisions. >> >>>>>>> >> >>>>>>> Our design idea tends to reuse Flink provided concept and >> functions >> >>>> like >> >>>>>>> state and window (IMO, we do not need to worry about OOM and the >> >>> issues >> >>>>>> you >> >>>>>>> mentioned). >> >>>>>>> >> >>>>>>> Best, >> >>>>>>> Vino >> >>>>>>> >> >>>>>>> Piotr Nowojski <[hidden email]> 于2019年6月3日周一 下午4:30写道: >> >>>>>>> >> >>>>>>>> Hi, >> >>>>>>>> >> >>>>>>>> +1 for the idea from my side. I’ve even attempted to add similar >> >>>> feature >> >>>>>>>> quite some time ago, but didn’t get enough traction [1]. >> >>>>>>>> >> >>>>>>>> I’ve read through your document and I couldn’t find it mentioning >> >>>>>>>> anywhere, when the pre aggregated result should be emitted down >> >> the >> >>>>>> stream? >> >>>>>>>> I think that’s one of the most crucial decision, since wrong >> >>> decision >> >>>>>> here >> >>>>>>>> can lead to decrease of performance or to an explosion of >> >>> memory/state >> >>>>>>>> consumption (both with bounded and unbounded data streams). For >> >>>>>> streaming >> >>>>>>>> it can also lead to an increased latency. >> >>>>>>>> >> >>>>>>>> Since this is also a decision that’s impossible to make >> >>> automatically >> >>>>>>>> perfectly reliably, first and foremost I would expect this to be >> >>>>>>>> configurable via the API. With maybe some predefined triggers, >> >> like >> >>> on >> >>>>>>>> watermark (for windowed operations), on checkpoint barrier (to >> >>>> decrease >> >>>>>>>> state size?), on element count, maybe memory usage (much easier >> to >> >>>>>> estimate >> >>>>>>>> with a known/predefined types, like in SQL)… and with some option >> >> to >> >>>>>>>> implement custom trigger. >> >>>>>>>> >> >>>>>>>> Also what would work the best would be to have a some form of >> >> memory >> >>>>>>>> consumption priority. For example if we are running out of memory >> >>> for >> >>>>>>>> HashJoin/Final aggregation, instead of spilling to disk or >> >> crashing >> >>>> the >> >>>>>> job >> >>>>>>>> with OOM it would be probably better to prune/dump the pre/local >> >>>>>>>> aggregation state. But that’s another story. >> >>>>>>>> >> >>>>>>>> [1] https://github.com/apache/flink/pull/4626 < >> >>>>>>>> https://github.com/apache/flink/pull/4626> >> >>>>>>>> >> >>>>>>>> Piotrek >> >>>>>>>> >> >>>>>>>>> On 3 Jun 2019, at 10:16, sf lee <[hidden email]> wrote: >> >>>>>>>>> >> >>>>>>>>> Excited and Big +1 for this feature. >> >>>>>>>>> >> >>>>>>>>> SHI Xiaogang <[hidden email]> 于2019年6月3日周一 下午3:37写道: >> >>>>>>>>> >> >>>>>>>>>> Nice feature. >> >>>>>>>>>> Looking forward to having it in Flink. 
>> >>>>>>>>>> >> >>>>>>>>>> Regards, >> >>>>>>>>>> Xiaogang >> >>>>>>>>>> >> >>>>>>>>>> vino yang <[hidden email]> 于2019年6月3日周一 下午3:31写道: >> >>>>>>>>>> >> >>>>>>>>>>> Hi all, >> >>>>>>>>>>> >> >>>>>>>>>>> As we mentioned in some conference, such as Flink Forward SF >> >> 2019 >> >>>> and >> >>>>>>>>>> QCon >> >>>>>>>>>>> Beijing 2019, our team has implemented "Local aggregation" in >> >> our >> >>>>>> inner >> >>>>>>>>>>> Flink fork. This feature can effectively alleviate data skew. >> >>>>>>>>>>> >> >>>>>>>>>>> Currently, keyed streams are widely used to perform >> aggregating >> >>>>>>>>>> operations >> >>>>>>>>>>> (e.g., reduce, sum and window) on the elements that having the >> >>> same >> >>>>>>>> key. >> >>>>>>>>>>> When executed at runtime, the elements with the same key will >> >> be >> >>>> sent >> >>>>>>>> to >> >>>>>>>>>>> and aggregated by the same task. >> >>>>>>>>>>> >> >>>>>>>>>>> The performance of these aggregating operations is very >> >> sensitive >> >>>> to >> >>>>>>>> the >> >>>>>>>>>>> distribution of keys. In the cases where the distribution of >> >> keys >> >>>>>>>>>> follows a >> >>>>>>>>>>> powerful law, the performance will be significantly >> downgraded. >> >>>> More >> >>>>>>>>>>> unluckily, increasing the degree of parallelism does not help >> >>> when >> >>>> a >> >>>>>>>> task >> >>>>>>>>>>> is overloaded by a single key. >> >>>>>>>>>>> >> >>>>>>>>>>> Local aggregation is a widely-adopted method to reduce the >> >>>>>> performance >> >>>>>>>>>>> degraded by data skew. We can decompose the aggregating >> >>> operations >> >>>>>> into >> >>>>>>>>>> two >> >>>>>>>>>>> phases. In the first phase, we aggregate the elements of the >> >> same >> >>>> key >> >>>>>>>> at >> >>>>>>>>>>> the sender side to obtain partial results. Then at the second >> >>>> phase, >> >>>>>>>>>> these >> >>>>>>>>>>> partial results are sent to receivers according to their keys >> >> and >> >>>> are >> >>>>>>>>>>> combined to obtain the final result. Since the number of >> >> partial >> >>>>>>>> results >> >>>>>>>>>>> received by each receiver is limited by the number of senders, >> >>> the >> >>>>>>>>>>> imbalance among receivers can be reduced. Besides, by reducing >> >>> the >> >>>>>>>> amount >> >>>>>>>>>>> of transferred data the performance can be further improved. >> >>>>>>>>>>> >> >>>>>>>>>>> The design documentation is here: >> >>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>> >> >>>>>> >> >>>> >> >>> >> >> >> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing >> >>>>>>>>>>> >> >>>>>>>>>>> Any comment and feedback are welcome and appreciated. >> >>>>>>>>>>> >> >>>>>>>>>>> Best, >> >>>>>>>>>>> Vino >> >>>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>> >> >>>>>>>> >> >>>>>> >> >>>>>> -------------------------- >> >>>>>> Ken Krugler >> >>>>>> +1 530-210-6378 >> >>>>>> http://www.scaleunlimited.com >> >>>>>> Custom big data solutions & training >> >>>>>> Flink, Solr, Hadoop, Cascading & Cassandra >> >>>>>> >> >>>>>> >> >>>> >> >>>> >> >>> >> >> >> >> |
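To make the usage pattern discussed above concrete: litree asked whether one has to call localKeyBy, a local window, and then keyBy plus a second window, and Vino confirmed that this is the intended shape. A rough sketch of that pipeline follows. Note that localKeyBy is only the API proposed in the design document, so its name, its parameters, and the fact that it returns an ordinary KeyedStream are assumptions taken from this thread; everything else uses the DataStream API of that time (keyBy/timeWindow/sum).

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class LocalAggregationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 1L), Tuple2.of("b", 1L))
            // Proposed API from the design document, NOT part of released Flink:
            // keys the stream locally without shuffling records across tasks.
            .localKeyBy(0)
            // Local window on the sender side, producing per-key partial sums.
            .timeWindow(Time.seconds(5))
            .sum(1)
            // Shuffle only the partial results and combine them into the final result.
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1)
            .print();

        env.execute("local aggregation sketch");
    }
}

In this sketch the first window emits one partial sum per key and window from each sender task, and the second window combines those partial sums into the final result.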
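Dian's alternative, buffering partial results in operator memory and flushing them in prepareSnapshotPreBarrier, could be prototyped with the existing operator API. The sketch below is an illustration under stated assumptions, not the implementation from the design document: the operator name, the count-based flush threshold, and the hard-coded per-key Long sum are invented for the example; prepareSnapshotPreBarrier and AbstractStreamOperator are the real Flink hooks mentioned in the thread.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class LocalSumOperator
        extends AbstractStreamOperator<Tuple2<String, Long>>
        implements OneInputStreamOperator<Tuple2<String, Long>, Tuple2<String, Long>> {

    // Assumed trigger: flush once this many distinct keys are buffered.
    private static final int FLUSH_THRESHOLD = 10_000;

    private transient Map<String, Long> buffer;

    @Override
    public void open() throws Exception {
        super.open();
        buffer = new HashMap<>();
    }

    @Override
    public void processElement(StreamRecord<Tuple2<String, Long>> element) {
        Tuple2<String, Long> value = element.getValue();
        // Pre-aggregate per key in plain JVM memory, no keyed state involved.
        buffer.merge(value.f0, value.f1, Long::sum);
        if (buffer.size() >= FLUSH_THRESHOLD) {
            flush();
        }
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) {
        // Emit everything before the barrier, so nothing has to be written to state here.
        flush();
    }

    private void flush() {
        for (Map.Entry<String, Long> entry : buffer.entrySet()) {
            output.collect(new StreamRecord<>(Tuple2.of(entry.getKey(), entry.getValue())));
        }
        buffer.clear();
    }
}

Such an operator would be wired in with DataStream#transform and followed by an ordinary keyBy plus the final, global aggregation.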
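On the question of when the locally aggregated result should be emitted: if the window-based design is kept, the existing trigger API already covers count- or time-based emission. As an illustration (again using the assumed localKeyBy from the first sketch; GlobalWindows, CountTrigger, and PurgingTrigger are existing Flink classes), a purely count-driven local emission could look like this, with stream being the same Tuple2<String, Long> stream as in the first sketch:

stream
    .localKeyBy(0)                                        // proposed API, see the first sketch
    .window(GlobalWindows.create())
    .trigger(PurgingTrigger.of(CountTrigger.of(1000)))    // emit and clear the partial sum every 1000 elements per key
    .sum(1)
    .keyBy(0)                                             // shuffle partial sums
    .sum(1);                                              // rolling global aggregation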
Hi,
+1 from my side. Looking forward to this must-have feature :)

Best,
boshu
Hi,

+1, nice work.

Best,
forwardxu