Cassandra statebackend

9 messages

Cassandra statebackend

chenqin
Hi there,

Are there any design docs or ongoing efforts for a Cassandra state backend?

Thanks,
Chen

Re: Cassandra statebackend

Tzu-Li Tai
Hi Chen,

AFAIK, there currently isn’t any FLIP / JIRA / ongoing work for a
Cassandra state backend. I think it’ll definitely be interesting to have
one in Flink.

Regards,
Gordon


On July 25, 2016 at 10:24:32 AM, Chen Qin ([hidden email]) wrote:

Re: Cassandra statebackend

Aljoscha Krettek-2
Hi,
I thought there was a Jira for that, but I looked and couldn't find it. If
you'd like, you can create one and we can discuss the design. Do you have
any ideas yet?

The tricky things I see in this are:
 - Knowing which data is the current data. This will require some kind of
   timestamp or increasing ID.
 - Knowing when you can retire data from Cassandra.

Some of these might require changes to how Flink handles checkpoints, and
this goes somewhat in the direction of incremental checkpoints. That last
part is especially important once you deal with savepoints, which can stay
around indefinitely.

Cheers,
Aljoscha

On Mon, 25 Jul 2016 at 08:31 Tai Gordon <[hidden email]> wrote:

Re: Cassandra statebackend

chenqin
Hi Aljoscha,

Cool! I created a JIRA for this.
https://issues.apache.org/jira/browse/FLINK-4266
Some comments inline.

Chen

On Mon, Jul 25, 2016 at 2:41 AM, Aljoscha Krettek <[hidden email]>
wrote:

> Hi,
> I thought there was a Jira for that but I looked and couldn't find it. If
> you'd like you can create one and we can discuss the design. Do you have
> any ideas yet?
>
> The tricky things I see in this are:
>  - Knowing which data is the current data. This will require some kind of
> timestamps or increasing IDs.
>

We are thinking of using the checkpoint timestamp (checkpoint_timestamp) as
the client-assigned write timestamp.
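
A minimal in-memory sketch of reading state by checkpoint timestamp, assuming every row is keyed by (state key, checkpoint timestamp), for example with the timestamp as a clustering column, so that older versions stay readable until they are retired. `VersionedTable`, `write` and `read_as_of` are hypothetical names and the sketch does not use the Cassandra driver. Cassandra's own `USING TIMESTAMP` write option only decides which write wins for a given cell (the highest timestamp), so on its own it would not keep the older versions around.

```python
from bisect import bisect_right
from collections import defaultdict
from typing import Dict, List, Optional, Tuple


class VersionedTable:
    """Hypothetical stand-in for a table keyed by (state key, checkpoint timestamp)."""

    def __init__(self) -> None:
        # state key -> sorted checkpoint timestamps that hold a version of that key
        self._timestamps: Dict[str, List[int]] = defaultdict(list)
        # (state key, checkpoint timestamp) -> value
        self._values: Dict[Tuple[str, int], object] = {}

    def write(self, key: str, checkpoint_ts: int, value: object) -> None:
        """Store `value` as the version of `key` that belongs to `checkpoint_ts`."""
        if (key, checkpoint_ts) not in self._values:
            stamps = self._timestamps[key]
            stamps.insert(bisect_right(stamps, checkpoint_ts), checkpoint_ts)
        self._values[(key, checkpoint_ts)] = value

    def read_as_of(self, key: str, checkpoint_ts: int) -> Optional[object]:
        """Return the newest version written at or before `checkpoint_ts`, or None."""
        stamps = self._timestamps.get(key, [])
        idx = bisect_right(stamps, checkpoint_ts)
        if idx == 0:
            return None
        return self._values[(key, stamps[idx - 1])]


table = VersionedTable()
table.write("user-1", 100, 5)  # version belonging to checkpoint 100
table.write("user-1", 200, 7)  # version belonging to checkpoint 200
print(table.read_as_of("user-1", 150))  # prints 5
print(table.read_as_of("user-1", 50))   # prints None
```

Restoring from a checkpoint then amounts to reading every key "as of" that checkpoint's timestamp.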


>  - Knowing when you can retire data from Cassandra
>
That's the interesting part: each state checkpoint snapshot might reference
its previous snapshot. Deleting or consolidating the rows of a previous
snapshot under eventual consistency can be tricky.

Re: Cassandra statebackend

Aljoscha Krettek-2
Hi,
thanks for opening the Jira issue. I'll continue the discussion here
instead of in the Jira; I hope that's OK.

That last paragraph of yours is the most interesting. We will have to adapt
the way that checkpoints are stored to accommodate state backends that
store state in some external system, such as Cassandra. Right now, each
Checkpoint/Savepoint is stored in isolation and the system does not know
about any relation between them. We have to introduce such a relation,
basically putting the checkpoints into a graph structure that shows the
lineage of the checkpoints. Then, when cleaning up old checkpoints, we
determine the ranges of (logical) timestamps of the checkpoints that we can
remove and instruct the StateBackend to remove the relevant ranges.
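
A small sketch of the lineage idea, under stated assumptions: each checkpoint records the checkpoint it builds on (`parent`), rows are tagged with the checkpoint's logical timestamp, and a checkpoint needs the rows of all its ancestors (as with incremental checkpoints). `Checkpoint`, `needed_checkpoints` and `removable_ranges` are hypothetical names. Given the checkpoints and savepoints that must be kept, it returns inclusive timestamp ranges that contain only checkpoints nothing depends on.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Set, Tuple


@dataclass(frozen=True)
class Checkpoint:
    checkpoint_id: int
    timestamp: int          # logical timestamp used to tag rows in the store
    parent: Optional[int]   # checkpoint this one builds on (None for the first)


def needed_checkpoints(lineage: Dict[int, Checkpoint], retained: Set[int]) -> Set[int]:
    """Every retained checkpoint plus all of its ancestors."""
    needed: Set[int] = set()
    for cid in retained:
        current: Optional[int] = cid
        # Stop early once we reach a node whose ancestors were already added.
        while current is not None and current not in needed:
            needed.add(current)
            current = lineage[current].parent
    return needed


def removable_ranges(lineage: Dict[int, Checkpoint], retained: Set[int]) -> List[Tuple[int, int]]:
    """Inclusive (min_ts, max_ts) ranges covering only checkpoints nobody needs."""
    needed = needed_checkpoints(lineage, retained)
    ranges: List[Tuple[int, int]] = []
    run_start: Optional[int] = None
    run_end: Optional[int] = None
    for cp in sorted(lineage.values(), key=lambda c: c.timestamp):
        if cp.checkpoint_id in needed:
            # A needed checkpoint ends the current run of removable ones.
            if run_start is not None:
                ranges.append((run_start, run_end))
                run_start = None
        else:
            if run_start is None:
                run_start = cp.timestamp
            run_end = cp.timestamp
    if run_start is not None:
        ranges.append((run_start, run_end))
    return ranges


lineage = {
    1: Checkpoint(1, 10, None),
    2: Checkpoint(2, 20, 1),
    3: Checkpoint(3, 30, 2),
    4: Checkpoint(4, 40, 3),
    5: Checkpoint(5, 50, 2),  # job restored from checkpoint 2
    6: Checkpoint(6, 60, 5),
}
print(removable_ranges(lineage, retained={6}))  # prints [(30, 40)]
```

With a purely linear history every old checkpoint is an ancestor of the latest one, so in this sketch nothing becomes removable without some form of compaction; only abandoned branches, for example after restoring from an older savepoint, yield ranges.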

This leads to another interesting thing. We might need to have a
StateBackend component running in the JobManager that we can invoke to
delete ranges of checkpoints. Right now, a StateBackend only lives on the
TaskManager, in the operators. Cleanup of time ranges, however, should
probably happen in some centralized location.
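
A sketch of what such a centralized cleanup component could look like, assuming the dead timestamp ranges have already been computed from the checkpoint lineage. `InMemoryStateTable` stands in for one operator's state table in the external store and `CentralCleanup` for a hypothetical JobManager-side component; neither is an existing Flink class.

```python
from typing import Dict, Iterable, List, Tuple

Row = Tuple[str, int]  # (state key, checkpoint timestamp)


class InMemoryStateTable:
    """Stand-in for one operator's state table in the external store."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.rows: Dict[Row, object] = {}

    def delete_range(self, min_ts: int, max_ts: int) -> int:
        """Delete every row whose checkpoint timestamp lies in [min_ts, max_ts]."""
        doomed = [row for row in self.rows if min_ts <= row[1] <= max_ts]
        for row in doomed:
            del self.rows[row]
        return len(doomed)


class CentralCleanup:
    """Hypothetical JobManager-side cleaner.

    Task-side backends only write rows; this single component issues the
    range deletes for all state tables once the ranges are known to be dead.
    """

    def __init__(self, tables: Iterable[InMemoryStateTable]) -> None:
        self.tables = list(tables)

    def cleanup(self, ranges: List[Tuple[int, int]]) -> Dict[str, int]:
        """Apply every dead range to every table; return rows deleted per table."""
        deleted = {table.name: 0 for table in self.tables}
        for min_ts, max_ts in ranges:
            for table in self.tables:
                deleted[table.name] += table.delete_range(min_ts, max_ts)
        return deleted
```

Keeping this in one place means no single TaskManager has to know the global set of retained checkpoints.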

Cheers,
Aljoscha

On Mon, 25 Jul 2016 at 22:38 Chen Qin <[hidden email]> wrote:

Re: Cassandra statebackend

chenqin
Aljoscha,

Sorry about the late reply.

David and I drafted a design doc with some diagrams. We may not work on it
immediately, but we thought it would be valuable to share our thoughts and
hear feedback.

https://docs.google.com/document/d/1diHQyOPZVxgmnmYfiTa6glLf-FlFjSHcL8J3YR2xLdk/edit#heading=h.12fh7saw98iz

> About state lineage:

One approach might be to add a pointer that keeps the data lineage between a
key updated in the first checkpoint and its corresponding restored
checkpoint_id. This assumes that restoring from a savepoint will not cause the
JobManager to reissue an already-used checkpoint ID.
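
One possible reading of the lineage pointer, sketched per checkpoint rather than per key: each checkpoint stores a pointer to the checkpoint it was built on (or restored from), and a read walks that chain backwards. It relies on the assumption above that checkpoint IDs are never reissued, which the sketch enforces with a check. `LineageStore` and its methods are hypothetical.

```python
from typing import Dict, Optional, Tuple


class LineageStore:
    """Rows tagged with checkpoint IDs plus one parent pointer per checkpoint."""

    def __init__(self) -> None:
        self.parent: Dict[int, Optional[int]] = {}
        self.rows: Dict[Tuple[str, int], object] = {}

    def register_checkpoint(self, checkpoint_id: int, parent_id: Optional[int]) -> None:
        """Record which checkpoint this one continues from; IDs must be unique."""
        if checkpoint_id in self.parent:
            raise ValueError(f"checkpoint id {checkpoint_id} was already used")
        self.parent[checkpoint_id] = parent_id

    def write(self, key: str, checkpoint_id: int, value: object) -> None:
        self.rows[(key, checkpoint_id)] = value

    def read(self, key: str, checkpoint_id: int) -> Optional[object]:
        """Newest value of `key` on the path from `checkpoint_id` back to the root."""
        current: Optional[int] = checkpoint_id
        while current is not None:
            if (key, current) in self.rows:
                return self.rows[(key, current)]
            current = self.parent[current]
        return None


store = LineageStore()
store.register_checkpoint(1, None)
store.register_checkpoint(2, 1)
store.register_checkpoint(3, 2)
store.write("k", 1, "a")
store.write("k", 3, "c")
store.register_checkpoint(4, 2)  # restored from checkpoint 2, new ID 4
print(store.read("k", 4))  # prints a
```

After restoring from checkpoint 2, the write made in checkpoint 3 is no longer on the path and is ignored.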

> Cleaning up old state:

The JobManager already knows about the savepoints and the latest successful
checkpoint. When a savepoint is created, it could be a good time for the
JobManager to issue a cleanup message and ask each state to move its
effective key/values up to the current savepoint and delete everything
before it. This doesn't need to be synchronized, since compaction changes
only where a value is stored, not the value itself. Deleting a savepoint or
checkpoint could also trigger compaction.
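
A minimal single-threaded sketch of compacting at a savepoint, assuming rows are keyed by (state key, checkpoint timestamp) and a read returns the newest version at or before the requested timestamp. `read_as_of` and `compact_up_to` are hypothetical helpers. Reads at or after the savepoint return the same values before and after compaction, which is the property the paragraph above relies on; reads at older timestamps are no longer served, since those checkpoints are being retired.

```python
from typing import Dict, Optional, Tuple

Rows = Dict[Tuple[str, int], object]  # (state key, checkpoint timestamp) -> value


def read_as_of(rows: Rows, key: str, ts: int) -> Optional[object]:
    """Value of `key` with the greatest checkpoint timestamp <= ts, or None."""
    candidates = [t for (k, t) in rows if k == key and t <= ts]
    return rows[(key, max(candidates))] if candidates else None


def compact_up_to(rows: Rows, savepoint_ts: int) -> None:
    """Re-tag each key's effective value at savepoint_ts and drop older versions."""
    keys = {k for (k, _) in rows}
    for key in keys:
        older = [t for (k, t) in rows if k == key and t <= savepoint_ts]
        if not older:
            continue
        # Write the effective value at the savepoint first, then delete the
        # older versions, so the key is never missing in between.
        rows[(key, savepoint_ts)] = rows[(key, max(older))]
        for t in older:
            if t != savepoint_ts:
                del rows[(key, t)]


rows: Rows = {("k", 10): "a", ("k", 20): "b", ("k", 40): "d"}
compact_up_to(rows, 30)
print(rows)  # prints {('k', 40): 'd', ('k', 30): 'b'}
```

In an eventually consistent store the write-then-delete ordering and its visibility to concurrent readers would still need care; this sketch does not model that.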

Thanks,
Chen

On Thu, Jul 28, 2016 at 6:59 AM, Aljoscha Krettek <[hidden email]>
wrote:

Re: Cassandra statebackend

Aljoscha Krettek-2
Hi,
thanks for sharing the design doc; these are valuable ideas.

We might have to revisit the specifics once the re-sharding/key-group
changes are in Flink and once you actually want to start working on this.

Cheers,
Aljoscha

On Sat, 6 Aug 2016 at 07:32 Chen Qin <[hidden email]> wrote:

Re: Cassandra statebackend

Gyula Fóra-2
Hi,

I have done something similar in the past for storing state in sharded
MySQL databases. We used this for a while for state size scaling reasons,
but later switched to RocksDB, and therefore this state backend was
removed from Flink to cut some maintenance costs.

You can find the initial PR, which contains the description, here:
https://github.com/apache/flink/pull/1305

Maybe it helps a little, I don't know :)

Cheers,
Gyula

Aljoscha Krettek <[hidden email]> wrote (on Mon, Aug 8, 2016, 11:41):

Re: Cassandra statebackend

chenqin
Aljoscha,

Sure thing, we will do that once the key-group feature is in place and we have the bandwidth :)

Gyula,

That's where we started; many terms are copied over (logical timestamp,
compaction, lazy restore). We have to use Cassandra, which offers less in
terms of transactions and consistency, to gain availability and
cross-data-center replication. The design leverages RocksDB as a
pre-checkpoint cache, with eviction to and lazy restore from Cassandra.
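
A rough sketch of that cache layering, with one dict standing in for the local RocksDB instance and another for Cassandra. `CachedState`, the LRU eviction policy and the `flush` hook (imagined as running when a checkpoint is taken) are assumptions for illustration, and versioning by checkpoint timestamp is left out for brevity.

```python
from collections import OrderedDict
from typing import Dict, Optional


class CachedState:
    """Local LRU cache (RocksDB stand-in) in front of a remote store (Cassandra stand-in)."""

    def __init__(self, remote: Dict[str, object], capacity: int) -> None:
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self.remote = remote
        self.capacity = capacity
        self.cache: "OrderedDict[str, object]" = OrderedDict()
        self.dirty: set = set()  # keys changed locally but not yet written remotely

    def get(self, key: str) -> Optional[object]:
        if key in self.cache:
            self.cache.move_to_end(key)  # mark as most recently used
            return self.cache[key]
        if key not in self.remote:
            return None
        value = self.remote[key]
        self._put_local(key, value)  # lazy restore from the remote store
        return value

    def put(self, key: str, value: object) -> None:
        self._put_local(key, value)
        self.dirty.add(key)

    def flush(self) -> None:
        """Write all dirty entries to the remote store, e.g. on checkpoint."""
        for key in self.dirty:
            self.remote[key] = self.cache[key]
        self.dirty.clear()

    def _put_local(self, key: str, value: object) -> None:
        self.cache[key] = value
        self.cache.move_to_end(key)
        while len(self.cache) > self.capacity:
            # Evict the least recently used entry, writing it out if dirty.
            old_key, old_value = self.cache.popitem(last=False)
            if old_key in self.dirty:
                self.remote[old_key] = old_value
                self.dirty.discard(old_key)
```

Evicting a dirty entry writes not-yet-checkpointed state to the remote store, which is one reason such a design would pair it with the checkpoint-timestamp tagging discussed earlier in the thread.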

Chen




On Mon, Aug 8, 2016 at 2:51 AM, Gyula Fóra <[hidden email]> wrote: