[DISCUSS] Support Local Aggregation in Flink

vino yang
Hi all,

As we mentioned at some conferences, such as Flink Forward SF 2019 and QCon
Beijing 2019, our team has implemented "Local aggregation" in our internal
Flink fork. This feature can effectively alleviate data skew.

Currently, keyed streams are widely used to perform aggregating operations
(e.g., reduce, sum and window) on elements that have the same key. At
runtime, all elements with the same key are sent to and aggregated by the
same task.

The performance of these aggregating operations is very sensitive to the
distribution of keys. In cases where the distribution of keys follows a
power law, performance degrades significantly. Worse still, increasing the
degree of parallelism does not help when a task is overloaded by a single
key.

Local aggregation is a widely adopted method to mitigate the performance
degradation caused by data skew. We can decompose the aggregating
operations into two phases. In the first phase, we aggregate the elements
of the same key at the sender side to obtain partial results. In the second
phase, these partial results are sent to receivers according to their keys
and are combined to obtain the final result. Since the number of partial
results received by each receiver is limited by the number of senders, the
imbalance among receivers can be reduced. Besides, reducing the amount of
transferred data further improves performance.
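
To make the two phases concrete, below is a rough sketch of the same idea
expressed with the existing DataStream API, using a hand-rolled
pre-aggregating flatMap. The class names, types and the flush threshold are
illustrative only, and the sketch ignores fault tolerance for the buffered
partial sums (which is part of what this proposal addresses):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

public class ManualTwoPhaseSum {

    // Phase 1: partial sums computed on the sender side.
    public static class PreSum
            implements FlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
        private static final int MAX_BUFFERED_KEYS = 10_000;
        private final Map<String, Long> buffer = new HashMap<>();

        @Override
        public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) {
            buffer.merge(value.f0, value.f1, Long::sum);
            if (buffer.size() >= MAX_BUFFERED_KEYS) {
                // Flush the partial sums downstream and start a new buffer.
                for (Map.Entry<String, Long> e : buffer.entrySet()) {
                    out.collect(Tuple2.of(e.getKey(), e.getValue()));
                }
                buffer.clear();
            }
        }
    }

    // Phase 2: partial results are shuffled by key and combined into final sums.
    public static DataStream<Tuple2<String, Long>> sumByKey(DataStream<Tuple2<String, Long>> input) {
        return input
                .flatMap(new PreSum())
                .keyBy(0)
                .sum(1);
    }
}

With the proposed feature, this decomposition would be performed by the
framework instead of by hand.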

The design documentation is here:
https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing

Any comments and feedback are welcome and appreciated.

Best,
Vino

Re: [DISCUSS] Support Local Aggregation in Flink

SHI Xiaogang
Nice feature.
Looking forward to having it in Flink.

Regards,
Xiaogang

Re: [DISCUSS] Support Local Aggregation in Flink

leesf
Excited, and a big +1 for this feature.

Re: [DISCUSS] Support Local Aggregation in Flink

Piotr Nowojski-3
Hi,

+1 for the idea from my side. I’ve even attempted to add a similar feature quite some time ago, but it didn’t get enough traction [1].

I’ve read through your document and couldn’t find it mentioned anywhere: when should the pre-aggregated result be emitted down the stream? I think that’s one of the most crucial decisions, since a wrong decision here can lead to a decrease in performance or to an explosion of memory/state consumption (with both bounded and unbounded data streams). For streaming it can also lead to increased latency.

Since this is also a decision that’s impossible to make automatically in a perfectly reliable way, first and foremost I would expect this to be configurable via the API, with maybe some predefined triggers: on watermark (for windowed operations), on checkpoint barrier (to decrease state size?), on element count, maybe on memory usage (much easier to estimate with known/predefined types, like in SQL)… and with some option to implement a custom trigger.
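
For reference, an "on element count" policy can already be expressed for
regular windows with existing triggers, and something similar could
presumably be offered for a local window. A sketch with purely illustrative
numbers (CountTrigger and PurgingTrigger live in
org.apache.flink.streaming.api.windowing.triggers):

// Fire the window whenever 1000 elements have arrived for a key, and purge
// the window contents after each firing so they do not keep accumulating.
stream
    .keyBy(value -> value.f0)
    .timeWindow(Time.seconds(60))
    .trigger(PurgingTrigger.of(CountTrigger.of(1000)))
    .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));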

Also, what would work best would be to have some form of memory consumption priority. For example, if we are running out of memory for the HashJoin/final aggregation, instead of spilling to disk or crashing the job with an OOM it would probably be better to prune/dump the pre/local aggregation state. But that’s another story.

[1] https://github.com/apache/flink/pull/4626

Piotrek

Re: [DISCUSS] Support Local Aggregation in Flink

vino yang
Hi Piotr,

The localKeyBy API returns an instance of KeyedStream (we just added an
inner flag to identify the local mode), which Flink has provided before.
Users can call all the APIs (especially the *window* APIs) that KeyedStream
provides.

So if users want to use local aggregation, they should call the window API
to build a local window, which means users should (or rather "can") specify
the window length and other information based on their needs.
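
For illustration, a job written against the proposed API might look roughly
like the sketch below. Note that localKeyBy is the API proposed in the
design document and does not exist in Flink today; the window length, the
key type and the reduce function are purely illustrative:

// Time is org.apache.flink.streaming.api.windowing.time.Time.
DataStream<Tuple2<String, Long>> input = ...; // e.g. (word, count) records

DataStream<Tuple2<String, Long>> result = input
    // local aggregation: keyed and windowed locally, without a network shuffle
    .localKeyBy(value -> value.f0)
    .timeWindow(Time.seconds(5))
    .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
    // global aggregation: partial results are shuffled by key and combined
    .keyBy(value -> value.f0)
    .timeWindow(Time.seconds(5))
    .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

The localKeyBy part produces partial results per local window, and the
following keyBy/window pair combines them into the final results.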

I think you described an idea different from ours. We did not try to react
after some predefined threshold is triggered. We tend to give users the
discretion to make these decisions.

Our design tends to reuse the concepts and functions Flink already
provides, like state and window (IMO, we do not need to worry about OOM and
the issues you mentioned).

Best,
Vino

Re: [DISCUSS] Support Local Aggregation in Flink

Ken Krugler
Hi all,

Cascading implemented this “map-side reduce” functionality with an LRU cache.

That worked well, as then the skewed keys would always be in the cache.

The API let you decide the size of the cache, in terms of number of entries.

Having a memory limit would have been better for many of our use cases, though from what I recall there’s no good way to estimate the in-memory size of objects.
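
For reference, the cache-based approach can be sketched with an
access-ordered LinkedHashMap: hot keys stay resident and keep accumulating,
while the partial sums of evicted cold keys are emitted downstream. This is
only an illustration of the idea (not Cascading's actual code), and the
cache size and types are made up:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.LinkedHashMap;
import java.util.Map;

public class LruPreSum implements FlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
    private static final int CACHE_SIZE = 10_000;

    private transient Map<String, Long> cache;
    private transient Collector<Tuple2<String, Long>> currentOut;

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) {
        currentOut = out;
        if (cache == null) {
            // Access-ordered map: the least recently used key is evicted (and emitted) first.
            cache = new LinkedHashMap<String, Long>(CACHE_SIZE, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                    if (size() > CACHE_SIZE) {
                        currentOut.collect(Tuple2.of(eldest.getKey(), eldest.getValue()));
                        return true;
                    }
                    return false;
                }
            };
        }
        Long current = cache.get(value.f0); // also refreshes the LRU order for this key
        cache.put(value.f0, current == null ? value.f1 : current + value.f1);
    }
}

Entries still sitting in the cache at the end of the stream, or at a
checkpoint/failure, are not flushed in this sketch; handling that is exactly
where the trigger and state discussion in this thread comes in.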

— Ken

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra

Re: [DISCUSS] Support Local Aggregation in Flink

vino yang
Hi Ken,

Thanks for your reply.

As I said before, we try to reuse Flink's state concept (for fault
tolerance and to guarantee exactly-once semantics). So we did not consider
a cache.

In addition, if we use Flink's state, the OOM-related issue is not a key
problem we need to consider.

Best,
Vino

Re: [DISCUSS] Support Local Aggregation in Flink

Dian Fu-2
Hi Vino,

Thanks a lot for starting this discussion. +1 to this feature as I think it will be very useful.

Regarding using a window to buffer the input elements, personally I don't think it's a good solution, for the following reasons:
1) As we know, WindowOperator stores the accumulated results in state; this is not necessary for a local aggregate operator.
2) For WindowOperator, each input element will be accumulated into state. This is also not necessary for a local aggregate operator: storing the input elements in memory is enough.

Thanks,
Dian

Re: [DISCUSS] Support Local Aggregation in Flink

Piotr Nowojski-3
Hi Vino,

> So if users want to use local aggregation, they should call the window API
> to build a local window that means users should (or say "can") specify the
> window length and other information based on their needs.

It sounds OK to me. It would have to be run past some of the API folks in the community, though.

Piotrek

Re: [DISCUSS] Support Local Aggregation in Flink

vino yang
In reply to this post by Dian Fu-2
Hi Dian,

Thanks for your reply.

I know what you mean. However, if you think about it more deeply, you will
find that your implementation needs to provide an operator which looks like
a window operator: you need to use state, accept an aggregation function,
and specify the trigger time. It looks like a lightweight window operator,
right?

We try to reuse the functions Flink provides and reduce complexity. IMO, it
is more user-friendly because users are familiar with the window API.

Best,
Vino


Re: [DISCUSS] Support Local Aggregation in Flink

Dian Fu-2
Hi Vino,

It may seem similar to the window operator, but there are also a few key
differences. For example, the local aggregate operator can send out results
at any time, while the window operator can only send out results at the end
of the window (without early firing). This means that the local aggregate
operator can send out results not only when the trigger time is reached,
but also when memory is exhausted. This difference opens up optimizations,
as it means that the local aggregate operator rarely needs to touch state.
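
If I understand the direction correctly, such an operator could keep its
partial results purely on the heap and flush them both when the buffer
grows too large and right before each checkpoint barrier, so that nothing
has to be written into state at all. A rough sketch at the operator level
(it uses the low-level AbstractStreamOperator / prepareSnapshotPreBarrier
API; the names and the size bound are illustrative, and this is not the
implementation from the design document):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.HashMap;
import java.util.Map;

public class LocalSumOperator extends AbstractStreamOperator<Tuple2<String, Long>>
        implements OneInputStreamOperator<Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final int MAX_BUFFERED_KEYS = 10_000;
    private transient Map<String, Long> buffer;

    @Override
    public void open() throws Exception {
        super.open();
        buffer = new HashMap<>();
    }

    @Override
    public void processElement(StreamRecord<Tuple2<String, Long>> element) throws Exception {
        Tuple2<String, Long> value = element.getValue();
        buffer.merge(value.f0, value.f1, Long::sum);
        if (buffer.size() >= MAX_BUFFERED_KEYS) {
            flush(); // emit early when the in-memory buffer grows too large
        }
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        flush(); // emit everything before the barrier, so nothing needs to be snapshotted
    }

    private void flush() {
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            output.collect(new StreamRecord<>(Tuple2.of(e.getKey(), e.getValue())));
        }
        buffer.clear();
    }
}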

I admit that the window operator can solve part of the problem (the data
skew) and just wonder if we can do more. Using the window operator at
present seems OK to me, as it can indeed solve part of the problem. We just
need to think a little more in the design and make sure that the current
solution is consistent with future optimizations.

Thanks,

Dian

Re: [DISCUSS] Support Local Aggregation in Flink

litree
In reply to this post by vino yang
Hi Vino,


I have read your design. Something I want to know is the usage of these new APIs. It looks like when I use localKeyBy, I must then use a window operator to return a DataStream, and then use keyBy and another window operator to get the final result?


thanks,
Litree


> >>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> As we mentioned in some conference, such as Flink Forward SF 2019
> and
> >>>>>> QCon
> >>>>>>> Beijing 2019, our team has implemented "Local aggregation" in our
> >> inner
> >>>>>>> Flink fork. This feature can effectively alleviate data skew.
> >>>>>>>
> >>>>>>> Currently, keyed streams are widely used to perform aggregating
> >>>>>> operations
> >>>>>>> (e.g., reduce, sum and window) on the elements that having the same
> >>>> key.
> >>>>>>> When executed at runtime, the elements with the same key will be
> sent
> >>>> to
> >>>>>>> and aggregated by the same task.
> >>>>>>>
> >>>>>>> The performance of these aggregating operations is very sensitive
> to
> >>>> the
> >>>>>>> distribution of keys. In the cases where the distribution of keys
> >>>>>> follows a
> >>>>>>> powerful law, the performance will be significantly downgraded.
> More
> >>>>>>> unluckily, increasing the degree of parallelism does not help when
> a
> >>>> task
> >>>>>>> is overloaded by a single key.
> >>>>>>>
> >>>>>>> Local aggregation is a widely-adopted method to reduce the
> >> performance
> >>>>>>> degraded by data skew. We can decompose the aggregating operations
> >> into
> >>>>>> two
> >>>>>>> phases. In the first phase, we aggregate the elements of the same
> key
> >>>> at
> >>>>>>> the sender side to obtain partial results. Then at the second
> phase,
> >>>>>> these
> >>>>>>> partial results are sent to receivers according to their keys and
> are
> >>>>>>> combined to obtain the final result. Since the number of partial
> >>>> results
> >>>>>>> received by each receiver is limited by the number of senders, the
> >>>>>>> imbalance among receivers can be reduced. Besides, by reducing the
> >>>> amount
> >>>>>>> of transferred data the performance can be further improved.
> >>>>>>>
> >>>>>>> The design documentation is here:
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>
> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> >>>>>>>
> >>>>>>> Any comment and feedback are welcome and appreciated.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Vino
> >>>>>>>
> >>>>>>
> >>>>
> >>>>
> >>
> >> --------------------------
> >> Ken Krugler
> >> +1 530-210-6378
> >> http://www.scaleunlimited.com
> >> Custom big data solutions & training
> >> Flink, Solr, Hadoop, Cascading & Cassandra
> >>
> >>
>
>

Re: [DISCUSS] Support Local Aggregation in Flink

vino yang
In reply to this post by Dian Fu-2
Hi Dian,

A different opinion is fine with me. If there is a better solution or there
are obvious deficiencies in our design, we are very happy to accept it and
improve the design.

I agree with you that a customized local aggregate operator is more scalable
in terms of the trigger mechanism. However, I have two questions about your
reply.

1) When, why and how do we judge that the memory is exhausted?

IMO, the operator sits at a high abstraction level; when implementing it, we
should not have to care about whether the memory is exhausted.

2) If the local aggregate operator rarely needs to operate on state, what do
you think about fault tolerance?

We reuse Flink's state concept because we get fault tolerance from it. We
need to guarantee correct semantics.

Best,
Vino


Dian Fu <[hidden email]> 于2019年6月4日周二 下午10:31写道:

> Hi Vino,
>
> It may seem similar to window operator but there are also a few key
> differences. For example, the local aggregate operator can send out the
> results at any time and the window operator can only send out the results
> at the end of window (without early fire). This means that the local
> aggregate operator can send out the results not only when the trigger time
> is reached, but also when the memory is exhausted. This difference makes
> optimization available as it means that the local aggregate operator rarely
> need to operate the state.
>
> I admit that window operator can solve part of the problem (the data skew)
> and just wonder if we can do more. Using window operator at present seems
> OK for me as it can indeed solve part of the problems. We just need to
> think a little more in the design and make sure that the current solution
> is consistent with future optimizations.
>
> Thanks,
>
> Dian
>
> 在 2019年6月4日,下午5:22,vino yang <[hidden email]> 写道:
>
> Hi Dian,
>
> Thanks for your reply.
>
> I know what you mean. However, if you think deeply, you will find your
> implementation need to provide an operator which looks like a window
> operator. You need to use state and receive aggregation function and
> specify the trigger time. It looks like a lightweight window operator.
> Right?
>
> We try to reuse Flink provided functions and reduce complexity. IMO, It is
> more user-friendly because users are familiar with the window API.
>
> Best,
> Vino
>
>
> Dian Fu <[hidden email]> 于2019年6月4日周二 下午4:19写道:
>
> Hi Vino,
>
> Thanks a lot for starting this discussion. +1 to this feature as I think
> it will be very useful.
>
> Regarding to using window to buffer the input elements, personally I don't
> think it's a good solution for the following reasons:
> 1) As we know that WindowOperator will store the accumulated results in
> states, this is not necessary for Local Aggregate operator.
> 2) For WindowOperator, each input element will be accumulated to states.
> This is also not necessary for Local Aggregate operator and storing the
> input elements in memory is enough.
>
> Thanks,
> Dian
>
> 在 2019年6月4日,上午10:03,vino yang <[hidden email]> 写道:
>
> Hi Ken,
>
> Thanks for your reply.
>
> As I said before, we try to reuse Flink's state concept (fault tolerance
> and guarantee "Exactly-Once" semantics). So we did not consider cache.
>
> In addition, if we use Flink's state, the OOM related issue is not a key
> problem we need to consider.
>
> Best,
> Vino
>
> Ken Krugler <[hidden email]> 于2019年6月4日周二 上午1:37写道:
>
> Hi all,
>
> Cascading implemented this “map-side reduce” functionality with an LLR
> cache.
>
> That worked well, as then the skewed keys would always be in the cache.
>
> The API let you decide the size of the cache, in terms of number of
> entries.
>
> Having a memory limit would have been better for many of our use cases,
> though FWIR there’s no good way to estimate in-memory size for objects.
>
> — Ken
>
> On Jun 3, 2019, at 2:03 AM, vino yang <[hidden email]> wrote:
>
> Hi Piotr,
>
> The localKeyBy API returns an instance of KeyedStream (we just added an
> inner flag to identify the local mode) which is Flink has provided
>
> before.
>
> Users can call all the APIs(especially *window* APIs) which KeyedStream
> provided.
>
> So if users want to use local aggregation, they should call the window
>
> API
>
> to build a local window that means users should (or say "can") specify
>
> the
>
> window length and other information based on their needs.
>
> I think you described another idea different from us. We did not try to
> react after triggering some predefined threshold. We tend to give users
>
> the
>
> discretion to make decisions.
>
> Our design idea tends to reuse Flink provided concept and functions
>
> like
>
> state and window (IMO, we do not need to worry about OOM and the issues
>
> you
>
> mentioned).
>
> Best,
> Vino
>
> Piotr Nowojski <[hidden email]> 于2019年6月3日周一 下午4:30写道:
>
> Hi,
>
> +1 for the idea from my side. I’ve even attempted to add similar
>
> feature
>
> quite some time ago, but didn’t get enough traction [1].
>
> I’ve read through your document and I couldn’t find it mentioning
> anywhere, when the pre aggregated result should be emitted down the
>
> stream?
>
> I think that’s one of the most crucial decision, since wrong decision
>
> here
>
> can lead to decrease of performance or to an explosion of memory/state
> consumption (both with bounded and unbounded data streams). For
>
> streaming
>
> it can also lead to an increased latency.
>
> Since this is also a decision that’s impossible to make automatically
> perfectly reliably, first and foremost I would expect this to be
> configurable via the API. With maybe some predefined triggers, like on
> watermark (for windowed operations), on checkpoint barrier (to
>
> decrease
>
> state size?), on element count, maybe memory usage (much easier to
>
> estimate
>
> with a known/predefined types, like in SQL)… and with some option to
> implement custom trigger.
>
> Also what would work the best would be to have a some form of memory
> consumption priority. For example if we are running out of memory for
> HashJoin/Final aggregation, instead of spilling to disk or crashing
>
> the
>
> job
>
> with OOM it would be probably better to prune/dump the pre/local
> aggregation state. But that’s another story.
>
> [1] https://github.com/apache/flink/pull/4626 <
> https://github.com/apache/flink/pull/4626>
>
> Piotrek
>
> On 3 Jun 2019, at 10:16, sf lee <[hidden email]> wrote:
>
> Excited and  Big +1 for this feature.
>
> SHI Xiaogang <[hidden email]> 于2019年6月3日周一 下午3:37写道:
>
> Nice feature.
> Looking forward to having it in Flink.
>
> Regards,
> Xiaogang
>
> vino yang <[hidden email]> 于2019年6月3日周一 下午3:31写道:
>
> Hi all,
>
> As we mentioned in some conference, such as Flink Forward SF 2019
>
> and
>
> QCon
>
> Beijing 2019, our team has implemented "Local aggregation" in our
>
> inner
>
> Flink fork. This feature can effectively alleviate data skew.
>
> Currently, keyed streams are widely used to perform aggregating
>
> operations
>
> (e.g., reduce, sum and window) on the elements that having the same
>
> key.
>
> When executed at runtime, the elements with the same key will be
>
> sent
>
> to
>
> and aggregated by the same task.
>
> The performance of these aggregating operations is very sensitive
>
> to
>
> the
>
> distribution of keys. In the cases where the distribution of keys
>
> follows a
>
> powerful law, the performance will be significantly downgraded.
>
> More
>
> unluckily, increasing the degree of parallelism does not help when
>
> a
>
> task
>
> is overloaded by a single key.
>
> Local aggregation is a widely-adopted method to reduce the
>
> performance
>
> degraded by data skew. We can decompose the aggregating operations
>
> into
>
> two
>
> phases. In the first phase, we aggregate the elements of the same
>
> key
>
> at
>
> the sender side to obtain partial results. Then at the second
>
> phase,
>
> these
>
> partial results are sent to receivers according to their keys and
>
> are
>
> combined to obtain the final result. Since the number of partial
>
> results
>
> received by each receiver is limited by the number of senders, the
> imbalance among receivers can be reduced. Besides, by reducing the
>
> amount
>
> of transferred data the performance can be further improved.
>
> The design documentation is here:
>
>
>
>
>
>
> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
>
>
> Any comment and feedback are welcome and appreciated.
>
> Best,
> Vino
>
>
>
>
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>

Re: [DISCUSS] Support Local Aggregation in Flink

vino yang
In reply to this post by litree
Hi Litree,

At the implementation level, the localKeyBy API returns a general
KeyedStream, so you can call all the APIs which KeyedStream provides; we did
not restrict its usage, although we could (for example, by returning a new
stream type named LocalKeyedStream).

However, to achieve the goal of local aggregation, it only makes sense to
call the window API.
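
To make the usage concrete, here is a minimal sketch of that pattern. It
assumes the localKeyBy API proposed in the design document; the window
sizes, the example records and the summed field are placeholders, not part
of the proposal:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class LocalAggregationUsageSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (key, count) records; the in-line source is only a stand-in for a real input.
        DataStream<Tuple2<String, Long>> input =
                env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 1L), Tuple2.of("b", 1L));

        input
                // Proposed API: key the stream locally, without shuffling across tasks.
                .localKeyBy(value -> value.f0)
                // First phase: pre-aggregate per sender task inside a local window.
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1)
                // Second phase: shuffle the partial results and combine them per key.
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1)
                .print();

        env.execute("local aggregation usage sketch");
    }
}

The two windows do not have to use the same size or type; as said above, the
user decides how long partial results are buffered on the local side.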

Best,
Vino

litree <[hidden email]> 于2019年6月4日周二 下午10:41写道:

> Hi Vino,
>
>
> I have read your design,something I want to know is the usage of these new
> APIs.It looks like when I use localByKey,i must then use a window operator
> to return a datastream,and then use keyby and another window operator to
> get the final result?
>
>
> thanks,
> Litree
>
>
> On 06/04/2019 17:22, vino yang wrote:
> Hi Dian,
>
> Thanks for your reply.
>
> I know what you mean. However, if you think deeply, you will find your
> implementation need to provide an operator which looks like a window
> operator. You need to use state and receive aggregation function and
> specify the trigger time. It looks like a lightweight window operator.
> Right?
>
> We try to reuse Flink provided functions and reduce complexity. IMO, It is
> more user-friendly because users are familiar with the window API.
>
> Best,
> Vino
>
>
> Dian Fu <[hidden email]> 于2019年6月4日周二 下午4:19写道:
>
> > Hi Vino,
> >
> > Thanks a lot for starting this discussion. +1 to this feature as I think
> > it will be very useful.
> >
> > Regarding to using window to buffer the input elements, personally I
> don't
> > think it's a good solution for the following reasons:
> > 1) As we know that WindowOperator will store the accumulated results in
> > states, this is not necessary for Local Aggregate operator.
> > 2) For WindowOperator, each input element will be accumulated to states.
> > This is also not necessary for Local Aggregate operator and storing the
> > input elements in memory is enough.
> >
> > Thanks,
> > Dian
> >
> > > 在 2019年6月4日,上午10:03,vino yang <[hidden email]> 写道:
> > >
> > > Hi Ken,
> > >
> > > Thanks for your reply.
> > >
> > > As I said before, we try to reuse Flink's state concept (fault
> tolerance
> > > and guarantee "Exactly-Once" semantics). So we did not consider cache.
> > >
> > > In addition, if we use Flink's state, the OOM related issue is not a
> key
> > > problem we need to consider.
> > >
> > > Best,
> > > Vino
> > >
> > > Ken Krugler <[hidden email]> 于2019年6月4日周二 上午1:37写道:
> > >
> > >> Hi all,
> > >>
> > >> Cascading implemented this “map-side reduce” functionality with an LLR
> > >> cache.
> > >>
> > >> That worked well, as then the skewed keys would always be in the
> cache.
> > >>
> > >> The API let you decide the size of the cache, in terms of number of
> > >> entries.
> > >>
> > >> Having a memory limit would have been better for many of our use
> cases,
> > >> though FWIR there’s no good way to estimate in-memory size for
> objects.
> > >>
> > >> — Ken
> > >>
> > >>> On Jun 3, 2019, at 2:03 AM, vino yang <[hidden email]> wrote:
> > >>>
> > >>> Hi Piotr,
> > >>>
> > >>> The localKeyBy API returns an instance of KeyedStream (we just added
> an
> > >>> inner flag to identify the local mode) which is Flink has provided
> > >> before.
> > >>> Users can call all the APIs(especially *window* APIs) which
> KeyedStream
> > >>> provided.
> > >>>
> > >>> So if users want to use local aggregation, they should call the
> window
> > >> API
> > >>> to build a local window that means users should (or say "can")
> specify
> > >> the
> > >>> window length and other information based on their needs.
> > >>>
> > >>> I think you described another idea different from us. We did not try
> to
> > >>> react after triggering some predefined threshold. We tend to give
> users
> > >> the
> > >>> discretion to make decisions.
> > >>>
> > >>> Our design idea tends to reuse Flink provided concept and functions
> > like
> > >>> state and window (IMO, we do not need to worry about OOM and the
> issues
> > >> you
> > >>> mentioned).
> > >>>
> > >>> Best,
> > >>> Vino
> > >>>
> > >>> Piotr Nowojski <[hidden email]> 于2019年6月3日周一 下午4:30写道:
> > >>>
> > >>>> Hi,
> > >>>>
> > >>>> +1 for the idea from my side. I’ve even attempted to add similar
> > feature
> > >>>> quite some time ago, but didn’t get enough traction [1].
> > >>>>
> > >>>> I’ve read through your document and I couldn’t find it mentioning
> > >>>> anywhere, when the pre aggregated result should be emitted down the
> > >> stream?
> > >>>> I think that’s one of the most crucial decision, since wrong
> decision
> > >> here
> > >>>> can lead to decrease of performance or to an explosion of
> memory/state
> > >>>> consumption (both with bounded and unbounded data streams). For
> > >> streaming
> > >>>> it can also lead to an increased latency.
> > >>>>
> > >>>> Since this is also a decision that’s impossible to make
> automatically
> > >>>> perfectly reliably, first and foremost I would expect this to be
> > >>>> configurable via the API. With maybe some predefined triggers, like
> on
> > >>>> watermark (for windowed operations), on checkpoint barrier (to
> > decrease
> > >>>> state size?), on element count, maybe memory usage (much easier to
> > >> estimate
> > >>>> with a known/predefined types, like in SQL)… and with some option to
> > >>>> implement custom trigger.
> > >>>>
> > >>>> Also what would work the best would be to have a some form of memory
> > >>>> consumption priority. For example if we are running out of memory
> for
> > >>>> HashJoin/Final aggregation, instead of spilling to disk or crashing
> > the
> > >> job
> > >>>> with OOM it would be probably better to prune/dump the pre/local
> > >>>> aggregation state. But that’s another story.
> > >>>>
> > >>>> [1] https://github.com/apache/flink/pull/4626 <
> > >>>> https://github.com/apache/flink/pull/4626>
> > >>>>
> > >>>> Piotrek
> > >>>>
> > >>>>> On 3 Jun 2019, at 10:16, sf lee <[hidden email]> wrote:
> > >>>>>
> > >>>>> Excited and  Big +1 for this feature.
> > >>>>>
> > >>>>> SHI Xiaogang <[hidden email]> 于2019年6月3日周一 下午3:37写道:
> > >>>>>
> > >>>>>> Nice feature.
> > >>>>>> Looking forward to having it in Flink.
> > >>>>>>
> > >>>>>> Regards,
> > >>>>>> Xiaogang
> > >>>>>>
> > >>>>>> vino yang <[hidden email]> 于2019年6月3日周一 下午3:31写道:
> > >>>>>>
> > >>>>>>> Hi all,
> > >>>>>>>
> > >>>>>>> As we mentioned in some conference, such as Flink Forward SF 2019
> > and
> > >>>>>> QCon
> > >>>>>>> Beijing 2019, our team has implemented "Local aggregation" in our
> > >> inner
> > >>>>>>> Flink fork. This feature can effectively alleviate data skew.
> > >>>>>>>
> > >>>>>>> Currently, keyed streams are widely used to perform aggregating
> > >>>>>> operations
> > >>>>>>> (e.g., reduce, sum and window) on the elements that having the
> same
> > >>>> key.
> > >>>>>>> When executed at runtime, the elements with the same key will be
> > sent
> > >>>> to
> > >>>>>>> and aggregated by the same task.
> > >>>>>>>
> > >>>>>>> The performance of these aggregating operations is very sensitive
> > to
> > >>>> the
> > >>>>>>> distribution of keys. In the cases where the distribution of keys
> > >>>>>> follows a
> > >>>>>>> powerful law, the performance will be significantly downgraded.
> > More
> > >>>>>>> unluckily, increasing the degree of parallelism does not help
> when
> > a
> > >>>> task
> > >>>>>>> is overloaded by a single key.
> > >>>>>>>
> > >>>>>>> Local aggregation is a widely-adopted method to reduce the
> > >> performance
> > >>>>>>> degraded by data skew. We can decompose the aggregating
> operations
> > >> into
> > >>>>>> two
> > >>>>>>> phases. In the first phase, we aggregate the elements of the same
> > key
> > >>>> at
> > >>>>>>> the sender side to obtain partial results. Then at the second
> > phase,
> > >>>>>> these
> > >>>>>>> partial results are sent to receivers according to their keys and
> > are
> > >>>>>>> combined to obtain the final result. Since the number of partial
> > >>>> results
> > >>>>>>> received by each receiver is limited by the number of senders,
> the
> > >>>>>>> imbalance among receivers can be reduced. Besides, by reducing
> the
> > >>>> amount
> > >>>>>>> of transferred data the performance can be further improved.
> > >>>>>>>
> > >>>>>>> The design documentation is here:
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>
> > >>
> >
> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> > >>>>>>>
> > >>>>>>> Any comment and feedback are welcome and appreciated.
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Vino
> > >>>>>>>
> > >>>>>>
> > >>>>
> > >>>>
> > >>
> > >> --------------------------
> > >> Ken Krugler
> > >> +1 530-210-6378
> > >> http://www.scaleunlimited.com
> > >> Custom big data solutions & training
> > >> Flink, Solr, Hadoop, Cascading & Cassandra
> > >>
> > >>
> >
> >
>

Re: [DISCUSS] Support Local Aggregation in Flink

Biao Liu
Hi Vino,

+1 for this feature. It's useful for data skew, and it could also reduce the
amount of shuffled data.

I have some concerns about the API part. From my side, this feature should be
more like an improvement, and I'm afraid the proposal is overkill on the API
side. Many other systems support pre-aggregation as an optimization of global
aggregation; the optimization might be applied automatically or manually, but
with a simple API. The proposal introduces a series of flexible local
aggregation APIs that can be used independently of global aggregation. That
looks less like an improvement and more like a set of new features. I'm not
sure if there is a bigger picture later, but for now the API part looks a
little heavy to me.


vino yang <[hidden email]> 于2019年6月5日周三 上午10:38写道:

> Hi Litree,
>
> From an implementation level, the localKeyBy API returns a general
> KeyedStream, you can call all the APIs which KeyedStream provides, we did
> not restrict its usage, although we can do this (for example returns a new
> stream object named LocalKeyedStream).
>
> However, to achieve the goal of local aggregation, it only makes sense to
> call the window API.
>
> Best,
> Vino
>
> litree <[hidden email]> 于2019年6月4日周二 下午10:41写道:
>
> > Hi Vino,
> >
> >
> > I have read your design,something I want to know is the usage of these
> new
> > APIs.It looks like when I use localByKey,i must then use a window
> operator
> > to return a datastream,and then use keyby and another window operator to
> > get the final result?
> >
> >
> > thanks,
> > Litree
> >
> >
> > On 06/04/2019 17:22, vino yang wrote:
> > Hi Dian,
> >
> > Thanks for your reply.
> >
> > I know what you mean. However, if you think deeply, you will find your
> > implementation need to provide an operator which looks like a window
> > operator. You need to use state and receive aggregation function and
> > specify the trigger time. It looks like a lightweight window operator.
> > Right?
> >
> > We try to reuse Flink provided functions and reduce complexity. IMO, It
> is
> > more user-friendly because users are familiar with the window API.
> >
> > Best,
> > Vino
> >
> >
> > Dian Fu <[hidden email]> 于2019年6月4日周二 下午4:19写道:
> >
> > > Hi Vino,
> > >
> > > Thanks a lot for starting this discussion. +1 to this feature as I
> think
> > > it will be very useful.
> > >
> > > Regarding to using window to buffer the input elements, personally I
> > don't
> > > think it's a good solution for the following reasons:
> > > 1) As we know that WindowOperator will store the accumulated results in
> > > states, this is not necessary for Local Aggregate operator.
> > > 2) For WindowOperator, each input element will be accumulated to
> states.
> > > This is also not necessary for Local Aggregate operator and storing the
> > > input elements in memory is enough.
> > >
> > > Thanks,
> > > Dian
> > >
> > > > 在 2019年6月4日,上午10:03,vino yang <[hidden email]> 写道:
> > > >
> > > > Hi Ken,
> > > >
> > > > Thanks for your reply.
> > > >
> > > > As I said before, we try to reuse Flink's state concept (fault
> > tolerance
> > > > and guarantee "Exactly-Once" semantics). So we did not consider
> cache.
> > > >
> > > > In addition, if we use Flink's state, the OOM related issue is not a
> > key
> > > > problem we need to consider.
> > > >
> > > > Best,
> > > > Vino
> > > >
> > > > Ken Krugler <[hidden email]> 于2019年6月4日周二 上午1:37写道:
> > > >
> > > >> Hi all,
> > > >>
> > > >> Cascading implemented this “map-side reduce” functionality with an
> LLR
> > > >> cache.
> > > >>
> > > >> That worked well, as then the skewed keys would always be in the
> > cache.
> > > >>
> > > >> The API let you decide the size of the cache, in terms of number of
> > > >> entries.
> > > >>
> > > >> Having a memory limit would have been better for many of our use
> > cases,
> > > >> though FWIR there’s no good way to estimate in-memory size for
> > objects.
> > > >>
> > > >> — Ken
> > > >>
> > > >>> On Jun 3, 2019, at 2:03 AM, vino yang <[hidden email]>
> wrote:
> > > >>>
> > > >>> Hi Piotr,
> > > >>>
> > > >>> The localKeyBy API returns an instance of KeyedStream (we just
> added
> > an
> > > >>> inner flag to identify the local mode) which is Flink has provided
> > > >> before.
> > > >>> Users can call all the APIs(especially *window* APIs) which
> > KeyedStream
> > > >>> provided.
> > > >>>
> > > >>> So if users want to use local aggregation, they should call the
> > window
> > > >> API
> > > >>> to build a local window that means users should (or say "can")
> > specify
> > > >> the
> > > >>> window length and other information based on their needs.
> > > >>>
> > > >>> I think you described another idea different from us. We did not
> try
> > to
> > > >>> react after triggering some predefined threshold. We tend to give
> > users
> > > >> the
> > > >>> discretion to make decisions.
> > > >>>
> > > >>> Our design idea tends to reuse Flink provided concept and functions
> > > like
> > > >>> state and window (IMO, we do not need to worry about OOM and the
> > issues
> > > >> you
> > > >>> mentioned).
> > > >>>
> > > >>> Best,
> > > >>> Vino
> > > >>>
> > > >>> Piotr Nowojski <[hidden email]> 于2019年6月3日周一 下午4:30写道:
> > > >>>
> > > >>>> Hi,
> > > >>>>
> > > >>>> +1 for the idea from my side. I’ve even attempted to add similar
> > > feature
> > > >>>> quite some time ago, but didn’t get enough traction [1].
> > > >>>>
> > > >>>> I’ve read through your document and I couldn’t find it mentioning
> > > >>>> anywhere, when the pre aggregated result should be emitted down
> the
> > > >> stream?
> > > >>>> I think that’s one of the most crucial decision, since wrong
> > decision
> > > >> here
> > > >>>> can lead to decrease of performance or to an explosion of
> > memory/state
> > > >>>> consumption (both with bounded and unbounded data streams). For
> > > >> streaming
> > > >>>> it can also lead to an increased latency.
> > > >>>>
> > > >>>> Since this is also a decision that’s impossible to make
> > automatically
> > > >>>> perfectly reliably, first and foremost I would expect this to be
> > > >>>> configurable via the API. With maybe some predefined triggers,
> like
> > on
> > > >>>> watermark (for windowed operations), on checkpoint barrier (to
> > > decrease
> > > >>>> state size?), on element count, maybe memory usage (much easier to
> > > >> estimate
> > > >>>> with a known/predefined types, like in SQL)… and with some option
> to
> > > >>>> implement custom trigger.
> > > >>>>
> > > >>>> Also what would work the best would be to have a some form of
> memory
> > > >>>> consumption priority. For example if we are running out of memory
> > for
> > > >>>> HashJoin/Final aggregation, instead of spilling to disk or
> crashing
> > > the
> > > >> job
> > > >>>> with OOM it would be probably better to prune/dump the pre/local
> > > >>>> aggregation state. But that’s another story.
> > > >>>>
> > > >>>> [1] https://github.com/apache/flink/pull/4626 <
> > > >>>> https://github.com/apache/flink/pull/4626>
> > > >>>>
> > > >>>> Piotrek
> > > >>>>
> > > >>>>> On 3 Jun 2019, at 10:16, sf lee <[hidden email]> wrote:
> > > >>>>>
> > > >>>>> Excited and  Big +1 for this feature.
> > > >>>>>
> > > >>>>> SHI Xiaogang <[hidden email]> 于2019年6月3日周一 下午3:37写道:
> > > >>>>>
> > > >>>>>> Nice feature.
> > > >>>>>> Looking forward to having it in Flink.
> > > >>>>>>
> > > >>>>>> Regards,
> > > >>>>>> Xiaogang
> > > >>>>>>
> > > >>>>>> vino yang <[hidden email]> 于2019年6月3日周一 下午3:31写道:
> > > >>>>>>
> > > >>>>>>> Hi all,
> > > >>>>>>>
> > > >>>>>>> As we mentioned in some conference, such as Flink Forward SF
> 2019
> > > and
> > > >>>>>> QCon
> > > >>>>>>> Beijing 2019, our team has implemented "Local aggregation" in
> our
> > > >> inner
> > > >>>>>>> Flink fork. This feature can effectively alleviate data skew.
> > > >>>>>>>
> > > >>>>>>> Currently, keyed streams are widely used to perform aggregating
> > > >>>>>> operations
> > > >>>>>>> (e.g., reduce, sum and window) on the elements that having the
> > same
> > > >>>> key.
> > > >>>>>>> When executed at runtime, the elements with the same key will
> be
> > > sent
> > > >>>> to
> > > >>>>>>> and aggregated by the same task.
> > > >>>>>>>
> > > >>>>>>> The performance of these aggregating operations is very
> sensitive
> > > to
> > > >>>> the
> > > >>>>>>> distribution of keys. In the cases where the distribution of
> keys
> > > >>>>>> follows a
> > > >>>>>>> powerful law, the performance will be significantly downgraded.
> > > More
> > > >>>>>>> unluckily, increasing the degree of parallelism does not help
> > when
> > > a
> > > >>>> task
> > > >>>>>>> is overloaded by a single key.
> > > >>>>>>>
> > > >>>>>>> Local aggregation is a widely-adopted method to reduce the
> > > >> performance
> > > >>>>>>> degraded by data skew. We can decompose the aggregating
> > operations
> > > >> into
> > > >>>>>> two
> > > >>>>>>> phases. In the first phase, we aggregate the elements of the
> same
> > > key
> > > >>>> at
> > > >>>>>>> the sender side to obtain partial results. Then at the second
> > > phase,
> > > >>>>>> these
> > > >>>>>>> partial results are sent to receivers according to their keys
> and
> > > are
> > > >>>>>>> combined to obtain the final result. Since the number of
> partial
> > > >>>> results
> > > >>>>>>> received by each receiver is limited by the number of senders,
> > the
> > > >>>>>>> imbalance among receivers can be reduced. Besides, by reducing
> > the
> > > >>>> amount
> > > >>>>>>> of transferred data the performance can be further improved.
> > > >>>>>>>
> > > >>>>>>> The design documentation is here:
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>
> > > >>
> > >
> >
> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> > > >>>>>>>
> > > >>>>>>> Any comment and feedback are welcome and appreciated.
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> Vino
> > > >>>>>>>
> > > >>>>>>
> > > >>>>
> > > >>>>
> > > >>
> > > >> --------------------------
> > > >> Ken Krugler
> > > >> +1 530-210-6378
> > > >> http://www.scaleunlimited.com
> > > >> Custom big data solutions & training
> > > >> Flink, Solr, Hadoop, Cascading & Cassandra
> > > >>
> > > >>
> > >
> > >
> >
>

Re: [DISCUSS] Support Local Aggregation in Flink

Dian Fu-2
Hi Vino,

Thanks a lot for your reply.

> 1) When, why and how do we judge that the memory is exhausted?

My point here is that the local aggregate operator can buffer the inputs in memory and send out the results at any time: e.g. when the element count or the time interval reaches a pre-configured value, when the memory usage of the buffered elements reaches a configured value (suppose we can estimate the object size efficiently), or even when a checkpoint is triggered.

> 2) If the local aggregate operator rarely needs to operate on state, what
> do you think about fault tolerance?

AbstractStreamOperator provides a method `prepareSnapshotPreBarrier` which can be used here to send the buffered results downstream when a checkpoint is triggered. Then fault tolerance works well.

Even if no such method were available, we could still store the buffered elements or pre-aggregated results to state when a checkpoint is triggered. The state access would be much lower than for the window operator, because only the elements that have not yet been sent out when the checkpoint occurs have to be written to state. Suppose the checkpoint interval is 3 minutes and the trigger interval is 10 seconds; then only roughly the elements buffered in the last 10 seconds, i.e. about 10/180 ≈ 5.6% of them, would be written to state.
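
For illustration, a minimal sketch of such an operator follows. This is only
one possible shape, not the design document's implementation; the input type,
class name and flush threshold are made up for the example:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Sketch of a local-aggregate operator that keeps partial sums in a plain
 * in-memory map and flushes them downstream when a count threshold is hit or
 * right before a checkpoint barrier, so the buffer never has to be written
 * to state.
 */
public class LocalSumOperator
        extends AbstractStreamOperator<Tuple2<String, Long>>
        implements OneInputStreamOperator<Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final int FLUSH_THRESHOLD = 10_000; // illustrative value

    private final Map<String, Long> buffer = new HashMap<>();
    private int bufferedCount;

    @Override
    public void processElement(StreamRecord<Tuple2<String, Long>> element) {
        Tuple2<String, Long> value = element.getValue();
        buffer.merge(value.f0, value.f1, Long::sum); // pre-aggregate in memory
        if (++bufferedCount >= FLUSH_THRESHOLD) {
            flush();
        }
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) {
        // Emit everything that is still buffered before the barrier, so the
        // checkpoint itself does not need to include any local-aggregate state.
        flush();
    }

    private void flush() {
        for (Map.Entry<String, Long> entry : buffer.entrySet()) {
            output.collect(new StreamRecord<>(Tuple2.of(entry.getKey(), entry.getValue())));
        }
        buffer.clear();
        bufferedCount = 0;
    }
}

Because the buffer is emptied right before the barrier, the emitted partial
results are processed downstream before the downstream operator snapshots,
so the local operator itself needs no state for exactly-once recovery.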


Thanks,
Dian


> 在 2019年6月5日,上午11:43,Biao Liu <[hidden email]> 写道:
>
> Hi Vino,
>
> +1 for this feature. It's useful for data skew. And it could also reduce
> shuffled datum.
>
> I have some concerns about the API part. From my side, this feature should
> be more like an improvement. I'm afraid the proposal is an overkill about
> the API part. Many other systems support pre-aggregation as an optimization
> of global aggregation. The optimization might be used automatically or
> manually but with a simple API. The proposal introduces a series of
> flexible local aggregation APIs. They could be independent with global
> aggregation. It doesn't look like an improvement but introduces a lot of
> features. I'm not sure if there is a bigger picture later. As for now the
> API part looks a little heavy for me.
>
>
> vino yang <[hidden email]> 于2019年6月5日周三 上午10:38写道:
>
>> Hi Litree,
>>
>> From an implementation level, the localKeyBy API returns a general
>> KeyedStream, you can call all the APIs which KeyedStream provides, we did
>> not restrict its usage, although we can do this (for example returns a new
>> stream object named LocalKeyedStream).
>>
>> However, to achieve the goal of local aggregation, it only makes sense to
>> call the window API.
>>
>> Best,
>> Vino
>>
>> litree <[hidden email]> 于2019年6月4日周二 下午10:41写道:
>>
>>> Hi Vino,
>>>
>>>
>>> I have read your design,something I want to know is the usage of these
>> new
>>> APIs.It looks like when I use localByKey,i must then use a window
>> operator
>>> to return a datastream,and then use keyby and another window operator to
>>> get the final result?
>>>
>>>
>>> thanks,
>>> Litree
>>>
>>>
>>> On 06/04/2019 17:22, vino yang wrote:
>>> Hi Dian,
>>>
>>> Thanks for your reply.
>>>
>>> I know what you mean. However, if you think deeply, you will find your
>>> implementation need to provide an operator which looks like a window
>>> operator. You need to use state and receive aggregation function and
>>> specify the trigger time. It looks like a lightweight window operator.
>>> Right?
>>>
>>> We try to reuse Flink provided functions and reduce complexity. IMO, It
>> is
>>> more user-friendly because users are familiar with the window API.
>>>
>>> Best,
>>> Vino
>>>
>>>
>>> Dian Fu <[hidden email]> 于2019年6月4日周二 下午4:19写道:
>>>
>>>> Hi Vino,
>>>>
>>>> Thanks a lot for starting this discussion. +1 to this feature as I
>> think
>>>> it will be very useful.
>>>>
>>>> Regarding to using window to buffer the input elements, personally I
>>> don't
>>>> think it's a good solution for the following reasons:
>>>> 1) As we know that WindowOperator will store the accumulated results in
>>>> states, this is not necessary for Local Aggregate operator.
>>>> 2) For WindowOperator, each input element will be accumulated to
>> states.
>>>> This is also not necessary for Local Aggregate operator and storing the
>>>> input elements in memory is enough.
>>>>
>>>> Thanks,
>>>> Dian
>>>>
>>>>> 在 2019年6月4日,上午10:03,vino yang <[hidden email]> 写道:
>>>>>
>>>>> Hi Ken,
>>>>>
>>>>> Thanks for your reply.
>>>>>
>>>>> As I said before, we try to reuse Flink's state concept (fault
>>> tolerance
>>>>> and guarantee "Exactly-Once" semantics). So we did not consider
>> cache.
>>>>>
>>>>> In addition, if we use Flink's state, the OOM related issue is not a
>>> key
>>>>> problem we need to consider.
>>>>>
>>>>> Best,
>>>>> Vino
>>>>>
>>>>> Ken Krugler <[hidden email]> 于2019年6月4日周二 上午1:37写道:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> Cascading implemented this “map-side reduce” functionality with an
>> LLR
>>>>>> cache.
>>>>>>
>>>>>> That worked well, as then the skewed keys would always be in the
>>> cache.
>>>>>>
>>>>>> The API let you decide the size of the cache, in terms of number of
>>>>>> entries.
>>>>>>
>>>>>> Having a memory limit would have been better for many of our use
>>> cases,
>>>>>> though FWIR there’s no good way to estimate in-memory size for
>>> objects.
>>>>>>
>>>>>> — Ken
>>>>>>
>>>>>>> On Jun 3, 2019, at 2:03 AM, vino yang <[hidden email]>
>> wrote:
>>>>>>>
>>>>>>> Hi Piotr,
>>>>>>>
>>>>>>> The localKeyBy API returns an instance of KeyedStream (we just
>> added
>>> an
>>>>>>> inner flag to identify the local mode) which is Flink has provided
>>>>>> before.
>>>>>>> Users can call all the APIs(especially *window* APIs) which
>>> KeyedStream
>>>>>>> provided.
>>>>>>>
>>>>>>> So if users want to use local aggregation, they should call the
>>> window
>>>>>> API
>>>>>>> to build a local window that means users should (or say "can")
>>> specify
>>>>>> the
>>>>>>> window length and other information based on their needs.
>>>>>>>
>>>>>>> I think you described another idea different from us. We did not
>> try
>>> to
>>>>>>> react after triggering some predefined threshold. We tend to give
>>> users
>>>>>> the
>>>>>>> discretion to make decisions.
>>>>>>>
>>>>>>> Our design idea tends to reuse Flink provided concept and functions
>>>> like
>>>>>>> state and window (IMO, we do not need to worry about OOM and the
>>> issues
>>>>>> you
>>>>>>> mentioned).
>>>>>>>
>>>>>>> Best,
>>>>>>> Vino
>>>>>>>
>>>>>>> Piotr Nowojski <[hidden email]> 于2019年6月3日周一 下午4:30写道:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> +1 for the idea from my side. I’ve even attempted to add similar
>>>> feature
>>>>>>>> quite some time ago, but didn’t get enough traction [1].
>>>>>>>>
>>>>>>>> I’ve read through your document and I couldn’t find it mentioning
>>>>>>>> anywhere, when the pre aggregated result should be emitted down
>> the
>>>>>> stream?
>>>>>>>> I think that’s one of the most crucial decision, since wrong
>>> decision
>>>>>> here
>>>>>>>> can lead to decrease of performance or to an explosion of
>>> memory/state
>>>>>>>> consumption (both with bounded and unbounded data streams). For
>>>>>> streaming
>>>>>>>> it can also lead to an increased latency.
>>>>>>>>
>>>>>>>> Since this is also a decision that’s impossible to make
>>> automatically
>>>>>>>> perfectly reliably, first and foremost I would expect this to be
>>>>>>>> configurable via the API. With maybe some predefined triggers,
>> like
>>> on
>>>>>>>> watermark (for windowed operations), on checkpoint barrier (to
>>>> decrease
>>>>>>>> state size?), on element count, maybe memory usage (much easier to
>>>>>> estimate
>>>>>>>> with a known/predefined types, like in SQL)… and with some option
>> to
>>>>>>>> implement custom trigger.
>>>>>>>>
>>>>>>>> Also what would work the best would be to have a some form of
>> memory
>>>>>>>> consumption priority. For example if we are running out of memory
>>> for
>>>>>>>> HashJoin/Final aggregation, instead of spilling to disk or
>> crashing
>>>> the
>>>>>> job
>>>>>>>> with OOM it would be probably better to prune/dump the pre/local
>>>>>>>> aggregation state. But that’s another story.
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/flink/pull/4626 <
>>>>>>>> https://github.com/apache/flink/pull/4626>
>>>>>>>>
>>>>>>>> Piotrek
>>>>>>>>
>>>>>>>>> On 3 Jun 2019, at 10:16, sf lee <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>> Excited and  Big +1 for this feature.
>>>>>>>>>
>>>>>>>>> SHI Xiaogang <[hidden email]> 于2019年6月3日周一 下午3:37写道:
>>>>>>>>>
>>>>>>>>>> Nice feature.
>>>>>>>>>> Looking forward to having it in Flink.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Xiaogang
>>>>>>>>>>
>>>>>>>>>> vino yang <[hidden email]> 于2019年6月3日周一 下午3:31写道:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> As we mentioned in some conference, such as Flink Forward SF
>> 2019
>>>> and
>>>>>>>>>> QCon
>>>>>>>>>>> Beijing 2019, our team has implemented "Local aggregation" in
>> our
>>>>>> inner
>>>>>>>>>>> Flink fork. This feature can effectively alleviate data skew.
>>>>>>>>>>>
>>>>>>>>>>> Currently, keyed streams are widely used to perform aggregating
>>>>>>>>>> operations
>>>>>>>>>>> (e.g., reduce, sum and window) on the elements that having the
>>> same
>>>>>>>> key.
>>>>>>>>>>> When executed at runtime, the elements with the same key will
>> be
>>>> sent
>>>>>>>> to
>>>>>>>>>>> and aggregated by the same task.
>>>>>>>>>>>
>>>>>>>>>>> The performance of these aggregating operations is very
>> sensitive
>>>> to
>>>>>>>> the
>>>>>>>>>>> distribution of keys. In the cases where the distribution of
>> keys
>>>>>>>>>> follows a
>>>>>>>>>>> powerful law, the performance will be significantly downgraded.
>>>> More
>>>>>>>>>>> unluckily, increasing the degree of parallelism does not help
>>> when
>>>> a
>>>>>>>> task
>>>>>>>>>>> is overloaded by a single key.
>>>>>>>>>>>
>>>>>>>>>>> Local aggregation is a widely-adopted method to reduce the
>>>>>> performance
>>>>>>>>>>> degraded by data skew. We can decompose the aggregating
>>> operations
>>>>>> into
>>>>>>>>>> two
>>>>>>>>>>> phases. In the first phase, we aggregate the elements of the
>> same
>>>> key
>>>>>>>> at
>>>>>>>>>>> the sender side to obtain partial results. Then at the second
>>>> phase,
>>>>>>>>>> these
>>>>>>>>>>> partial results are sent to receivers according to their keys
>> and
>>>> are
>>>>>>>>>>> combined to obtain the final result. Since the number of
>> partial
>>>>>>>> results
>>>>>>>>>>> received by each receiver is limited by the number of senders,
>>> the
>>>>>>>>>>> imbalance among receivers can be reduced. Besides, by reducing
>>> the
>>>>>>>> amount
>>>>>>>>>>> of transferred data the performance can be further improved.
>>>>>>>>>>>
>>>>>>>>>>> The design documentation is here:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>
>>>
>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
>>>>>>>>>>>
>>>>>>>>>>> Any comment and feedback are welcome and appreciated.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Vino
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>> --------------------------
>>>>>> Ken Krugler
>>>>>> +1 530-210-6378
>>>>>> http://www.scaleunlimited.com
>>>>>> Custom big data solutions & training
>>>>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>>>>>
>>>>>>
>>>>
>>>>
>>>
>>


Re: [DISCUSS] Support Local Aggregation in Flink

vino yang
Hi Dian,

I still think your implementation is similar to the window operator. You
mentioned the scalable trigger mechanism, but the window API can also use
custom triggers (see the sketch below).

Moreover, IMO, the design should guarantee deterministic semantics; I think
triggering based on memory availability is a non-deterministic design.

In addition, if the implementation depends on the timing of checkpoints, I do
not think that is reasonable; we should avoid affecting the checkpoint's
progress.
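
For example, a count-based emission policy, similar to the one described
above, can already be expressed with the existing window API. A minimal
sketch follows; the element type, the helper method and the count of 1000
are arbitrary, and `locallyKeyed` stands for the stream returned by the
proposed localKeyBy API:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;

class CountTriggeredLocalAggregation {

    // `locallyKeyed` stands for the KeyedStream returned by the proposed localKeyBy API.
    static DataStream<Tuple2<String, Long>> partialSums(
            KeyedStream<Tuple2<String, Long>, String> locallyKeyed) {
        return locallyKeyed
                .window(GlobalWindows.create())
                // Emit and purge a partial sum for each key after every 1000 elements,
                // instead of waiting for a time-based window to end.
                .trigger(PurgingTrigger.of(CountTrigger.of(1000)))
                .sum(1);
    }
}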

Best,
Vino

Dian Fu <[hidden email]> 于2019年6月5日周三 下午1:55写道:

> Hi Vino,
>
> Thanks a lot for your reply.
>
> > 1) When, Why and How to judge the memory is exhausted?
>
> My point here is that the local aggregate operator can buffer the inputs
> in memory and send out the results AT ANY TIME. i.e. element count or the
> time interval reached a pre-configured value, the memory usage of buffered
> elements reached a configured valued (suppose we can estimate the object
> size efficiently), or even when checkpoint is triggered.
>
> >
> > 2) If the local aggregate operator rarely needs to operate the state,
> what
> > do you think about fault tolerance?
>
> AbstractStreamOperator provides a method `prepareSnapshotPreBarrier` which
> can be used here to send out the results to the downstream when checkpoint
> is triggered. Then fault tolerance can work well.
>
> Even if there is no such a method available, we can still store the
> buffered elements or pre-aggregate results to state when checkpoint is
> triggered. The state access will be much less compared with window operator
> as only the elements not sent out when checkpoint occur have to be written
> to state. Suppose the checkpoint interval is 3 minutes and the trigger
> interval is 10 seconds, then only about less than "10/180" elements will be
> written to state.
>
>
> Thanks,
> Dian
>
>
> > 在 2019年6月5日,上午11:43,Biao Liu <[hidden email]> 写道:
> >
> > Hi Vino,
> >
> > +1 for this feature. It's useful for data skew. And it could also reduce
> > shuffled datum.
> >
> > I have some concerns about the API part. From my side, this feature
> should
> > be more like an improvement. I'm afraid the proposal is an overkill about
> > the API part. Many other systems support pre-aggregation as an
> optimization
> > of global aggregation. The optimization might be used automatically or
> > manually but with a simple API. The proposal introduces a series of
> > flexible local aggregation APIs. They could be independent with global
> > aggregation. It doesn't look like an improvement but introduces a lot of
> > features. I'm not sure if there is a bigger picture later. As for now the
> > API part looks a little heavy for me.
> >
> >
> > vino yang <[hidden email]> 于2019年6月5日周三 上午10:38写道:
> >
> >> Hi Litree,
> >>
> >> From an implementation level, the localKeyBy API returns a general
> >> KeyedStream, you can call all the APIs which KeyedStream provides, we
> did
> >> not restrict its usage, although we can do this (for example returns a
> new
> >> stream object named LocalKeyedStream).
> >>
> >> However, to achieve the goal of local aggregation, it only makes sense
> to
> >> call the window API.
> >>
> >> Best,
> >> Vino
> >>
> >> litree <[hidden email]> 于2019年6月4日周二 下午10:41写道:
> >>
> >>> Hi Vino,
> >>>
> >>>
> >>> I have read your design,something I want to know is the usage of these
> >> new
> >>> APIs.It looks like when I use localByKey,i must then use a window
> >> operator
> >>> to return a datastream,and then use keyby and another window operator
> to
> >>> get the final result?
> >>>
> >>>
> >>> thanks,
> >>> Litree
> >>>
> >>>
> >>> On 06/04/2019 17:22, vino yang wrote:
> >>> Hi Dian,
> >>>
> >>> Thanks for your reply.
> >>>
> >>> I know what you mean. However, if you think deeply, you will find your
> >>> implementation need to provide an operator which looks like a window
> >>> operator. You need to use state and receive aggregation function and
> >>> specify the trigger time. It looks like a lightweight window operator.
> >>> Right?
> >>>
> >>> We try to reuse Flink provided functions and reduce complexity. IMO, It
> >> is
> >>> more user-friendly because users are familiar with the window API.
> >>>
> >>> Best,
> >>> Vino
> >>>
> >>>
> >>> Dian Fu <[hidden email]> 于2019年6月4日周二 下午4:19写道:
> >>>
> >>>> Hi Vino,
> >>>>
> >>>> Thanks a lot for starting this discussion. +1 to this feature as I
> >> think
> >>>> it will be very useful.
> >>>>
> >>>> Regarding to using window to buffer the input elements, personally I
> >>> don't
> >>>> think it's a good solution for the following reasons:
> >>>> 1) As we know that WindowOperator will store the accumulated results
> in
> >>>> states, this is not necessary for Local Aggregate operator.
> >>>> 2) For WindowOperator, each input element will be accumulated to
> >> states.
> >>>> This is also not necessary for Local Aggregate operator and storing
> the
> >>>> input elements in memory is enough.
> >>>>
> >>>> Thanks,
> >>>> Dian
> >>>>
> >>>>> 在 2019年6月4日,上午10:03,vino yang <[hidden email]> 写道:
> >>>>>
> >>>>> Hi Ken,
> >>>>>
> >>>>> Thanks for your reply.
> >>>>>
> >>>>> As I said before, we try to reuse Flink's state concept (fault
> >>> tolerance
> >>>>> and guarantee "Exactly-Once" semantics). So we did not consider
> >> cache.
> >>>>>
> >>>>> In addition, if we use Flink's state, the OOM related issue is not a
> >>> key
> >>>>> problem we need to consider.
> >>>>>
> >>>>> Best,
> >>>>> Vino
> >>>>>
> >>>>> Ken Krugler <[hidden email]> 于2019年6月4日周二 上午1:37写道:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> Cascading implemented this “map-side reduce” functionality with an
> >> LLR
> >>>>>> cache.
> >>>>>>
> >>>>>> That worked well, as then the skewed keys would always be in the
> >>> cache.
> >>>>>>
> >>>>>> The API let you decide the size of the cache, in terms of number of
> >>>>>> entries.
> >>>>>>
> >>>>>> Having a memory limit would have been better for many of our use
> >>> cases,
> >>>>>> though FWIR there’s no good way to estimate in-memory size for
> >>> objects.
> >>>>>>
> >>>>>> — Ken
> >>>>>>
> >>>>>>> On Jun 3, 2019, at 2:03 AM, vino yang <[hidden email]>
> >> wrote:
> >>>>>>>
> >>>>>>> Hi Piotr,
> >>>>>>>
> >>>>>>> The localKeyBy API returns an instance of KeyedStream (we just
> >> added
> >>> an
> >>>>>>> inner flag to identify the local mode) which is Flink has provided
> >>>>>> before.
> >>>>>>> Users can call all the APIs(especially *window* APIs) which
> >>> KeyedStream
> >>>>>>> provided.
> >>>>>>>
> >>>>>>> So if users want to use local aggregation, they should call the
> >>> window
> >>>>>> API
> >>>>>>> to build a local window that means users should (or say "can")
> >>> specify
> >>>>>> the
> >>>>>>> window length and other information based on their needs.
> >>>>>>>
> >>>>>>> I think you described another idea different from us. We did not
> >> try
> >>> to
> >>>>>>> react after triggering some predefined threshold. We tend to give
> >>> users
> >>>>>> the
> >>>>>>> discretion to make decisions.
> >>>>>>>
> >>>>>>> Our design idea tends to reuse Flink provided concept and functions
> >>>> like
> >>>>>>> state and window (IMO, we do not need to worry about OOM and the
> >>> issues
> >>>>>> you
> >>>>>>> mentioned).
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Vino
> >>>>>>>
> >>>>>>> Piotr Nowojski <[hidden email]> 于2019年6月3日周一 下午4:30写道:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> +1 for the idea from my side. I’ve even attempted to add similar
> >>>> feature
> >>>>>>>> quite some time ago, but didn’t get enough traction [1].
> >>>>>>>>
> >>>>>>>> I’ve read through your document and I couldn’t find it mentioning
> >>>>>>>> anywhere, when the pre aggregated result should be emitted down
> >> the
> >>>>>> stream?
> >>>>>>>> I think that’s one of the most crucial decision, since wrong
> >>> decision
> >>>>>> here
> >>>>>>>> can lead to decrease of performance or to an explosion of
> >>> memory/state
> >>>>>>>> consumption (both with bounded and unbounded data streams). For
> >>>>>> streaming
> >>>>>>>> it can also lead to an increased latency.
> >>>>>>>>
> >>>>>>>> Since this is also a decision that’s impossible to make
> >>> automatically
> >>>>>>>> perfectly reliably, first and foremost I would expect this to be
> >>>>>>>> configurable via the API. With maybe some predefined triggers,
> >> like
> >>> on
> >>>>>>>> watermark (for windowed operations), on checkpoint barrier (to
> >>>> decrease
> >>>>>>>> state size?), on element count, maybe memory usage (much easier to
> >>>>>> estimate
> >>>>>>>> with a known/predefined types, like in SQL)… and with some option
> >> to
> >>>>>>>> implement custom trigger.
> >>>>>>>>
> >>>>>>>> Also what would work the best would be to have a some form of
> >> memory
> >>>>>>>> consumption priority. For example if we are running out of memory
> >>> for
> >>>>>>>> HashJoin/Final aggregation, instead of spilling to disk or
> >> crashing
> >>>> the
> >>>>>> job
> >>>>>>>> with OOM it would be probably better to prune/dump the pre/local
> >>>>>>>> aggregation state. But that’s another story.
> >>>>>>>>
> >>>>>>>> [1] https://github.com/apache/flink/pull/4626 <
> >>>>>>>> https://github.com/apache/flink/pull/4626>
> >>>>>>>>
> >>>>>>>> Piotrek

Re: [DISCUSS] Support Local Aggregation in Flink

vino yang
Hi Aljoscha,

What do you think about this feature and the design document?

Best,
Vino


Re: Re: [DISCUSS] Support Local Aggregation in Flink

blues zheng
Hi,

+1 from my side. Looking forward to this must-have feature :)

Best,
boshu


Re: Re: [DISCUSS] Support Local Aggregation in Flink

Forward Xu
Hi,

+1, nice work.

Best,
forwardxu
