States split over to external storage

classic Classic list List threaded Threaded
9 messages Options
Reply | Threaded
Open this post in threaded view
|

States split over to external storage

Chen Qin
Hi there,

I would like to discuss split over local states to external storage. The
use case is NOT another external state backend like HDFS, rather just to
expand beyond what local disk/ memory can hold when large key space exceeds
what task managers could handle. Realizing FLINK-4266 might be hard to
tacking all-in-one, I would live give a shot to split-over first.

An intuitive approach would be treat HeapStatebackend as LRU cache and
split over to external key/value storage when threshold triggered. To make
this happen, we need minor refactor to runtime and adding set/get logic.
One nice thing of keeping HDFS to store snapshots would be avoid versioning
conflicts. Once checkpoint restore happens, partial write data will be
overwritten with previously checkpointed value.

Comments?

--
-Chen Qin
Reply | Threaded
Open this post in threaded view
|

答复: States split over to external storage

liuxinchun
Dear Chen Qin:
I am liuxinchun, and email is [hidden email] ( the email address in the "Copy To" is wrong). I have leave a message in FLINK-4266 using name SyinChwun Leo. We meet the similar problem in the applications. I hope we can develop this feature together. The following is my opinion:

(1) The organization form of current sliding window(SlidingProcessingTimeWindow and SlidingEventTimeWindow) have a drawback: When using ListState, a element may be kept in multiple windows (size / slide). It's time consuming and waste storage when checkpointing.
  Opinion: I think this is a optimal point. Elements can be organized according to the key and split(maybe also can called as pane). When triggering cleanup, only the oldest split(pane) can be cleanup.
(2) Incremental backup strategy. In original idea, we plan to only backup the new coming element, and that means a whole window may span several checkpoints, and we have develop this idea in our private SPS. But in Flink, the window may not keep raw data(for example, ReducingState and FoldingState). The idea of Chen Qin maybe a candidate strategy. We can keep in touch and exchange our respective strategy.
-----邮件原件-----
发件人: Chen Qin [mailto:[hidden email]]
发送时间: 2017年1月17日 13:30
收件人: [hidden email]
抄送: [hidden email]; Aljoscha Krettek; shijinkui
主题: States split over to external storage

Hi there,

I would like to discuss split over local states to external storage. The use case is NOT another external state backend like HDFS, rather just to expand beyond what local disk/ memory can hold when large key space exceeds what task managers could handle. Realizing FLINK-4266 might be hard to tacking all-in-one, I would live give a shot to split-over first.

An intuitive approach would be treat HeapStatebackend as LRU cache and split over to external key/value storage when threshold triggered. To make this happen, we need minor refactor to runtime and adding set/get logic.
One nice thing of keeping HDFS to store snapshots would be avoid versioning conflicts. Once checkpoint restore happens, partial write data will be overwritten with previously checkpointed value.

Comments?

--
-Chen Qin
Reply | Threaded
Open this post in threaded view
|

Re: 答复: States split over to external storage

Chen Qin
Hi liuxinchun,

Thanks for expedite feedback!

I think if dev community find it makes sense to invest on this feature,
allowing user config eviction strategy(2) makes sense to me. Given the
nature how flink job states increase various a lot, there might be a
interface allow state backend decide which state can be evicted or
restored.

Regarding to (1), I see there are optimizations can give performance boost
immediately. I would suggest raise a jira and discuss with whole dev
community. There might be cases it will conflict with upcoming refactors.
Notice Flink devs are super busy releasing 1.2 so expecting late response :)

Thanks,
Chen


