[Discuss] (FLINK-8297) A solution for FLINK-8297 Timebased RocksDBListState


[Discuss] (FLINK-8297) A solution for FLINK-8297 Timebased RocksDBListState

faxianzhao
Referring to PR#5185, I think we can use a time-based RocksDBListState to resolve it.
The time-based RocksDBListState stores list entries dispersed across RocksDB, like RocksDBMapState does.
Key:
For time-aware Flink internal classes like StreamRecord (with event/ingestion time enabled), the RocksDB key is
#KeyGroup#Key#Namespace#StreamRecord.getTimestamp(). Otherwise, the timestamp part of the key is the current processing time.
Value:
The RocksDB value holds the entries that share the same timestamp (event/ingestion/processing time), serialized like the original RocksDBListState.

ListState.get() is implemented like org.apache.flink.contrib.streaming.state.RocksDBMapState#iterator.
Generally, it won't load all entries at once.

The RocksDB store structure:
-----------Key-----------------    -----------------Value-----------------
#KeyGroup#Key#Namespace            #KeyGroup#Key#Namespace#ts3  (max lexicographic key)
#KeyGroup#Key#Namespace#ts1        value1,value2,value7
#KeyGroup#Key#Namespace#ts2        value4,value6
#KeyGroup#Key#Namespace#ts3        value3,value5
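
For illustration, here is a minimal sketch of how such a composite key could be
encoded, assuming the #KeyGroup#Key#Namespace prefix is already serialized to
bytes. The class and method names are made up for this example; this is not
Flink's actual serialization code.

import java.nio.ByteBuffer;

// Hypothetical sketch: append the timestamp to the serialized
// #KeyGroup#Key#Namespace prefix so that RocksDB's lexicographic byte
// order matches ascending time order.
public final class TimeBasedListKey {

    private TimeBasedListKey() {}

    public static byte[] build(byte[] keyGroupKeyNamespace, long timestamp) {
        // 8 extra bytes per entry (see "Disadvantage" point 1). The sign bit is
        // flipped so that negative timestamps also sort before positive ones
        // under unsigned byte comparison.
        return ByteBuffer.allocate(keyGroupKeyNamespace.length + Long.BYTES)
                .put(keyGroupKeyNamespace)
                .putLong(timestamp ^ Long.MIN_VALUE) // big-endian by default
                .array();
    }
}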


Advantage:
1. Since RocksDB stores keys in lexicographic order, the entries are monotonically ordered by time. This is friendly to event-time record processing.
2. We can store the key with the max timestamp under the default key (#KeyGroup#Key#Namespace), so we can iterate the stored list in reverse.
3. For CountEvictor and TimeEvictor, we can stop the iteration early instead of reading all entries into memory (see the sketch after this list).
4. Because this ListState is monotonically ordered by time, we can provide additional methods for event-time record processing.
5. I think it resolves the TTL issue naturally.
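
As a rough illustration of advantage 3, here is a hedged sketch of an
early-terminating scan over one list's key range. A TreeMap with an
unsigned-lexicographic byte comparator stands in for RocksDB's sorted key
space (a real implementation would drive a RocksIterator instead); all class
and method names here are illustrative, not actual Flink code.

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch only: walk the rows of one #KeyGroup#Key#Namespace prefix in
// timestamp order and stop as soon as a count or time bound is hit,
// instead of materializing the whole list in memory.
public class EarlyStoppingScan {

    // Unsigned lexicographic byte order, matching RocksDB's default comparator.
    public static TreeMap<byte[], List<byte[]>> newStore() {
        Comparator<byte[]> unsignedLex = (a, b) -> {
            int n = Math.min(a.length, b.length);
            for (int i = 0; i < n; i++) {
                int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
                if (cmp != 0) return cmp;
            }
            return a.length - b.length;
        };
        return new TreeMap<>(unsignedLex);
    }

    public static int scan(TreeMap<byte[], List<byte[]>> store,
                           byte[] prefix, long maxTimestamp, int maxCount) {
        int seen = 0;
        for (Map.Entry<byte[], List<byte[]>> e : store.tailMap(prefix, true).entrySet()) {
            byte[] key = e.getKey();
            if (!hasPrefix(key, prefix)) {
                break;                      // left this list state's key range
            }
            if (key.length == prefix.length) {
                continue;                   // default key row (max-timestamp pointer), skip it
            }
            if (readTimestamp(key, prefix.length) > maxTimestamp) {
                break;                      // time bound reached, skip the rest
            }
            seen += e.getValue().size();
            if (seen >= maxCount) {
                break;                      // count bound reached, skip the rest
            }
        }
        return seen;
    }

    private static boolean hasPrefix(byte[] key, byte[] prefix) {
        return key.length >= prefix.length
                && Arrays.equals(Arrays.copyOfRange(key, 0, prefix.length), prefix);
    }

    private static long readTimestamp(byte[] key, int offset) {
        long stored = 0;
        for (int i = 0; i < Long.BYTES; i++) {
            stored = (stored << 8) | (key[offset + i] & 0xFF);
        }
        return stored ^ Long.MIN_VALUE;     // undo the sign-bit flip in the key encoding
    }
}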

Disadvantage:
1. It adds 8 bytes per entry to store the timestamp in the key part; I'm not good at RocksDB, so I don't know the performance impact.
2. For event-time StreamRecords, it reorders the entries by event time. This behavior is not aligned with the other ListState implementations.
3. For other records, the timestamp in the key is useless overhead.
4. If all of the entries have the same timestamp, the store structure is almost the same as the original RocksDBListState.
5. We can't easily implement remove or size methods for ListState yet.

Implementation:
We can abstract a new class that is the parent of the time-based
RocksDBListState and RocksDBMapState, but we would need to modify
InternalLargeListState.
I drafted some code for this in PR#7675

Re: [Discuss][FLINK-8297]A solution for FLINK-8297 Timebased RocksDBListState

Andrey Zagrebin-3
Hi Faxian,

Thanks for thinking on this new approach. Here are my thoughts:

- In the case of event time, although this approach changes the semantics of
the original list state, it could be a good fit for certain use cases. The main
advantage is that it is deterministic in event time: the list should always end
up in the same order.

- In the case of processing time, time skew might be a problem. If the task
executor's clock jumps back for some reason, or it fails and another TE with a
shifted clock takes over, this can potentially reorder list elements. If we
rather think about the list state as a bag, reordering might be ok, but
there is also a risk that different elements end up with the same
processing time and overwrite each other.

- In general, growing the storage size is a trade-off to achieve more
scalability for list state, and it should be ok as long as we do not degrade
the existing approach.

Let's see other opinions.

Best,
Andrey


Re: [Discuss][FLINK-8297]A solution for FLINK-8297 Timebased RocksDBListState

Yun Tang
Hi Faxian

We also tried to fix the single large list state problem in RocksDB and had a private solution that adds an atomically increasing number to the RocksDB key bytes. We keep that number in the checkpoint so that the order is not broken after restoring from a checkpoint.

I think FLINK-8297 mainly focuses on resolving large list storage in RocksDB, and the timestamp is just one possible solution. Actually, I did not get your point on why we should use the record's timestamp.

After we implemented the element-wise RocksDB list state in our environment, we found that, as expected, it behaves much worse than the original list state, and we do not recommend that users use this feature directly unless they are sure the list state is really large.

Best
Yun Tang


Re: [Discuss][FLINK-8297]A solution for FLINK-8297 Timebased RocksDBListState

faxianzhao
In reply to this post by Andrey Zagrebin-3
Hi Andrey,
I think the mailing archive reformatted my mail and confused you.
If elements have the same processing time, their behavior is the same as in the
original RocksDBListState, so the OOM issue remains. I think we can add an inner
time shift to resolve it (only put a limited number of elements under the same
key, as sketched below); alternatively, we could use a hash function to disperse
the key, but that would reorder the elements.
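
For illustration only, a hedged sketch of that inner time shift (purely
hypothetical, not taken from any PR): cap the number of elements stored under
one timestamp key and spill the overflow to the next millisecond. As noted, a
shifted key can collide with a genuine later timestamp, so this only bounds the
row size; it does not preserve exact timestamps.

import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch: rows keyed by timestamp; when a row is full, shift the
// timestamp forward until a row with free capacity is found.
public class TimeShiftedAppend {

    private static final int MAX_PER_KEY = 1000; // illustrative limit

    public static void add(TreeMap<Long, List<byte[]>> rows, long timestamp, byte[] element) {
        long ts = timestamp;
        List<byte[]> row = rows.computeIfAbsent(ts, k -> new ArrayList<>());
        while (row.size() >= MAX_PER_KEY) {
            ts += 1; // the "inner time shift"
            row = rows.computeIfAbsent(ts, k -> new ArrayList<>());
        }
        row.add(element);
    }
}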




Re: [Discuss][FLINK-8297]A solution for FLINK-8297 Timebased RocksDBListState

faxianzhao
In reply to this post by Yun Tang
Hi Yun,
Whether we use an atomically increasing number or a timestamp, the key point is
to disperse the elements across different keys.
My point is how to design a useful key.
With an atomically increasing number, the elements are laid out one by one, but I
think the key itself carries no useful information: once we implement the remove
method, the max key is no longer the element count.
Currently, for the CountEvictor, TimeEvictor and TTL scenarios, we have to
iterate over all of the elements to find what we want. But with a timestamp
key, we can terminate the iteration early to save work, or start from the
relevant timestamp and iterate only the remaining elements (see the sketch below).
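
As a hedged sketch of the second case (starting from a timestamp instead of the
head of the list): rows are keyed by timestamp here for brevity, a real key would
also carry the #KeyGroup#Key#Namespace prefix, and the names are illustrative only.

import java.util.List;
import java.util.NavigableMap;

// Sketch only: instead of scanning the whole list and filtering expired
// elements, seek directly to the TTL cutoff and read only the live tail.
public class TtlTailScan {

    public static long countLive(NavigableMap<Long, List<byte[]>> rows, long now, long ttlMillis) {
        long live = 0;
        // tailMap positions the scan at the first row with timestamp >= cutoff,
        // so expired rows are never read.
        for (List<byte[]> row : rows.tailMap(now - ttlMillis, true).values()) {
            live += row.size();
        }
        return live;
    }
}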





Re: [Discuss][FLINK-8297]A solution for FLINK-8297 Timebased RocksDBListState

Andrey Zagrebin-3
Hi Faxian,

True, we can resolve timestamp conflicts by putting values into the same row,
good point.
Then reordering in the case of an internal clock jump changes the behaviour
compared with the list state we have now.
In that case, it is similar to dispersing elements by hash, and we could
call it a bag, not a list.

Best,
Andrey


Re: [Discuss][FLINK-8297]A solution for FLINK-8297 Timebased RocksDBListState

SHI Xiaogang
Hi all,

I can provide more details about the private solution mentioned by Yun.

We noticed that the reordering of elements due to internal clock jumps would
break the semantics of a LIST, so we decided not to use timestamps in the keys.
Instead, we format the key of a list state as STATE_NAME#SEQUENCE_NUMBER,
where SEQUENCE_NUMBER is generated by the state backend.
The state backend keeps the value of the next sequence number. To avoid
unnecessary I/O, the next sequence number is maintained in memory.

When taking checkpoints, the state backend saves the next sequence number to
the checkpoint and restores it from there.
When the parallelism is changed, the state backend may be assigned more than
one value at restore time. In such cases, the state backend restores with the
largest one. That way, we can ensure that elements newly added to the list
always land at the tail, which gets rid of reordering (a rough sketch follows
below).
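
For readers following along, a hedged sketch of that sequence-number scheme (the
class and method names are illustrative, not the actual private implementation):
the backend keeps the next sequence number in memory, encodes it into the key
after the state name, snapshots it with checkpoints, and on restore takes the
maximum of all values it is handed.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Sketch of a sequence-number based list key: STATE_NAME#SEQUENCE_NUMBER.
// The next sequence number lives in memory, is written to checkpoints and,
// after rescaling, is restored as the maximum of all assigned values so new
// elements always land at the tail.
public class SequencedListKeys {

    private final byte[] stateNamePrefix;
    private long nextSequenceNumber;

    public SequencedListKeys(String stateName) {
        this.stateNamePrefix = (stateName + "#").getBytes(StandardCharsets.UTF_8);
        this.nextSequenceNumber = 0L;
    }

    /** Builds the key for the next appended element and advances the counter. */
    public byte[] nextKey() {
        return ByteBuffer.allocate(stateNamePrefix.length + Long.BYTES)
                .put(stateNamePrefix)
                .putLong(nextSequenceNumber++) // big-endian keeps append order
                .array();
    }

    /** Value saved into the checkpoint. */
    public long snapshotNextSequenceNumber() {
        return nextSequenceNumber;
    }

    /** On restore (possibly after rescaling), continue from the largest number seen. */
    public void restore(List<Long> restoredSequenceNumbers) {
        long max = 0L;
        for (long n : restoredSequenceNumbers) {
            max = Math.max(max, n);
        }
        this.nextSequenceNumber = max;
    }
}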

Furthermore, in my opinion, we should support different types of states for
collections:
* List state: the current implementation of list state is very good for small
lists.
* Deque state, or dispersed list state: it also supports ordered access but
stores elements in multiple RocksDB entries. It may be suitable for large
lists as it avoids a large memory footprint. But for small lists, its
performance may be poorer than ListState due to extra I/O operations. (BTW,
I think my current implementation of dispersed list states is not very
good. We can rework it to find a better one.)
* Set state, or bag state: it can be implemented on top of map state. It
does not support ordered access, but can achieve relatively good
performance for large collections and allows access by value.

Regards,
Xiaogang
