checkpointing a OneInputStreamOperator/AbstractStreamOperator

dan bress
Hi Flink Devs,
   I have an operator that implements OneInputStreamOperator and extends
AbstractStreamOperator, and I would like to preserve its state.

   I started by having it implement CheckpointedAsynchronously like my
other stateful functions, but this didn't work. In the Flink UI, under the
checkpoint tab, this operator doesn't show anything, whereas the others do.
Is this because it's a lower-level operator and not a function? Also,
functionally, the state did not seem to be preserved in my dataflow.

   Where can I go from here?  Should I implement

StreamTaskState snapshotOperatorState(long checkpointId, long timestamp)
throws Exception;
and
void restoreState(StreamTaskState state) throws Exception;

defined in StreamOperator?

   I tried doing this, but the semantics of these methods were a little
confusing, and when I implemented them I started getting null pointer
exceptions in restoreState.  Also, the amount of work seemed greater than
when implementing CheckpointedAsynchronously.

   Is there an example of how to implement a low-level operator with
checkpointing?

Any help would be appreciated,

Thanks!

Dan

Re: checkpointing a OneInputStreamOperator/AbstractStreamOperator

Aljoscha Krettek-2
Hi,
Yes, you guessed correctly: CheckpointedAsynchronously only works with
functions and not with the lower-level StreamOperator. You would have to
implement snapshotOperatorState() and restoreState(). These interfaces are
quite low-level, though, and not stable. For example, in Flink 1.2 we're
refactoring that to make it way simpler.
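
As a rough illustration of what a snapshot/restore pair has to do, here is a
minimal, self-contained Java sketch. It deliberately does not use Flink's
classes: CountingOperatorSketch, snapshot() and restore() are hypothetical
stand-ins for an operator, snapshotOperatorState() and restoreState(), and a
plain byte array stands in for StreamTaskState. The null check in restore()
is an assumption about one way a restore path can fail on a fresh start, not
a diagnosis of the null pointer exceptions reported above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Toy stand-in for a stateful operator. NOT Flink's API; all names here
// are hypothetical and exist only for this illustration.
class CountingOperatorSketch {
    private long count; // the only piece of state this operator keeps

    void processElement(String value) {
        count++;
    }

    long getCount() {
        return count;
    }

    // Analogue of snapshotOperatorState(): serialize everything needed to resume.
    byte[] snapshot() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(count);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Analogue of restoreState(): rebuild the state, tolerating "no snapshot".
    void restore(byte[] snapshot) {
        if (snapshot == null) {
            count = 0; // nothing was checkpointed; start from empty state
            return;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
            count = in.readLong();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this model, a second instance that calls restore() with the bytes
returned by snapshot() resumes with the same count. The whole state travels
as one opaque blob per instance, which is why state stored this way cannot
be split or merged when the parallelism changes.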

On a side note, I would suggest not using these methods for state
checkpointing because they store non-rescalable state, i.e. if you use them
you won't be able to change the parallelism of your job in the future. A
more future-proof solution would be to use keyed state, i.e.
getRuntimeContext().getState().

Cheers,
Aljoscha

On Mon, 12 Sep 2016 at 20:30 dan bress <[hidden email]> wrote:

> [...]

Re: checkpointing a OneInputStreamOperator/AbstractStreamOperator

dan bress
Aljoscha,
   Thanks.  I originally looked at keyed state.  The problem I have with
this approach is that it requires that I track which keys this operator has
seen and set the current key before reading or updating state, correct?  My
operator is used on a keyed stream.  This means I have to persist the set of
keys myself.  How do I do that?
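
To make the concern concrete, here is a toy Java model of key-scoped state.
It is not Flink's API: KeyedStateSketch and its methods are hypothetical
names, and the backing HashMap is only a stand-in for a state backend. The
point it shows is that reads and writes always go through a "current key",
and that the interface offers no way to list the keys that have been seen.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of key-scoped state. NOT Flink's API; names are hypothetical.
class KeyedStateSketch<K, V> {
    private final Map<K, V> backend = new HashMap<>(); // stand-in for a state backend
    private K currentKey;

    // In this model, the caller must set the key before touching state.
    void setCurrentKey(K key) {
        currentKey = key;
    }

    // Returns the value stored for the current key, or null if none exists.
    V value() {
        requireKey();
        return backend.get(currentKey);
    }

    // Stores a value for the current key only.
    void update(V newValue) {
        requireKey();
        backend.put(currentKey, newValue);
    }

    private void requireKey() {
        if (currentKey == null) {
            throw new IllegalStateException("no current key set");
        }
    }
}
```

With this shape, an operator that needs to visit every key (for example to
flush all of them) has to keep its own record of the keys it has seen,
which is the bookkeeping the question above is about.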

Dan

On Mon, Sep 12, 2016 at 11:33 PM Aljoscha Krettek <[hidden email]>
wrote:

> [...]

Re: checkpointing a OneInputStreamOperator/AbstractStreamOperator

Aljoscha Krettek-2
Hi,
Yes, this observation is correct. It's a current limitation in Flink that I
was trying to address with this issue
https://issues.apache.org/jira/browse/FLINK-3582 and the PR I created for
it. By now that PR is quite outdated, but we should maybe put in the effort
to finish it.

+Stefan: I'm looping in Stefan since he recently changed how RocksDB keeps
its state. Providing such a feature should still be possible, correct?

Cheers,
Aljoscha

On Tue, 13 Sep 2016 at 16:53 dan bress <[hidden email]> wrote:

> [...]

Re: checkpointing a OneInputStreamOperator/AbstractStreamOperator

Stefan Richter
Hi,

I think it is still perfectly possible to implement something along the
lines of the pull request. The major change in the current version is the
introduction of key groups, which act like prefixes to a key and are
functionally dependent on the key: the key alone determines its key group.
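
A small Java sketch of the idea, under stated assumptions: KeyGroupSketch
and both method names are hypothetical, the key group is derived from a
plain hashCode(), and the mapping of key groups to parallel instances is an
illustrative choice rather than a statement of what Flink does internally.
What it demonstrates is that the key group depends only on the key and a
fixed maximum parallelism, so it stays the same when the actual parallelism
changes.

```java
// Illustrative only; names and formulas are assumptions for this sketch.
class KeyGroupSketch {

    // The key group is a pure function of the key (and a fixed upper bound),
    // so the same key always lands in the same group.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each parallel instance owns a contiguous range of key groups.
    // Changing `parallelism` moves whole groups; it never splits one.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int group = keyGroupFor("a", maxParallelism);
        System.out.println("key group: " + group);
        System.out.println("instance at parallelism 2: " + operatorIndexFor(group, maxParallelism, 2));
        System.out.println("instance at parallelism 4: " + operatorIndexFor(group, maxParallelism, 4));
    }
}
```

Because state can be stored and moved per key group, rescaling in this
model only requires reassigning groups to instances, without inspecting
individual keys.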

Best,
Stefan

On Wed, Sep 14, 2016 at 10:43 AM, Aljoscha Krettek <[hidden email]>
wrote:

> [...]



--
Data Artisans GmbH | Tempelhofer Ufer 17 | 10963 Berlin

[hidden email]
Phone +49 030 55599146
Mobile +49 0171 7424461

Registered at Amtsgericht Charlottenburg - HRB 158244 B
Managing Directors: Kostas Tzoumas, Stephan Ewen