>
> (1) The organization form of current sliding window(SlidingProcessingTimeWindow
> and SlidingEventTimeWindow) have a drawback: When using ListState, a
> element may be kept in multiple windows (size / slide). It's time consuming
> and waste storage when checkpointing.
>   Opinion: I think this is a optimal point. Elements can be organized
> according to the key and split(maybe also can called as pane). When
> triggering cleanup, only the oldest split(pane) can be cleanup.
> (2) Incremental backup strategy. In original idea, we plan to only backup
> the new coming element, and that means a whole window may span several
> checkpoints, and we have develop this idea in our private SPS. But in
> Flink, the window may not keep raw data(for example, ReducingState and
> FoldingState). The idea of Chen Qin maybe a candidate strategy. We can keep
> in touch and exchange our respective strategy.
> -----邮件原件-----
> 发件人: Chen Qin [mailto:[hidden email]]
> 发送时间: 2017年1月17日 13:30
> 收件人: [hidden email]
> 抄送: [hidden email]; Aljoscha Krettek; shijinkui
> 主题: States split over to external storage
>
> Hi there,
>
> I would like to discuss split over local states to external storage. The
> use case is NOT another external state backend like HDFS, rather just to
> expand beyond what local disk/ memory can hold when large key space exceeds
> what task managers could handle. Realizing FLINK-4266 might be hard to
> tacking all-in-one, I would live give a shot to split-over first.
>
> An intuitive approach would be treat HeapStatebackend as LRU cache and
> split over to external key/value storage when threshold triggered. To make
> this happen, we need minor refactor to runtime and adding set/get logic.
> One nice thing of keeping HDFS to store snapshots would be avoid
> versioning conflicts. Once checkpoint restore happens, partial write data
> will be overwritten with previously checkpointed value.
>
> Comments?
>
> --
> -Chen Qin
>



--
-Chen Qin
Reply | Threaded
Open this post in threaded view
|

Re: States split over to external storage

Stephan Ewen
In reply to this post by Chen Qin
Hi!

This is an interesting suggestion.
Just to make sure I understand it correctly: Do you design this for cases
where the state per machine is larger than that machines memory/disk? And
in that case, you cannot solve the problem by scaling out (having more
machines)?

Stephan


On Tue, Jan 17, 2017 at 6:29 AM, Chen Qin <[hidden email]> wrote:

> Hi there,
>
> I would like to discuss split over local states to external storage. The
> use case is NOT another external state backend like HDFS, rather just to
> expand beyond what local disk/ memory can hold when large key space exceeds
> what task managers could handle. Realizing FLINK-4266 might be hard to
> tacking all-in-one, I would live give a shot to split-over first.
>
> An intuitive approach would be treat HeapStatebackend as LRU cache and
> split over to external key/value storage when threshold triggered. To make
> this happen, we need minor refactor to runtime and adding set/get logic.
> One nice thing of keeping HDFS to store snapshots would be avoid versioning
> conflicts. Once checkpoint restore happens, partial write data will be
> overwritten with previously checkpointed value.
>
> Comments?
>
> --
> -Chen Qin
>
Reply | Threaded
Open this post in threaded view
|

Re: States split over to external storage

