[PROPOSAL] Introduce Elastic Bloom Filter For Flink

classic Classic list List threaded Threaded
14 messages Options
Reply | Threaded
Open this post in threaded view
|

[PROPOSAL] Introduce Elastic Bloom Filter For Flink

sihua zhou
Hi Devs!
I proposal to introduce "Elastic Bloom Filter" for Flink, the reason I make up this proposal is that, it helped us a lot on production, it let's improve the performance with reducing consumption of resources. Here is a brief description fo the motivation of why it's so powful, more detail information can be found https://issues.apache.org/jira/browse/FLINK-8601 , and the design doc can be found https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing


------------------------------------
Motivation


There are some scenarios drive us to introduce this ElasticBloomFilter, one is Stream Join, another is Data Deduplication, and some special user cases...This has given us a great experience, for example,  we implemented the Runtime Filter Join base on it, and it gives us a great performance improvement. With this feature, It diffs us from the "normal stream join", allows us to improve performance while reducing resource consumption by about half!!!
I will list the two most typical user cases that optimized by the ElasticBloomFilter: one is "Runtime Filter Join" in detail, another is "Data Dedeplication" in brief.


Scenario 1: Runtime Filter Join


In general, stream join is one of the most performance cost task. For every record from both side, we need to query the state from the other side, this will lead to poor performance when the state size if huge. So, in production, we always need to spend a lot slots to handle stream join. But, indeed, we can improve this in somehow, there a phenomenon of stream join can be found in production. That's the “joined ratio” of the stream join is often very low, for example.
stream join in promotion analysis: Job need to join the promotion log with the action(click, view, buy) log with the promotion_id to analysis the effect of the promotion.
stream join in AD(advertising) attribution: Job need to join the AD click log with the item payment log on the click_id to find which click of which AD that brings the payment to do attribution.
stream join in click log analysis of doc: Job need to join viewed log(doc viewed by users) with the click log (doc clicked by users) to analysis the reason of the click and the property of the users.
….so on
All these cases have one common property, that is the joined ratio is very low. Here is a example to describe it, we have 10000 records from the left stream, and 10000 records from the right stream, and we execute  select * from leftStream l join rightStream r on l.id = r.id , we only got 100 record from the result, that is the case for low joined ratio, this is an example for inner join, but it can also applied to left & right join.


there are more example I can come up with low joined ratio…but the point I want to raise up is that the low joined ratio of stream join in production is a very common phenomenon(maybe even the almost common phenomenon in some companies, at least in our company that is the case).


How to improve this?


We can see from the above case, 10000 record join 10000 record and we only got 100 result, that means, we query the state 20000 times (10000 for the left stream and 10000 for the right stream) but only 100 of them are meaningful!!! If we could reduce the useless query times, then we can definitely improve the performance of stream join.
the way we used to improve this is to introduce the Runtime Filter Join, the mainly ideal is that, we build a filter for the state on each side (left stream & right stream). When we need to query the state on that side we first check the corresponding filter whether the key is possible in the state, if the filter say "not, it impossible in the State", then we stop querying the state, if it say "hmm, it maybe in state", then we need to query the state. As you can see, the best choose of the filter is Bloom Filter, it has all the feature that we want: extremely good performance, non-existence of false negative.


Scenario 2:  Data Deduplication