Fabian Hueske-2
If I got it correctly, part of the motivation is to move rarely used / cold
state to an external storage (please correct me if I'm wrong).

2017-01-20 11:35 GMT+01:00 Stephan Ewen <[hidden email]>:

> Hi!
>
> This is an interesting suggestion.
> Just to make sure I understand it correctly: Do you design this for cases
> where the state per machine is larger than that machines memory/disk? And
> in that case, you cannot solve the problem by scaling out (having more
> machines)?
>
> Stephan
>
>
> On Tue, Jan 17, 2017 at 6:29 AM, Chen Qin <[hidden email]> wrote:
>
> > Hi there,
> >
> > I would like to discuss split over local states to external storage. The
> > use case is NOT another external state backend like HDFS, rather just to
> > expand beyond what local disk/ memory can hold when large key space
> exceeds
> > what task managers could handle. Realizing FLINK-4266 might be hard to
> > tacking all-in-one, I would live give a shot to split-over first.
> >
> > An intuitive approach would be treat HeapStatebackend as LRU cache and
> > split over to external key/value storage when threshold triggered. To
> make
> > this happen, we need minor refactor to runtime and adding set/get logic.
> > One nice thing of keeping HDFS to store snapshots would be avoid
> versioning
> > conflicts. Once checkpoint restore happens, partial write data will be
> > overwritten with previously checkpointed value.
> >
> > Comments?
> >
> > --
> > -Chen Qin
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: States split over to external storage

liuxinchun
In reply to this post by Stephan Ewen
I think the current backup strategy checkpoints the while whindow everytime,when the window size is very large, it's time and storage consuming. An increamental policy should be consided.

Sent from HUAWEI AnyOffice
发件人:Stephan Ewen
收件人:[hidden email],
抄送:[hidden email],Aljoscha Krettek,时金魁,
时间:2017-01-20 18:35:46
主题:Re: States split over to external storage

Hi!

This is an interesting suggestion.
Just to make sure I understand it correctly: Do you design this for cases
where the state per machine is larger than that machines memory/disk? And
in that case, you cannot solve the problem by scaling out (having more
machines)?

Stephan


On Tue, Jan 17, 2017 at 6:29 AM, Chen Qin <[hidden email]> wrote:

> Hi there,
>
> I would like to discuss split over local states to external storage. The
> use case is NOT another external state backend like HDFS, rather just to
> expand beyond what local disk/ memory can hold when large key space exceeds
> what task managers could handle. Realizing FLINK-4266 might be hard to
> tacking all-in-one, I would live give a shot to split-over first.
>
> An intuitive approach would be treat HeapStatebackend as LRU cache and
> split over to external key/value storage when threshold triggered. To make
> this happen, we need minor refactor to runtime and adding set/get logic.
> One nice thing of keeping HDFS to store snapshots would be avoid versioning
> conflicts. Once checkpoint restore happens, partial write data will be
> overwritten with previously checkpointed value.
>
> Comments?
>
> --
> -Chen Qin
>
Reply | Threaded
Open this post in threaded view
|

Re: States split over to external storage

Stephan Ewen
There are works on different approaches of incremental policies underways
(more soon in some design proposals),
but the point raised here sounded different to me.

Maybe Chen Qin can describe in some more detail what he was having in
mind...

On Fri, Jan 20, 2017 at 12:15 PM, liuxinchun <[hidden email]> wrote:

> I think the current backup strategy checkpoints the while whindow
> everytime,when the window size is very large, it's time and storage
> consuming. An increamental policy should be consided.
>
> Sent from HUAWEI AnyOffice
> *发件人:*Stephan Ewen
> *收件人:*[hidden email],
> *抄送:*[hidden email],Aljoscha Krettek,时金魁,
> *时间:*2017-01-20 18:35:46
> *主题:*Re: States split over to external storage
>
> Hi!
>
> This is an interesting suggestion.
> Just to make sure I understand it correctly: Do you design this for cases
> where the state per machine is larger than that machines memory/disk? And
> in that case, you cannot solve the problem by scaling out (having more
> machines)?
>
> Stephan
>
>
> On Tue, Jan 17, 2017 at 6:29 AM, Chen Qin <[hidden email]> wrote:
>
> > Hi there,
> >
> > I would like to discuss split over local states to external storage. The
> > use case is NOT another external state backend like HDFS, rather just to
> > expand beyond what local disk/ memory can hold when large key space
> exceeds
> > what task managers could handle. Realizing FLINK-4266 might be hard to
> > tacking all-in-one, I would live give a shot to split-over first.
> >
> > An intuitive approach would be treat HeapStatebackend as LRU cache and
> > split over to external key/value storage when threshold triggered. To
> make
> > this happen, we need minor refactor to runtime and adding set/get logic.
> > One nice thing of keeping HDFS to store snapshots would be avoid
> versioning
> > conflicts. Once checkpoint restore happens, partial write data will be
> > overwritten with previously checkpointed value.
> >
> > Comments?
> >
> > --
> > -Chen Qin
> >
>
Reply | Threaded
Open this post in threaded view
|

答复: States split over to external storage

liuxinchun
In reply to this post by Stephan Ewen
What's more I make a little change in WindowOperator for ListState in

https://issues.apache.org/jira/browse/FLINK-5572


发件人:Stephan Ewen
收件人:dev
抄送:[hidden email],Aljoscha Krettek,时金魁
时间:2017-01-20 18:35:46
主题:Re: States split over to external storage

Hi!

This is an interesting suggestion.
Just to make sure I understand it correctly: Do you design this for cases
where the state per machine is larger than that machines memory/disk? And
in that case, you cannot solve the problem by scaling out (having more
machines)?

Stephan


On Tue, Jan 17, 2017 at 6:29 AM, Chen Qin <[hidden email]> wrote:

> Hi there,
>
> I would like to discuss split over local states to external storage. The
> use case is NOT another external state backend like HDFS, rather just to
> expand beyond what local disk/ memory can hold when large key space exceeds
> what task managers could handle. Realizing FLINK-4266 might be hard to
> tacking all-in-one, I would live give a shot to split-over first.
>
> An intuitive approach would be treat HeapStatebackend as LRU cache and
> split over to external key/value storage when threshold triggered. To make
> this happen, we need minor refactor to runtime and adding set/get logic.
> One nice thing of keeping HDFS to store snapshots would be avoid versioning
> conflicts. Once checkpoint restore happens, partial write data will be
> overwritten with previously checkpointed value.
>
> Comments?
>
> --
> -Chen Qin
>
Reply | Threaded
Open this post in threaded view
|

Re: States split over to external storage

Chen Qin
In reply to this post by Stephan Ewen
Hi Stephan ,Fabian, Liuxin,

Looks like it's already solved with dynamic scale keygroup along with
incremental policy.

Our use case is like a workflow model where high volume events (million
TPS) & long holding window (24 hours), very small percentage of events will
be forwarded to next operator. Wether spend lots of nodes mostly idle to
hold majority of "cold states" in key groups or split over to external
storage can be discussed. But I feel this is not typical use scenario so I
am fine with first approach(as long as incremental checkpointing in place)!

Thanks,
Chen


On Fri, Jan 20, 2017 at 3:25 AM, Stephan Ewen <[hidden email]> wrote:

> There are works on different approaches of incremental policies underways
> (more soon in some design proposals),
> but the point raised here sounded different to me.
>
> Maybe Chen Qin can describe in some more detail what he was having in
> mind...
>
> On Fri, Jan 20, 2017 at 12:15 PM, liuxinchun <[hidden email]>
> wrote:
>
> > I think the current backup strategy checkpoints the while whindow
> > everytime,when the window size is very large, it's time and storage
> > consuming. An increamental policy should be consided.
> >
> > Sent from HUAWEI AnyOffice
> > *发件人:*Stephan Ewen
> > *收件人:*[hidden email],
> > *抄送:*[hidden email],Aljoscha Krettek,时金魁,
> > *时间:*2017-01-20 18:35:46
> > *主题:*Re: States split over to external storage
> >
> > Hi!
> >
> > This is an interesting suggestion.
> > Just to make sure I understand it correctly: Do you design this for cases
> > where the state per machine is larger than that machines memory/disk? And
> > in that case, you cannot solve the problem by scaling out (having more
> > machines)?
> >
> > Stephan
> >
> >
> > On Tue, Jan 17, 2017 at 6:29 AM, Chen Qin <[hidden email]> wrote:
> >
> > > Hi there,
> > >
> > > I would like to discuss split over local states to external storage.
> The
> > > use case is NOT another external state backend like HDFS, rather just
> to
> > > expand beyond what local disk/ memory can hold when large key space
> > exceeds
> > > what task managers could handle. Realizing FLINK-4266 might be hard to
> > > tacking all-in-one, I would live give a shot to split-over first.
> > >
> > > An intuitive approach would be treat HeapStatebackend as LRU cache and
> > > split over to external key/value storage when threshold triggered. To
> > make
> > > this happen, we need minor refactor to runtime and adding set/get
> logic.
> > > One nice thing of keeping HDFS to store snapshots would be avoid
> > versioning
> > > conflicts. Once checkpoint restore happens, partial write data will be
> > > overwritten with previously checkpointed value.
> > >
> > > Comments?
> > >
> > > --
> > > -Chen Qin
> > >
> >
>



--
-Chen Qin