We have implemented two general functions based on the ElasticBloomFilter. They are count(distinct x) and select distinct x, y, z from table. Unlike the Runtime Filter Join the result of this two functions is approximate, not exactly. There are used in the scenario where we don't need a 100% accurate result, for example, to count the number of visiting users in each online store. In general, we don't need a 100% accurate result in this case(indeed we can't give a 100% accurate result, because there could be error when collecting user_id from different devices), if we could get a 98% accurate result with only 1/2 resource, that could be very nice.


I believe there would be more user cases in stream world that could be optimized by the Bloom Filter(as what it had done in the big data world)...


I will appreciate it very much, if someone could have a look of the JIRA or the google doc and give some comments!


Thanks, Sihua

Reply | Threaded
Open this post in threaded view
|

Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

Fabian Hueske-2
Thanks for the proposal Sihua!

Let me try to summarize the motivation / scope of this proposal.

You are proposing to add support for a special Bloom Filter state per
KeyGroup and reduce the number of key accesses by checking the Bloom Filter
first.

This is would be a rather generic feature that could be interesting for
various applications, including joins and deduplication as you described.

IMO, such a feature would be very interesting. However, my concerns with
Bloom Filter is that they are insert-only data structures, i.e., it is not
possible to remove keys once they were added. This might render the filter
useless over time.
In a different thread (see discussion in FLINK-8918 [1]), you mentioned
that the Bloom Filters would be growing.
If we keep them in memory, how can we prevent them from exceeding memory
boundaries over time?

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8918

2018-05-23 9:56 GMT+02:00 sihua zhou <[hidden email]>:

> Hi Devs!
> I proposal to introduce "Elastic Bloom Filter" for Flink, the reason I
> make up this proposal is that, it helped us a lot on production, it let's
> improve the performance with reducing consumption of resources. Here is a
> brief description fo the motivation of why it's so powful, more detail
> information can be found https://issues.apache.org/jira/browse/FLINK-8601 ,
> and the design doc can be found https://docs.google.com/
> document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing
>
> ------------------------------------
> *Motivation*
>
> There are some scenarios drive us to introduce this ElasticBloomFilter,
> one is Stream Join, another is Data Deduplication, and some special user
> cases...This has given us a great experience, for example,  we implemented
> the Runtime Filter Join base on it, and it gives us a great performance
> improvement. With this feature, It diffs us from the "normal stream join",
> allows us to improve performance while reducing resource consumption by
> about half!!!
> I will list the two most typical user cases that optimized by the
> ElasticBloomFilter: one is "*Runtime Filter Join*" in detail, another is "*Data
> Dedeplication*" in brief.
>
> *Scenario 1: Runtime Filter Join*
>
> In general, stream join is one of the most performance cost task. For
> every record from both side, we need to query the state from the other
> side, this will lead to poor performance when the state size if huge. So,
> in production, we always need to spend a lot slots to handle stream join.
> But, indeed, we can improve this in somehow, there a phenomenon of stream
> join can be found in production. That's the “joined ratio” of the stream
> join is often very low, for example.
>
>    - stream join in promotion analysis: Job need to join the promotion
>    log with the action(click, view, buy) log with the promotion_id to analysis
>    the effect of the promotion.
>    - stream join in AD(advertising) attribution: Job need to join the AD
>    click log with the item payment log on the click_id to find which click of
>    which AD that brings the payment to do attribution.
>    - stream join in click log analysis of doc: Job need to join viewed
>    log(doc viewed by users) with the click log (doc clicked by users) to
>    analysis the reason of the click and the property of the users.
>    - ….so on
>
> All these cases have one common property, that is the joined ratio is very
> low. Here is a example to describe it, we have 10000 records from the left
> stream, and 10000 records from the right stream, and we execute  *select
> * from leftStream l join rightStream r on l.id <http://l.id> = r.id
> <http://r.id>* , we only got 100 record from the result, that is the case
> for low joined ratio, this is an example for inner join, but it can also
> applied to left & right join.
>
> there are more example I can come up with low joined ratio…but the point I
> want to raise up is that the low joined ratio of stream join in production
> is a very common phenomenon(maybe even the almost common phenomenon in some
> companies, at least in our company that is the case).
>
> *How to improve this?*
>
> We can see from the above case, 10000 record join 10000 record and we only
> got 100 result, that means, we query the state 20000 times (10000 for the
> left stream and 10000 for the right stream) but only 100 of them are
> meaningful!!! If we could reduce the useless query times, then we can
> definitely improve the performance of stream join.
> the way we used to improve this is to introduce the Runtime Filter Join,
> the mainly ideal is that, we build a filter for the state on each side
> (left stream & right stream). When we need to query the state on that side
> we first check the corresponding filter whether the key is possible in the
> state, if the filter say "not, it impossible in the State", then we stop
> querying the state, if it say "hmm, it maybe in state", then we need to
> query the state. As you can see, the best choose of the filter is Bloom
> Filter, it has all the feature that we want: extremely good performance,
> non-existence of false negative.
>
> *Scenario 2:  Data Deduplication*
>
> We have implemented two general functions based on the ElasticBloomFilter.
> They are count(distinct x) and select distinct x, y, z from table. Unlike
> the Runtime Filter Join the result of this two functions is approximate,
> not exactly. There are used in the scenario where we don't need a 100%
> accurate result, for example, to count the number of visiting users in each
> online store. In general, we don't need a 100% accurate result in this
> case(indeed we can't give a 100% accurate result, because there could be
> error when collecting user_id from different devices), if we could get a
> 98% accurate result with only 1/2 resource, that could be very nice.
>
> I believe there would be more user cases in stream world that could be
> optimized by the Bloom Filter(as what it had done in the big data world)...
>
> I will appreciate it very much, if someone could have a look of the JIRA
> or the google doc and give some comments!
>
> Thanks, Sihua
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

Stefan Richter
Hi,

In general, I like the proposal as well. We should try to integrate all forms of keyed state with the backend, to avoid the problems that we are currently facing with the timer service. We should discuss which exact implementation of bloom filters are the best fit.

@Fabian: There are also implementations of bloom filters that use counting and therefore support deletes, but obviously this comes at the cost of a potentially higher space consumption.

Best,
Stefan

> Am 23.05.2018 um 11:29 schrieb Fabian Hueske <[hidden email]>:
>
> Thanks for the proposal Sihua!
>
> Let me try to summarize the motivation / scope of this proposal.
>
> You are proposing to add support for a special Bloom Filter state per KeyGroup and reduce the number of key accesses by checking the Bloom Filter first.
>
> This is would be a rather generic feature that could be interesting for various applications, including joins and deduplication as you described.
>
> IMO, such a feature would be very interesting. However, my concerns with Bloom Filter is that they are insert-only data structures, i.e., it is not possible to remove keys once they were added. This might render the filter useless over time.
> In a different thread (see discussion in FLINK-8918 [1]), you mentioned that the Bloom Filters would be growing.
> If we keep them in memory, how can we prevent them from exceeding memory boundaries over time?
>
> Best,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-8918 <https://issues.apache.org/jira/browse/FLINK-8918>
>
> 2018-05-23 9:56 GMT+02:00 sihua zhou <[hidden email] <mailto:[hidden email]>>:
> Hi Devs!
> I proposal to introduce "Elastic Bloom Filter" for Flink, the reason I make up this proposal is that, it helped us a lot on production, it let's improve the performance with reducing consumption of resources. Here is a brief description fo the motivation of why it's so powful, more detail information can be found https://issues.apache.org/jira/browse/FLINK-8601 <https://issues.apache.org/jira/browse/FLINK-8601> , and the design doc can be found https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing <https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing>
>
> ------------------------------------
> Motivation
>
> There are some scenarios drive us to introduce this ElasticBloomFilter, one is Stream Join, another is Data Deduplication, and some special user cases...This has given us a great experience, for example,  we implemented the Runtime Filter Join base on it, and it gives us a great performance improvement. With this feature, It diffs us from the "normal stream join", allows us to improve performance while reducing resource consumption by about half!!!
> I will list the two most typical user cases that optimized by the ElasticBloomFilter: one is "Runtime Filter Join" in detail, another is "Data Dedeplication" in brief.
>
> Scenario 1: Runtime Filter Join
>
> In general, stream join is one of the most performance cost task. For every record from both side, we need to query the state from the other side, this will lead to poor performance when the state size if huge. So, in production, we always need to spend a lot slots to handle stream join. But, indeed, we can improve this in somehow, there a phenomenon of stream join can be found in production. That's the “joined ratio” of the stream join is often very low, for example.
> stream join in promotion analysis: Job need to join the promotion log with the action(click, view, buy) log with the promotion_id to analysis the effect of the promotion.
> stream join in AD(advertising) attribution: Job need to join the AD click log with the item payment log on the click_id to find which click of which AD that brings the payment to do attribution.
> stream join in click log analysis of doc: Job need to join viewed log(doc viewed by users) with the click log (doc clicked by users) to analysis the reason of the click and the property of the users.
> ….so on
> All these cases have one common property, that is the joined ratio is very low. Here is a example to describe it, we have 10000 records from the left stream, and 10000 records from the right stream, and we execute  select * from leftStream l join rightStream r on l.id <http://l.id/> = r.id <http://r.id/> , we only got 100 record from the result, that is the case for low joined ratio, this is an example for inner join, but it can also applied to left & right join.
>
> there are more example I can come up with low joined ratio…but the point I want to raise up is that the low joined ratio of stream join in production is a very common phenomenon(maybe even the almost common phenomenon in some companies, at least in our company that is the case).
>
> How to improve this?
>
> We can see from the above case, 10000 record join 10000 record and we only got 100 result, that means, we query the state 20000 times (10000 for the left stream and 10000 for the right stream) but only 100 of them are meaningful!!! If we could reduce the useless query times, then we can definitely improve the performance of stream join.
> the way we used to improve this is to introduce the Runtime Filter Join, the mainly ideal is that, we build a filter for the state on each side (left stream & right stream). When we need to query the state on that side we first check the corresponding filter whether the key is possible in the state, if the filter say "not, it impossible in the State", then we stop querying the state, if it say "hmm, it maybe in state", then we need to query the state. As you can see, the best choose of the filter is Bloom Filter, it has all the feature that we want: extremely good performance, non-existence of false negative.
>
> Scenario 2:  Data Deduplication
>
> We have implemented two general functions based on the ElasticBloomFilter. They are count(distinct x) and select distinct x, y, z from table. Unlike the Runtime Filter Join the result of this two functions is approximate, not exactly. There are used in the scenario where we don't need a 100% accurate result, for example, to count the number of visiting users in each online store. In general, we don't need a 100% accurate result in this case(indeed we can't give a 100% accurate result, because there could be error when collecting user_id from different devices), if we could get a 98% accurate result with only 1/2 resource, that could be very nice.
>
> I believe there would be more user cases in stream world that could be optimized by the Bloom Filter(as what it had done in the big data world)...
>
> I will appreciate it very much, if someone could have a look of the JIRA or the google doc and give some comments!
>
> Thanks, Sihua
>
>

Reply | Threaded
Open this post in threaded view
|

Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

sihua zhou


Thanks for your reply @Fabian and @Stefan


@Fabian: The bloom filter state I proposal would be "elastic" and "lazy allocation", what we have on each key group is a list of bloom filter node(which is shrinkable), every bloom filter node has its capacity, we allocate a new one only when the last one is filled, and also there is a relaxed TTL to recycle the memory resources to enable it to run properly, for example, we consider the situation on key group 0:
------------------------------------------------------------------------------------
key group0: bfNode0 -> bfNode1 -> bfNode2
------------------------------------------------------------------------------------
if the bfNode2 is filled, we allocate a new BF to store the data fails into key group 0, the situation on key group 0 becomes:
------------------------------------------------------------------------------------
key group0: bfNode0 -> bfNode1 -> bfNode2 -> bfNode3
------------------------------------------------------------------------------------
and once a bfNode filled, it will never be changed again, so for example, if bfNode0 is filled at time point: t1, and the TTL = 2 hour, then we could release bfNode0 at the time point "t1 + 2hour". After that the situation on key group 0 becomes:
------------------------------------------------------------------------------------
key group0: bfNode1 -> bfNode2 -> bfNode3
------------------------------------------------------------------------------------


What do you think of this?


@Stefan: Yes, I definitely agree with your point. And we should discuss the implementation deeply before jump into implementation.


Best,
Sihua






On 05/23/2018 17:33,Stefan Richter<[hidden email]> wrote:
Hi,

In general, I like the proposal as well. We should try to integrate all forms of keyed state with the backend, to avoid the problems that we are currently facing with the timer service. We should discuss which exact implementation of bloom filters are the best fit.

@Fabian: There are also implementations of bloom filters that use counting and therefore support deletes, but obviously this comes at the cost of a potentially higher space consumption.

Best,
Stefan

Am 23.05.2018 um 11:29 schrieb Fabian Hueske <[hidden email]>:

Thanks for the proposal Sihua!

Let me try to summarize the motivation / scope of this proposal.

You are proposing to add support for a special Bloom Filter state per KeyGroup and reduce the number of key accesses by checking the Bloom Filter first.

This is would be a rather generic feature that could be interesting for various applications, including joins and deduplication as you described.

IMO, such a feature would be very interesting. However, my concerns with Bloom Filter is that they are insert-only data structures, i.e., it is not possible to remove keys once they were added. This might render the filter useless over time.
In a different thread (see discussion in FLINK-8918 [1]), you mentioned that the Bloom Filters would be growing.
If we keep them in memory, how can we prevent them from exceeding memory boundaries over time?

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8918 <https://issues.apache.org/jira/browse/FLINK-8918>

2018-05-23 9:56 GMT+02:00 sihua zhou <[hidden email] <mailto:[hidden email]>>:
Hi Devs!
I proposal to introduce "Elastic Bloom Filter" for Flink, the reason I make up this proposal is that, it helped us a lot on production, it let's improve the performance with reducing consumption of resources. Here is a brief description fo the motivation of why it's so powful, more detail information can be found https://issues.apache.org/jira/browse/FLINK-8601 <https://issues.apache.org/jira/browse/FLINK-8601> , and the design doc can be found https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing <https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing>

------------------------------------
Motivation

There are some scenarios drive us to introduce this ElasticBloomFilter, one is Stream Join, another is Data Deduplication, and some special user cases...This has given us a great experience, for example,  we implemented the Runtime Filter Join base on it, and it gives us a great performance improvement. With this feature, It diffs us from the "normal stream join", allows us to improve performance while reducing resource consumption by about half!!!
I will list the two most typical user cases that optimized by the ElasticBloomFilter: one is "Runtime Filter Join" in detail, another is "Data Dedeplication" in brief.

Scenario 1: Runtime Filter Join

In general, stream join is one of the most performance cost task. For every record from both side, we need to query the state from the other side, this will lead to poor performance when the state size if huge. So, in production, we always need to spend a lot slots to handle stream join. But, indeed, we can improve this in somehow, there a phenomenon of stream join can be found in production. That's the “joined ratio” of the stream join is often very low, for example.
stream join in promotion analysis: Job need to join the promotion log with the action(click, view, buy) log with the promotion_id to analysis the effect of the promotion.
stream join in AD(advertising) attribution: Job need to join the AD click log with the item payment log on the click_id to find which click of which AD that brings the payment to do attribution.
stream join in click log analysis of doc: Job need to join viewed log(doc viewed by users) with the click log (doc clicked by users) to analysis the reason of the click and the property of the users.
….so on
All these cases have one common property, that is the joined ratio is very low. Here is a example to describe it, we have 10000 records from the left stream, and 10000 records from the right stream, and we execute  select * from leftStream l join rightStream r on l.id <http://l.id/> = r.id <http://r.id/> , we only got 100 record from the result, that is the case for low joined ratio, this is an example for inner join, but it can also applied to left & right join.

there are more example I can come up with low joined ratio…but the point I want to raise up is that the low joined ratio of stream join in production is a very common phenomenon(maybe even the almost common phenomenon in some companies, at least in our company that is the case).

How to improve this?

We can see from the above case, 10000 record join 10000 record and we only got 100 result, that means, we query the state 20000 times (10000 for the left stream and 10000 for the right stream) but only 100 of them are meaningful!!! If we could reduce the useless query times, then we can definitely improve the performance of stream join.
the way we used to improve this is to introduce the Runtime Filter Join, the mainly ideal is that, we build a filter for the state on each side (left stream & right stream). When we need to query the state on that side we first check the corresponding filter whether the key is possible in the state, if the filter say "not, it impossible in the State", then we stop querying the state, if it say "hmm, it maybe in state", then we need to query the state. As you can see, the best choose of the filter is Bloom Filter, it has all the feature that we want: extremely good performance, non-existence of false negative.

Scenario 2:  Data Deduplication

We have implemented two general functions based on the ElasticBloomFilter. They are count(distinct x) and select distinct x, y, z from table. Unlike the Runtime Filter Join the result of this two functions is approximate, not exactly. There are used in the scenario where we don't need a 100% accurate result, for example, to count the number of visiting users in each online store. In general, we don't need a 100% accurate result in this case(indeed we can't give a 100% accurate result, because there could be error when collecting user_id from different devices), if we could get a 98% accurate result with only 1/2 resource, that could be very nice.

I believe there would be more user cases in stream world that could be optimized by the Bloom Filter(as what it had done in the big data world)...

I will appreciate it very much, if someone could have a look of the JIRA or the google doc and give some comments!

Thanks, Sihua



Reply | Threaded
Open this post in threaded view
|

Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

Elias Levy
In reply to this post by sihua zhou
I would suggest you consider an alternative data structures: a Cuckoo
Filter or a Golumb Compressed Sequence.

The GCS data structure was introduced in Cache-, Hash- and Space-Efficient
Bloom Filters
<http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf> by
F. Putze, P. Sanders, and J. Singler.  See section 4.



> We should discuss which exact implementation of bloom filters are the best
> fit.
> @Fabian: There are also implementations of bloom filters that use counting
> and therefore support
> deletes, but obviously this comes at the cost of a potentially higher
> space consumption.
>
> Am 23.05.2018 um 11:29 schrieb Fabian Hueske <[hidden email]>:
>> IMO, such a feature would be very interesting. However, my concerns with
>> Bloom Filter
>> is that they are insert-only data structures, i.e., it is not possible to
>> remove keys once
>> they were added. This might render the filter useless over time.
>> In a different thread (see discussion in FLINK-8918 [1]), you mentioned
>> that the Bloom
>> Filters would be growing.
>> If we keep them in memory, how can we prevent them from exceeding memory
>> boundaries over
>> time?
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

sihua zhou
Hi,
Thanks for your suggestions @Elias! I have a brief look at "Cuckoo Filter" and "Golumb Compressed Sequence", my first sensation is that maybe "Golumc Compressed Sequence" is not a good choose, because it seems to require non-constant lookup time, but Cuckoo Filter maybe a good choose, I should definitely have a deeper look at it.


Beside, to me, all of this filters seems to a "variant" of the bloom filter(which is the smallest unit to store data in the current desgin), the main challenge for introducing BF into flink is the data skewed(which is common phenomenon on production) problem, could you maybe also have a look at the solution that I posted on the google doc https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing for this problem, It would be nice if you could give us some advice on that.


Best, Sihua


On 05/24/2018 07:21,Elias Levy<[hidden email]> wrote:
I would suggest you consider an alternative data structures: a Cuckoo
Filter or a Golumb Compressed Sequence.

The GCS data structure was introduced in Cache-, Hash- and Space-Efficient
Bloom Filters
<http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf> by
F. Putze, P. Sanders, and J. Singler.  See section 4.



We should discuss which exact implementation of bloom filters are the best
fit.
@Fabian: There are also implementations of bloom filters that use counting
and therefore support
deletes, but obviously this comes at the cost of a potentially higher
space consumption.

Am 23.05.2018 um 11:29 schrieb Fabian Hueske <[hidden email]>:
IMO, such a feature would be very interesting. However, my concerns with
Bloom Filter
is that they are insert-only data structures, i.e., it is not possible to
remove keys once
they were added. This might render the filter useless over time.
In a different thread (see discussion in FLINK-8918 [1]), you mentioned
that the Bloom
Filters would be growing.
If we keep them in memory, how can we prevent them from exceeding memory
boundaries over
time?


Reply | Threaded
Open this post in threaded view
|

Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

sihua zhou
Hi,


I did a survey of the variants of Bloom Filter and the Cuckoo filter these days. Finally, I found 3 of them maybe adaptable for our purpose.


1. standard bloom filter (which we have implemented base on this and used it on production with a good experience)
2. cuckoo filter, also a very good filter which is a space-efficient data structures and support fast query(even faster then BF, but the insert maybe a little slower than BF), addtional it support delete() operation.
3. count bloom filter, a variant of BF, it supports delete()operation, but need to cost 4-5x memory than the standard bloom filter(so, I'm not sure whether it's adaptable in practice).


Anyway, these filters are just the smallest storage unit in this "Elastic Bloom Filter", we can define a general interface, and provide different implementation of "storage unit"  base on different filter if we want. Maybe I should change the PROPOSAL name to the "Introduce Elastic Filter For Flink", the ideal of approach that I outlined in the doc is very similar to the paper "Optimization and Applications of Dynamic Bloom Filters(http://ijarcs.info/index.php/Ijarcs/article/viewFile/826/814)"(compare to the paper, the approach I outlined could have a better query performance and also support the RELAXED TTL), maybe it can help to understand the desgin doc. Looking forward any feedback!


Best, Sihua
On 05/24/2018 10:36,sihua zhou<[hidden email]> wrote:
Hi,
Thanks for your suggestions @Elias! I have a brief look at "Cuckoo Filter" and "Golumb Compressed Sequence", my first sensation is that maybe "Golumc Compressed Sequence" is not a good choose, because it seems to require non-constant lookup time, but Cuckoo Filter maybe a good choose, I should definitely have a deeper look at it.


Beside, to me, all of this filters seems to a "variant" of the bloom filter(which is the smallest unit to store data in the current desgin), the main challenge for introducing BF into flink is the data skewed(which is common phenomenon on production) problem, could you maybe also have a look at the solution that I posted on the google doc https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing for this problem, It would be nice if you could give us some advice on that.


Best, Sihua


On 05/24/2018 07:21,Elias Levy<[hidden email]> wrote:
I would suggest you consider an alternative data structures: a Cuckoo
Filter or a Golumb Compressed Sequence.

The GCS data structure was introduced in Cache-, Hash- and Space-Efficient
Bloom Filters
<http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf> by
F. Putze, P. Sanders, and J. Singler.  See section 4.



We should discuss which exact implementation of bloom filters are the best
fit.
@Fabian: There are also implementations of bloom filters that use counting
and therefore support
deletes, but obviously this comes at the cost of a potentially higher
space consumption.

Am 23.05.2018 um 11:29 schrieb Fabian Hueske <[hidden email]>:
IMO, such a feature would be very interesting. However, my concerns with
Bloom Filter
is that they are insert-only data structures, i.e., it is not possible to
remove keys once
they were added. This might render the filter useless over time.
In a different thread (see discussion in FLINK-8918 [1]), you mentioned
that the Bloom
Filters would be growing.
If we keep them in memory, how can we prevent them from exceeding memory
boundaries over
time?


Reply | Threaded
Open this post in threaded view
|

Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

sihua zhou
Hi,


Sorry, but pinging for more feedbacks on this proposal...
Even the negative feedbacks is highly appreciated!


Best, Sihua






On 05/30/2018 13:19,sihua zhou<[hidden email]> wrote:
Hi,


I did a survey of the variants of Bloom Filter and the Cuckoo filter these days. Finally, I found 3 of them maybe adaptable for our purpose.


1. standard bloom filter (which we have implemented base on this and used it on production with a good experience)
2. cuckoo filter, also a very good filter which is a space-efficient data structures and support fast query(even faster then BF, but the insert maybe a little slower than BF), addtional it support delete() operation.
3. count bloom filter, a variant of BF, it supports delete()operation, but need to cost 4-5x memory than the standard bloom filter(so, I'm not sure whether it's adaptable in practice).


Anyway, these filters are just the smallest storage unit in this "Elastic Bloom Filter", we can define a general interface, and provide different implementation of "storage unit"  base on different filter if we want. Maybe I should change the PROPOSAL name to the "Introduce Elastic Filter For Flink", the ideal of approach that I outlined in the doc is very similar to the paper "Optimization and Applications of Dynamic Bloom Filters(http://ijarcs.info/index.php/Ijarcs/article/viewFile/826/814)"(compare to the paper, the approach I outlined could have a better query performance and also support the RELAXED TTL), maybe it can help to understand the desgin doc. Looking forward any feedback!


Best, Sihua
On 05/24/2018 10:36,sihua zhou<[hidden email]> wrote:
Hi,
Thanks for your suggestions @Elias! I have a brief look at "Cuckoo Filter" and "Golumb Compressed Sequence", my first sensation is that maybe "Golumc Compressed Sequence" is not a good choose, because it seems to require non-constant lookup time, but Cuckoo Filter maybe a good choose, I should definitely have a deeper look at it.


Beside, to me, all of this filters seems to a "variant" of the bloom filter(which is the smallest unit to store data in the current desgin), the main challenge for introducing BF into flink is the data skewed(which is common phenomenon on production) problem, could you maybe also have a look at the solution that I posted on the google doc https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing for this problem, It would be nice if you could give us some advice on that.


Best, Sihua


On 05/24/2018 07:21,Elias Levy<[hidden email]> wrote:
I would suggest you consider an alternative data structures: a Cuckoo
Filter or a Golumb Compressed Sequence.

The GCS data structure was introduced in Cache-, Hash- and Space-Efficient
Bloom Filters
<http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf> by
F. Putze, P. Sanders, and J. Singler.  See section 4.



We should discuss which exact implementation of bloom filters are the best
fit.
@Fabian: There are also implementations of bloom filters that use counting
and therefore support
deletes, but obviously this comes at the cost of a potentially higher
space consumption.

Am 23.05.2018 um 11:29 schrieb Fabian Hueske <[hidden email]>:
IMO, such a feature would be very interesting. However, my concerns with
Bloom Filter
is that they are insert-only data structures, i.e., it is not possible to
remove keys once
they were added. This might render the filter useless over time.
In a different thread (see discussion in FLINK-8918 [1]), you mentioned
that the Bloom
Filters would be growing.
If we keep them in memory, how can we prevent them from exceeding memory
boundaries over
time?


Reply | Threaded
Open this post in threaded view
|

Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

sihua zhou
Hi,


no more feedbacks these days...I guess it's because you guys are too busy and since I didn't receive any negative feedbacks and there're already some positive feedbacks. So I want to implement this *Elastic Bloom Filter* based on the current design doc(because I have time to do it currently), even though I think the design can be improved definitely, but maybe we could discuss the improvement better base on the code, and I believe most of the code could be cherry picked for the "final implementation". Does anyone object this?


Best, Sihua




On 06/6/2018 22:02,sihua zhou<[hidden email]> wrote:
Hi,


Sorry, but pinging for more feedbacks on this proposal...
Even the negative feedbacks is highly appreciated!


Best, Sihua






On 05/30/2018 13:19,sihua zhou<[hidden email]> wrote:
Hi,


I did a survey of the variants of Bloom Filter and the Cuckoo filter these days. Finally, I found 3 of them maybe adaptable for our purpose.


1. standard bloom filter (which we have implemented base on this and used it on production with a good experience)
2. cuckoo filter, also a very good filter which is a space-efficient data structures and support fast query(even faster then BF, but the insert maybe a little slower than BF), addtional it support delete() operation.
3. count bloom filter, a variant of BF, it supports delete()operation, but need to cost 4-5x memory than the standard bloom filter(so, I'm not sure whether it's adaptable in practice).


Anyway, these filters are just the smallest storage unit in this "Elastic Bloom Filter", we can define a general interface, and provide different implementation of "storage unit"  base on different filter if we want. Maybe I should change the PROPOSAL name to the "Introduce Elastic Filter For Flink", the ideal of approach that I outlined in the doc is very similar to the paper "Optimization and Applications of Dynamic Bloom Filters(http://ijarcs.info/index.php/Ijarcs/article/viewFile/826/814)"(compare to the paper, the approach I outlined could have a better query performance and also support the RELAXED TTL), maybe it can help to understand the desgin doc. Looking forward any feedback!


Best, Sihua
On 05/24/2018 10:36,sihua zhou<[hidden email]> wrote:
Hi,
Thanks for your suggestions @Elias! I have a brief look at "Cuckoo Filter" and "Golumb Compressed Sequence", my first sensation is that maybe "Golumc Compressed Sequence" is not a good choose, because it seems to require non-constant lookup time, but Cuckoo Filter maybe a good choose, I should definitely have a deeper look at it.


Beside, to me, all of this filters seems to a "variant" of the bloom filter(which is the smallest unit to store data in the current desgin), the main challenge for introducing BF into flink is the data skewed(which is common phenomenon on production) problem, could you maybe also have a look at the solution that I posted on the google doc https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing for this problem, It would be nice if you could give us some advice on that.


Best, Sihua


On 05/24/2018 07:21,Elias Levy<[hidden email]> wrote:
I would suggest you consider an alternative data structures: a Cuckoo
Filter or a Golumb Compressed Sequence.

The GCS data structure was introduced in Cache-, Hash- and Space-Efficient
Bloom Filters
<http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf> by
F. Putze, P. Sanders, and J. Singler.  See section 4.



We should discuss which exact implementation of bloom filters are the best
fit.
@Fabian: There are also implementations of bloom filters that use counting
and therefore support
deletes, but obviously this comes at the cost of a potentially higher
space consumption.

Am 23.05.2018 um 11:29 schrieb Fabian Hueske <[hidden email]>:
IMO, such a feature would be very interesting. However, my concerns with
Bloom Filter
is that they are insert-only data structures, i.e., it is not possible to
remove keys once
they were added. This might render the filter useless over time.
In a different thread (see discussion in FLINK-8918 [1]), you mentioned
that the Bloom
Filters would be growing.
If we keep them in memory, how can we prevent them from exceeding memory
boundaries over
time?


Reply | Threaded
Open this post in threaded view
|

Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

Fabian Hueske
Hi Sihua,

Sorry for not replying earlier.
I have one question left. If I understood the design of the linked
Bloomfilter nodes right, users would need to configure a TTL to be able to
remove a node.
When nodes are removed, we would need to insert every key into the current
node which would not be required if we don't remove nodes, right?

From the small summary of approximated filters, cuckoo filters seem to be
most appropriate as they also support deletes.
Are you aware of any downsides compared to bloom filters (besides
potentially slower inserts)?

Best, Fabian



2018-06-12 9:29 GMT+02:00 sihua zhou <[hidden email]>:

> Hi,
>
> no more feedbacks these days...I guess it's because you guys are too busy
> and since I didn't receive any negative feedbacks and there're already some
> positive feedbacks. So I want to implement this *Elastic Bloom Filter*
> based on the current design doc(because I have time to do it currently),
> even though I think the design can be improved definitely, but maybe we
> could discuss the improvement better base on the code, and I believe most
> of the code could be cherry picked for the "final implementation". Does
> anyone object this?
>
> Best, Sihua
>
>
> On 06/6/2018 22:02,sihua zhou<[hidden email]> <[hidden email]>
> wrote:
>
> Hi,
>
>
> Sorry, but pinging for more feedbacks on this proposal...
> Even the negative feedbacks is highly appreciated!
>
>
> Best, Sihua
>
>
>
>
>
>
> On 05/30/2018 13:19,sihua zhou<[hidden email]> wrote:
> Hi,
>
>
> I did a survey of the variants of Bloom Filter and the Cuckoo filter these
> days. Finally, I found 3 of them maybe adaptable for our purpose.
>
>
> 1. standard bloom filter (which we have implemented base on this and used
> it on production with a good experience)
> 2. cuckoo filter, also a very good filter which is a space-efficient data
> structures and support fast query(even faster then BF, but the insert maybe
> a little slower than BF), addtional it support delete() operation.
> 3. count bloom filter, a variant of BF, it supports delete()operation, but
> need to cost 4-5x memory than the standard bloom filter(so, I'm not sure
> whether it's adaptable in practice).
>
>
> Anyway, these filters are just the smallest storage unit in this "Elastic
> Bloom Filter", we can define a general interface, and provide different
> implementation of "storage unit"  base on different filter if we want.
> Maybe I should change the PROPOSAL name to the "Introduce Elastic Filter
> For Flink", the ideal of approach that I outlined in the doc is very
> similar to the paper "Optimization and Applications of Dynamic Bloom
> Filters(http://ijarcs.info/index.php/Ijarcs/article/viewFile/826/814)"(compare
> to the paper, the approach I outlined could have a better query performance
> and also support the RELAXED TTL), maybe it can help to understand the
> desgin doc. Looking forward any feedback!
>
>
> Best, Sihua
> On 05/24/2018 10:36,sihua zhou<[hidden email]> wrote:
> Hi,
> Thanks for your suggestions @Elias! I have a brief look at "Cuckoo Filter"
> and "Golumb Compressed Sequence", my first sensation is that maybe "Golumc
> Compressed Sequence" is not a good choose, because it seems to require
> non-constant lookup time, but Cuckoo Filter maybe a good choose, I should
> definitely have a deeper look at it.
>
>
> Beside, to me, all of this filters seems to a "variant" of the bloom
> filter(which is the smallest unit to store data in the current desgin), the
> main challenge for introducing BF into flink is the data skewed(which is
> common phenomenon on production) problem, could you maybe also have a look
> at the solution that I posted on the google doc https://docs.google.com/
> document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing
> for this problem, It would be nice if you could give us some advice on that.
>
>
> Best, Sihua
>
>
> On 05/24/2018 07:21,Elias Levy<[hidden email]> wrote:
> I would suggest you consider an alternative data structures: a Cuckoo
> Filter or a Golumb Compressed Sequence.
>
> The GCS data structure was introduced in Cache-, Hash- and Space-Efficient
> Bloom Filters
> <http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf> by
> F. Putze, P. Sanders, and J. Singler.  See section 4.
>
>
>
> We should discuss which exact implementation of bloom filters are the best
> fit.
> @Fabian: There are also implementations of bloom filters that use counting
> and therefore support
> deletes, but obviously this comes at the cost of a potentially higher
> space consumption.
>
> Am 23.05.2018 um 11:29 schrieb Fabian Hueske <[hidden email]>:
> IMO, such a feature would be very interesting. However, my concerns with
> Bloom Filter
> is that they are insert-only data structures, i.e., it is not possible to
> remove keys once
> they were added. This might render the filter useless over time.
> In a different thread (see discussion in FLINK-8918 [1]), you mentioned
> that the Bloom
> Filters would be growing.
> If we keep them in memory, how can we prevent them from exceeding memory
> boundaries over
> time?
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

sihua zhou
Hi Fabian,


Thanks a lot for your reply, you are right that users would need to configure a TTL for the Elastic Filter to recycle the memory resource.


For every Linked BloomFilter Nodes, only the head node is writable, the other nodes are all full, they are only immutable(only readable, we implement the relaxed ttl based on this feature). Even though we don't  need to remove the node, we still always need to insert the data into the current node(the head node), because the node is allocated lazily(to handle data skew), each node's a can only store "a part" of the data, once the current node is full, we allocate a new head node.


Concerning to the cuckoo filters, I also think it seem to be most appropriate in theroy. But there are some reasons that I prefer to implement this based on BF as the first interation.


- I didn't find a open source lib that provide the a "stable" cuckoo filter, maybe we need to implement it ourself, it's not a trivial work.


- The most attraction that cuckoo filter provided is that it support deletion, but since the cuckoo filter is a dense data structure, we can't store the time stamp with the record in cuckoo filter, we may need to depend on the "extra thing"(e.g. timer) to use it's deletion, the performance overhead may not cheap.


- No matter it's cuckoo filter or bloom fiter, they both seems as the "smallest storage unit" in the "Elastic Filter", after we provide a implementation base on Bloom Filter, it easily to extend to cuckoo filter.


How about to provide the Elastic Filter based on BF as the first iteration and provide the version that based on cuckoo filter as a second iteration? What do you think?


Best, Sihua
On 06/12/2018 15:43,Fabian Hueske<[hidden email]> wrote:
Hi Sihua,


Sorry for not replying earlier.

I have one question left. If I understood the design of the linked Bloomfilter nodes right, users would need to configure a TTL to be able to remove a node.

When nodes are removed, we would need to insert every key into the current node which would not be required if we don't remove nodes, right?


From the small summary of approximated filters, cuckoo filters seem to be most appropriate as they also support deletes.
Are you aware of any downsides compared to bloom filters (besides potentially slower inserts)?


Best, Fabian







2018-06-12 9:29 GMT+02:00 sihua zhou <[hidden email]>:

Hi,


no more feedbacks these days...I guess it's because you guys are too busy and since I didn't receive any negative feedbacks and there're already some positive feedbacks. So I want to implement this *Elastic Bloom Filter* based on the current design doc(because I have time to do it currently), even though I think the design can be improved definitely, but maybe we could discuss the improvement better base on the code, and I believe most of the code could be cherry picked for the "final implementation". Does anyone object this?


Best, Sihua




On 06/6/2018 22:02,sihua zhou<[hidden email]> wrote:
Hi,


Sorry, but pinging for more feedbacks on this proposal...
Even the negative feedbacks is highly appreciated!


Best, Sihua






On 05/30/2018 13:19,sihua zhou<[hidden email]> wrote:
Hi,


I did a survey of the variants of Bloom Filter and the Cuckoo filter these days. Finally, I found 3 of them maybe adaptable for our purpose.


1. standard bloom filter (which we have implemented base on this and used it on production with a good experience)
2. cuckoo filter, also a very good filter which is a space-efficient data structures and support fast query(even faster then BF, but the insert maybe a little slower than BF), addtional it support delete() operation.
3. count bloom filter, a variant of BF, it supports delete()operation, but need to cost 4-5x memory than the standard bloom filter(so, I'm not sure whether it's adaptable in practice).


Anyway, these filters are just the smallest storage unit in this "Elastic Bloom Filter", we can define a general interface, and provide different implementation of "storage unit"  base on different filter if we want. Maybe I should change the PROPOSAL name to the "Introduce Elastic Filter For Flink", the ideal of approach that I outlined in the doc is very similar to the paper "Optimization and Applications of Dynamic Bloom Filters(http://ijarcs.info/index.php/Ijarcs/article/viewFile/826/814)"(compare to the paper, the approach I outlined could have a better query performance and also support the RELAXED TTL), maybe it can help to understand the desgin doc. Looking forward any feedback!


Best, Sihua
On 05/24/2018 10:36,sihua zhou<[hidden email]> wrote:
Hi,
Thanks for your suggestions @Elias! I have a brief look at "Cuckoo Filter" and "Golumb Compressed Sequence", my first sensation is that maybe "Golumc Compressed Sequence" is not a good choose, because it seems to require non-constant lookup time, but Cuckoo Filter maybe a good choose, I should definitely have a deeper look at it.


Beside, to me, all of this filters seems to a "variant" of the bloom filter(which is the smallest unit to store data in the current desgin), the main challenge for introducing BF into flink is the data skewed(which is common phenomenon on production) problem, could you maybe also have a look at the solution that I posted on the google doc https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing for this problem, It would be nice if you could give us some advice on that.


Best, Sihua


On 05/24/2018 07:21,Elias Levy<[hidden email]> wrote:
I would suggest you consider an alternative data structures: a Cuckoo
Filter or a Golumb Compressed Sequence.

The GCS data structure was introduced in Cache-, Hash- and Space-Efficient
Bloom Filters
<http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf> by
F. Putze, P. Sanders, and J. Singler.  See section 4.



We should discuss which exact implementation of bloom filters are the best
fit.
@Fabian: There are also implementations of bloom filters that use counting
and therefore support
deletes, but obviously this comes at the cost of a potentially higher
space consumption.

Am 23.05.2018 um 11:29 schrieb Fabian Hueske <[hidden email]>:
IMO, such a feature would be very interesting. However, my concerns with
Bloom Filter
is that they are insert-only data structures, i.e., it is not possible to
remove keys once
they were added. This might render the filter useless over time.
In a different thread (see discussion in FLINK-8918 [1]), you mentioned
that the Bloom
Filters would be growing.
If we keep them in memory, how can we prevent them from exceeding memory
boundaries over
time?




Reply | Threaded
Open this post in threaded view
|

Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

sihua zhou
Hi,


Maybe I would like to add more information concerning to the Linked Filter Nodes on each key group. The reason that we need to maintance a Linked Filter Nodes is that we need to handle data skew, data skew is also the most challenging problem that we need to overcome. Because we don't know how many records will fall into each key group, so we couldn't allocate a Final Filter Node at the begin time, so we need to allocate the Filter Node lazily, each time we only allocate a Small Filter Node
for the incoming records, once it filled we freeze it and allocate a new node for the future incoming records, so we get a Linked Filter Node on each key group and only the head Node is writable, the rest are immutable.


Best, Sihua
On 06/12/2018 16:22,sihua zhou<[hidden email]> wrote:
Hi Fabian,


Thanks a lot for your reply, you are right that users would need to configure a TTL for the Elastic Filter to recycle the memory resource.


For every Linked BloomFilter Nodes, only the head node is writable, the other nodes are all full, they are only immutable(only readable, we implement the relaxed ttl based on this feature). Even though we don't  need to remove the node, we still always need to insert the data into the current node(the head node), because the node is allocated lazily(to handle data skew), each node's a can only store "a part" of the data, once the current node is full, we allocate a new head node.


Concerning to the cuckoo filters, I also think it seem to be most appropriate in theroy. But there are some reasons that I prefer to implement this based on BF as the first interation.


- I didn't find a open source lib that provide the a "stable" cuckoo filter, maybe we need to implement it ourself, it's not a trivial work.


- The most attraction that cuckoo filter provided is that it support deletion, but since the cuckoo filter is a dense data structure, we can't store the time stamp with the record in cuckoo filter, we may need to depend on the "extra thing"(e.g. timer) to use it's deletion, the performance overhead may not cheap.


- No matter it's cuckoo filter or bloom fiter, they both seems as the "smallest storage unit" in the "Elastic Filter", after we provide a implementation base on Bloom Filter, it easily to extend to cuckoo filter.


How about to provide the Elastic Filter based on BF as the first iteration and provide the version that based on cuckoo filter as a second iteration? What do you think?


Best, Sihua
On 06/12/2018 15:43,Fabian Hueske<[hidden email]> wrote:
Hi Sihua,


Sorry for not replying earlier.

I have one question left. If I understood the design of the linked Bloomfilter nodes right, users would need to configure a TTL to be able to remove a node.

When nodes are removed, we would need to insert every key into the current node which would not be required if we don't remove nodes, right?


From the small summary of approximated filters, cuckoo filters seem to be most appropriate as they also support deletes.
Are you aware of any downsides compared to bloom filters (besides potentially slower inserts)?


Best, Fabian







2018-06-12 9:29 GMT+02:00 sihua zhou <[hidden email]>:

Hi,


no more feedbacks these days...I guess it's because you guys are too busy and since I didn't receive any negative feedbacks and there're already some positive feedbacks. So I want to implement this *Elastic Bloom Filter* based on the current design doc(because I have time to do it currently), even though I think the design can be improved definitely, but maybe we could discuss the improvement better base on the code, and I believe most of the code could be cherry picked for the "final implementation". Does anyone object this?


Best, Sihua




On 06/6/2018 22:02,sihua zhou<[hidden email]> wrote:
Hi,


Sorry, but pinging for more feedbacks on this proposal...
Even the negative feedbacks is highly appreciated!


Best, Sihua






On 05/30/2018 13:19,sihua zhou<[hidden email]> wrote:
Hi,


I did a survey of the variants of Bloom Filter and the Cuckoo filter these days. Finally, I found 3 of them maybe adaptable for our purpose.


1. standard bloom filter (which we have implemented base on this and used it on production with a good experience)
2. cuckoo filter, also a very good filter which is a space-efficient data structures and support fast query(even faster then BF, but the insert maybe a little slower than BF), addtional it support delete() operation.
3. count bloom filter, a variant of BF, it supports delete()operation, but need to cost 4-5x memory than the standard bloom filter(so, I'm not sure whether it's adaptable in practice).


Anyway, these filters are just the smallest storage unit in this "Elastic Bloom Filter", we can define a general interface, and provide different implementation of "storage unit"  base on different filter if we want. Maybe I should change the PROPOSAL name to the "Introduce Elastic Filter For Flink", the ideal of approach that I outlined in the doc is very similar to the paper "Optimization and Applications of Dynamic Bloom Filters(http://ijarcs.info/index.php/Ijarcs/article/viewFile/826/814)"(compare to the paper, the approach I outlined could have a better query performance and also support the RELAXED TTL), maybe it can help to understand the desgin doc. Looking forward any feedback!


Best, Sihua
On 05/24/2018 10:36,sihua zhou<[hidden email]> wrote:
Hi,
Thanks for your suggestions @Elias! I have a brief look at "Cuckoo Filter" and "Golumb Compressed Sequence", my first sensation is that maybe "Golumc Compressed Sequence" is not a good choose, because it seems to require non-constant lookup time, but Cuckoo Filter maybe a good choose, I should definitely have a deeper look at it.


Beside, to me, all of this filters seems to a "variant" of the bloom filter(which is the smallest unit to store data in the current desgin), the main challenge for introducing BF into flink is the data skewed(which is common phenomenon on production) problem, could you maybe also have a look at the solution that I posted on the google doc https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing for this problem, It would be nice if you could give us some advice on that.


Best, Sihua


On 05/24/2018 07:21,Elias Levy<[hidden email]> wrote:
I would suggest you consider an alternative data structures: a Cuckoo
Filter or a Golumb Compressed Sequence.

The GCS data structure was introduced in Cache-, Hash- and Space-Efficient
Bloom Filters
<http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf> by
F. Putze, P. Sanders, and J. Singler.  See section 4.



We should discuss which exact implementation of bloom filters are the best
fit.
@Fabian: There are also implementations of bloom filters that use counting
and therefore support
deletes, but obviously this comes at the cost of a potentially higher
space consumption.

Am 23.05.2018 um 11:29 schrieb Fabian Hueske <[hidden email]>:
IMO, such a feature would be very interesting. However, my concerns with
Bloom Filter
is that they are insert-only data structures, i.e., it is not possible to
remove keys once
they were added. This might render the filter useless over time.
In a different thread (see discussion in FLINK-8918 [1]), you mentioned
that the Bloom
Filters would be growing.
If we keep them in memory, how can we prevent them from exceeding memory
boundaries over
time?




Reply | Threaded
Open this post in threaded view
|

Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

Stephan Ewen
Hi Sihua!

Sorry for joining this discussion late.

I can see the benefit of such a feature and also see the technical merit.
It is a nice piece of work and a good proposal.

I am wondering if there is a way to add such a technique as a "library
operator", or whether it needs a deep integration into the runtime and the
state backends.

The state backends have currently a few efforts going on, like state TTL,
making timers part of the state backends, asychronous timer snapshots,
scalable timers in RocksDB, avoiding small file fragmentation (and too much
read/write amplification) for RocksDB incremental snapshots, faster state
recovery efforts, etc.
This is a lot, and all these features are on the list for quite a while,
with various users pushing for them.

If we can add such BloomFilters as an independent operator, quasi like a
library or utility, then this is much easier to integrate, because it needs
no coordination with the other State Backend work. It is also easier to
review and merge, because it would be a new independent feature and not
immediately affect all existing state functionality. If this interacts
deeply with existing state backends, it touches some of the most critical
and most active parts of the system, which needs a lot of time from the
core developers of these parts, making it harder and take much longer.

What do you think about looking at whether the elastic bloom filters be
added like a library operator?

Best,
Stephan


On Tue, Jun 12, 2018 at 4:35 PM, sihua zhou <[hidden email]> wrote:

> Hi,
>
>
> Maybe I would like to add more information concerning to the Linked Filter
> Nodes on each key group. The reason that we need to maintance a Linked
> Filter Nodes is that we need to handle data skew, data skew is also the
> most challenging problem that we need to overcome. Because we don't know
> how many records will fall into each key group, so we couldn't allocate a
> Final Filter Node at the begin time, so we need to allocate the Filter Node
> lazily, each time we only allocate a Small Filter Node
> for the incoming records, once it filled we freeze it and allocate a new
> node for the future incoming records, so we get a Linked Filter Node on
> each key group and only the head Node is writable, the rest are immutable.
>
>
> Best, Sihua
> On 06/12/2018 16:22,sihua zhou<[hidden email]> wrote:
> Hi Fabian,
>
>
> Thanks a lot for your reply, you are right that users would need to
> configure a TTL for the Elastic Filter to recycle the memory resource.
>
>
> For every Linked BloomFilter Nodes, only the head node is writable, the
> other nodes are all full, they are only immutable(only readable, we
> implement the relaxed ttl based on this feature). Even though we don't
> need to remove the node, we still always need to insert the data into the
> current node(the head node), because the node is allocated lazily(to handle
> data skew), each node's a can only store "a part" of the data, once the
> current node is full, we allocate a new head node.
>
>
> Concerning to the cuckoo filters, I also think it seem to be most
> appropriate in theroy. But there are some reasons that I prefer to
> implement this based on BF as the first interation.
>
>
> - I didn't find a open source lib that provide the a "stable" cuckoo
> filter, maybe we need to implement it ourself, it's not a trivial work.
>
>
> - The most attraction that cuckoo filter provided is that it support
> deletion, but since the cuckoo filter is a dense data structure, we can't
> store the time stamp with the record in cuckoo filter, we may need to
> depend on the "extra thing"(e.g. timer) to use it's deletion, the
> performance overhead may not cheap.
>
>
> - No matter it's cuckoo filter or bloom fiter, they both seems as the
> "smallest storage unit" in the "Elastic Filter", after we provide a
> implementation base on Bloom Filter, it easily to extend to cuckoo filter.
>
>
> How about to provide the Elastic Filter based on BF as the first iteration
> and provide the version that based on cuckoo filter as a second iteration?
> What do you think?
>
>
> Best, Sihua
> On 06/12/2018 15:43,Fabian Hueske<[hidden email]> wrote:
> Hi Sihua,
>
>
> Sorry for not replying earlier.
>
> I have one question left. If I understood the design of the linked
> Bloomfilter nodes right, users would need to configure a TTL to be able to
> remove a node.
>
> When nodes are removed, we would need to insert every key into the current
> node which would not be required if we don't remove nodes, right?
>
>
> From the small summary of approximated filters, cuckoo filters seem to be
> most appropriate as they also support deletes.
> Are you aware of any downsides compared to bloom filters (besides
> potentially slower inserts)?
>
>
> Best, Fabian
>
>
>
>
>
>
>
> 2018-06-12 9:29 GMT+02:00 sihua zhou <[hidden email]>:
>
> Hi,
>
>
> no more feedbacks these days...I guess it's because you guys are too busy
> and since I didn't receive any negative feedbacks and there're already some
> positive feedbacks. So I want to implement this *Elastic Bloom Filter*
> based on the current design doc(because I have time to do it currently),
> even though I think the design can be improved definitely, but maybe we
> could discuss the improvement better base on the code, and I believe most
> of the code could be cherry picked for the "final implementation". Does
> anyone object this?
>
>
> Best, Sihua
>
>
>
>
> On 06/6/2018 22:02,sihua zhou<[hidden email]> wrote:
> Hi,
>
>
> Sorry, but pinging for more feedbacks on this proposal...
> Even the negative feedbacks is highly appreciated!
>
>
> Best, Sihua
>
>
>
>
>
>
> On 05/30/2018 13:19,sihua zhou<[hidden email]> wrote:
> Hi,
>
>
> I did a survey of the variants of Bloom Filter and the Cuckoo filter these
> days. Finally, I found 3 of them maybe adaptable for our purpose.
>
>
> 1. standard bloom filter (which we have implemented base on this and used
> it on production with a good experience)
> 2. cuckoo filter, also a very good filter which is a space-efficient data
> structures and support fast query(even faster then BF, but the insert maybe
> a little slower than BF), addtional it support delete() operation.
> 3. count bloom filter, a variant of BF, it supports delete()operation, but
> need to cost 4-5x memory than the standard bloom filter(so, I'm not sure
> whether it's adaptable in practice).
>
>
> Anyway, these filters are just the smallest storage unit in this "Elastic
> Bloom Filter", we can define a general interface, and provide different
> implementation of "storage unit"  base on different filter if we want.
> Maybe I should change the PROPOSAL name to the "Introduce Elastic Filter
> For Flink", the ideal of approach that I outlined in the doc is very
> similar to the paper "Optimization and Applications of Dynamic Bloom
> Filters(http://ijarcs.info/index.php/Ijarcs/article/viewFile/826/814)"(compare
> to the paper, the approach I outlined could have a better query performance
> and also support the RELAXED TTL), maybe it can help to understand the
> desgin doc. Looking forward any feedback!
>
>
> Best, Sihua
> On 05/24/2018 10:36,sihua zhou<[hidden email]> wrote:
> Hi,
> Thanks for your suggestions @Elias! I have a brief look at "Cuckoo Filter"
> and "Golumb Compressed Sequence", my first sensation is that maybe "Golumc
> Compressed Sequence" is not a good choose, because it seems to require
> non-constant lookup time, but Cuckoo Filter maybe a good choose, I should
> definitely have a deeper look at it.
>
>
> Beside, to me, all of this filters seems to a "variant" of the bloom
> filter(which is the smallest unit to store data in the current desgin), the
> main challenge for introducing BF into flink is the data skewed(which is
> common phenomenon on production) problem, could you maybe also have a look
> at the solution that I posted on the google doc https://docs.google.com/
> document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing
> for this problem, It would be nice if you could give us some advice on that.
>
>
> Best, Sihua
>
>
> On 05/24/2018 07:21,Elias Levy<[hidden email]> wrote:
> I would suggest you consider an alternative data structures: a Cuckoo
> Filter or a Golumb Compressed Sequence.
>
> The GCS data structure was introduced in Cache-, Hash- and Space-Efficient
> Bloom Filters
> <http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf> by
> F. Putze, P. Sanders, and J. Singler.  See section 4.
>
>
>
> We should discuss which exact implementation of bloom filters are the best
> fit.
> @Fabian: There are also implementations of bloom filters that use counting
> and therefore support
> deletes, but obviously this comes at the cost of a potentially higher
> space consumption.
>
> Am 23.05.2018 um 11:29 schrieb Fabian Hueske <[hidden email]>:
> IMO, such a feature would be very interesting. However, my concerns with
> Bloom Filter
> is that they are insert-only data structures, i.e., it is not possible to
> remove keys once
> they were added. This might render the filter useless over time.
> In a different thread (see discussion in FLINK-8918 [1]), you mentioned
> that the Bloom
> Filters would be growing.
> If we keep them in memory, how can we prevent them from exceeding memory
> boundaries over
> time?
>
>
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

sihua zhou


Hi Stephan,


Thank you very much for the reply and very happy for that!


I'm not sure whether I understood your idea correctly. Does it means 1) we should add a new operator with the feature of Elastic Bloom Filter? or 2) we could support it as the current (version <= 1.5) InternalTimerServer as an independent managed keyed state?


- If that means to introduce a new operator for supporting EBF, I think I not sure whether it's easy to integerate the some of the current features of SQL with EBF since their were built on other operaters. E.g. the "stream join" and the "distinct".


- If that means to maintain the EBF as the current TimeServer(which's not integrated into state backend), this is the implementation which we are using on production currently.


Please let me know if I totally misunderstood...


Best, Sihua
On 07/5/2018 04:34,Stephan Ewen<[hidden email]> wrote:
Hi Sihua!


Sorry for joining this discussion late.


I can see the benefit of such a feature and also see the technical merit. It is a nice piece of work and a good proposal.


I am wondering if there is a way to add such a technique as a "library operator", or whether it needs a deep integration into the runtime and the state backends.


The state backends have currently a few efforts going on, like state TTL, making timers part of the state backends, asychronous timer snapshots, scalable timers in RocksDB, avoiding small file fragmentation (and too much read/write amplification) for RocksDB incremental snapshots, faster state recovery efforts, etc.
This is a lot, and all these features are on the list for quite a while, with various users pushing for them.


If we can add such BloomFilters as an independent operator, quasi like a library or utility, then this is much easier to integrate, because it needs no coordination with the other State Backend work. It is also easier to review and merge, because it would be a new independent feature and not immediately affect all existing state functionality. If this interacts deeply with existing state backends, it touches some of the most critical and most active parts of the system, which needs a lot of time from the core developers of these parts, making it harder and take much longer.


What do you think about looking at whether the elastic bloom filters be added like a library operator?


Best,
Stephan




On Tue, Jun 12, 2018 at 4:35 PM, sihua zhou <[hidden email]> wrote:
Hi,


Maybe I would like to add more information concerning to the Linked Filter Nodes on each key group. The reason that we need to maintance a Linked Filter Nodes is that we need to handle data skew, data skew is also the most challenging problem that we need to overcome. Because we don't know how many records will fall into each key group, so we couldn't allocate a Final Filter Node at the begin time, so we need to allocate the Filter Node lazily, each time we only allocate a Small Filter Node
for the incoming records, once it filled we freeze it and allocate a new node for the future incoming records, so we get a Linked Filter Node on each key group and only the head Node is writable, the rest are immutable.


Best, Sihua

On 06/12/2018 16:22,sihua zhou<[hidden email]> wrote:
Hi Fabian,


Thanks a lot for your reply, you are right that users would need to configure a TTL for the Elastic Filter to recycle the memory resource.


For every Linked BloomFilter Nodes, only the head node is writable, the other nodes are all full, they are only immutable(only readable, we implement the relaxed ttl based on this feature). Even though we don't  need to remove the node, we still always need to insert the data into the current node(the head node), because the node is allocated lazily(to handle data skew), each node's a can only store "a part" of the data, once the current node is full, we allocate a new head node.


Concerning to the cuckoo filters, I also think it seem to be most appropriate in theroy. But there are some reasons that I prefer to implement this based on BF as the first interation.


- I didn't find a open source lib that provide the a "stable" cuckoo filter, maybe we need to implement it ourself, it's not a trivial work.


- The most attraction that cuckoo filter provided is that it support deletion, but since the cuckoo filter is a dense data structure, we can't store the time stamp with the record in cuckoo filter, we may need to depend on the "extra thing"(e.g. timer) to use it's deletion, the performance overhead may not cheap.


- No matter it's cuckoo filter or bloom fiter, they both seems as the "smallest storage unit" in the "Elastic Filter", after we provide a implementation base on Bloom Filter, it easily to extend to cuckoo filter.


How about to provide the Elastic Filter based on BF as the first iteration and provide the version that based on cuckoo filter as a second iteration? What do you think?


Best, Sihua
On 06/12/2018 15:43,Fabian Hueske<[hidden email]> wrote:
Hi Sihua,


Sorry for not replying earlier.

I have one question left. If I understood the design of the linked Bloomfilter nodes right, users would need to configure a TTL to be able to remove a node.

When nodes are removed, we would need to insert every key into the current node which would not be required if we don't remove nodes, right?


From the small summary of approximated filters, cuckoo filters seem to be most appropriate as they also support deletes.
Are you aware of any downsides compared to bloom filters (besides potentially slower inserts)?


Best, Fabian







2018-06-12 9:29 GMT+02:00 sihua zhou <[hidden email]>:

Hi,


no more feedbacks these days...I guess it's because you guys are too busy and since I didn't receive any negative feedbacks and there're already some positive feedbacks. So I want to implement this *Elastic Bloom Filter* based on the current design doc(because I have time to do it currently), even though I think the design can be improved definitely, but maybe we could discuss the improvement better base on the code, and I believe most of the code could be cherry picked for the "final implementation". Does anyone object this?


Best, Sihua




On 06/6/2018 22:02,sihua zhou<[hidden email]> wrote:
Hi,


Sorry, but pinging for more feedbacks on this proposal...
Even the negative feedbacks is highly appreciated!


Best, Sihua






On 05/30/2018 13:19,sihua zhou<[hidden email]> wrote:
Hi,


I did a survey of the variants of Bloom Filter and the Cuckoo filter these days. Finally, I found 3 of them maybe adaptable for our purpose.


1. standard bloom filter (which we have implemented base on this and used it on production with a good experience)
2. cuckoo filter, also a very good filter which is a space-efficient data structures and support fast query(even faster then BF, but the insert maybe a little slower than BF), addtional it support delete() operation.
3. count bloom filter, a variant of BF, it supports delete()operation, but need to cost 4-5x memory than the standard bloom filter(so, I'm not sure whether it's adaptable in practice).


Anyway, these filters are just the smallest storage unit in this "Elastic Bloom Filter", we can define a general interface, and provide different implementation of "storage unit"  base on different filter if we want. Maybe I should change the PROPOSAL name to the "Introduce Elastic Filter For Flink", the ideal of approach that I outlined in the doc is very similar to the paper "Optimization and Applications of Dynamic Bloom Filters(http://ijarcs.info/index.php/Ijarcs/article/viewFile/826/814)"(compare to the paper, the approach I outlined could have a better query performance and also support the RELAXED TTL), maybe it can help to understand the desgin doc. Looking forward any feedback!


Best, Sihua
On 05/24/2018 10:36,sihua zhou<[hidden email]> wrote:
Hi,
Thanks for your suggestions @Elias! I have a brief look at "Cuckoo Filter" and "Golumb Compressed Sequence", my first sensation is that maybe "Golumc Compressed Sequence" is not a good choose, because it seems to require non-constant lookup time, but Cuckoo Filter maybe a good choose, I should definitely have a deeper look at it.


Beside, to me, all of this filters seems to a "variant" of the bloom filter(which is the smallest unit to store data in the current desgin), the main challenge for introducing BF into flink is the data skewed(which is common phenomenon on production) problem, could you maybe also have a look at the solution that I posted on the google doc https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing for this problem, It would be nice if you could give us some advice on that.


Best, Sihua


On 05/24/2018 07:21,Elias Levy<[hidden email]> wrote:
I would suggest you consider an alternative data structures: a Cuckoo
Filter or a Golumb Compressed Sequence.

The GCS data structure was introduced in Cache-, Hash- and Space-Efficient
Bloom Filters
<http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf> by
F. Putze, P. Sanders, and J. Singler.  See section 4.



We should discuss which exact implementation of bloom filters are the best
fit.
@Fabian: There are also implementations of bloom filters that use counting
and therefore support
deletes, but obviously this comes at the cost of a potentially higher
space consumption.

Am 23.05.2018 um 11:29 schrieb Fabian Hueske <[hidden email]>:
IMO, such a feature would be very interesting. However, my concerns with
Bloom Filter
is that they are insert-only data structures, i.e., it is not possible to
remove keys once
they were added. This might render the filter useless over time.
In a different thread (see discussion in FLINK-8918 [1]), you mentioned
that the Bloom
Filters would be growing.
If we keep them in memory, how can we prevent them from exceeding memory
boundaries over
time?