[DISCUSSION] Consistent shutdown of streaming jobs

Gyula Fóra-2
Hey guys,

With the recent discussions around being able to shut down and restart
streaming jobs from specific checkpoints, there is another issue that I
think needs tackling.

As far as I understand, when a streaming job finishes, the tasks are not
notified about the last checkpoints, and jobs don't take a final
checkpoint before shutting down.

In my opinion this might lead to situations where the user cannot tell
whether the job finished properly (with consistent state and outputs). To
give you a concrete example, let's say I am using the RollingSink to
produce exactly-once output files. If the job finishes, I think there will
be some files that remain in the pending state and are never completed. The
user then sees some complete files and some pending files for the
completed job. The question is then: how do I tell whether the pending
files were actually completed properly now that the job is finished?
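
The scenario can be reproduced with a toy model. This is only a sketch, not the actual RollingSink code: the class `PendingFileSink` and its methods `rollFile` and `notifyComplete` are hypothetical names, and it assumes that files are promoted from pending to completed only when a checkpoint notification arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, not the real RollingSink: files move from
// "pending" to "completed" only when a checkpoint notification arrives.
class PendingFileSink {
    private final List<String> pending = new ArrayList<>();
    private final List<String> completed = new ArrayList<>();

    // Called when a file is rolled: it is closed but not yet committed.
    void rollFile(String name) {
        pending.add(name);
    }

    // Called when a checkpoint is confirmed: commit all staged files.
    void notifyComplete() {
        completed.addAll(pending);
        pending.clear();
    }

    List<String> pendingFiles() { return new ArrayList<>(pending); }
    List<String> completedFiles() { return new ArrayList<>(completed); }
}

class PendingFileSinkDemo {
    public static void main(String[] args) {
        PendingFileSink sink = new PendingFileSink();
        sink.rollFile("part-0");
        sink.notifyComplete();      // part-0 is committed
        sink.rollFile("part-1");    // rolled after the last checkpoint
        // The job ends here without a final checkpoint or notification,
        // so part-1 stays pending although the job itself completed.
        System.out.println("completed=" + sink.completedFiles()
                + " pending=" + sink.pendingFiles());
        // prints: completed=[part-0] pending=[part-1]
    }
}
```

In this model a single extra `notifyComplete()` call at shutdown would clear the pending list, which is the effect a final checkpoint is meant to have.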

Another example would be that I want to manually shut down my job at 12:00
and make sure that I produce every output up to that point. Is there any
way to achieve this currently?

I think we need to do 2 things to make this work:
1. Job shutdowns (finish/manual) should trigger a final checkpoint
2. These final checkpoints should actually be 2-phase checkpoints:
checkpoint -> ack -> notify -> ack. Then, when the checkpoint coordinator
gets all the notification acks, it can tell the user that the system shut
down cleanly.

Unfortunately, it can happen that for some reason the coordinator does not
receive all the acks for a completed job; in that case it can warn the user
that the checkpoint might be inconsistent.
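
A rough sketch of the bookkeeping this would need on the coordinator side is shown below. It is not Flink's checkpoint coordinator: `FinalCheckpointTracker`, its methods, and the `Result` values are hypothetical names, and tasks are identified by plain strings for brevity.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the proposed two-phase final checkpoint:
// checkpoint -> ack -> notify -> ack. Not Flink's own coordinator.
class FinalCheckpointTracker {
    enum Result { CLEAN, POSSIBLY_INCONSISTENT }

    private final Set<String> tasks;
    private final Set<String> checkpointAcks = new HashSet<>();
    private final Set<String> notifyAcks = new HashSet<>();

    FinalCheckpointTracker(Set<String> tasks) {
        this.tasks = new HashSet<>(tasks);
    }

    // Phase 1: a task confirms that it took the final checkpoint.
    void ackCheckpoint(String task) {
        if (tasks.contains(task)) checkpointAcks.add(task);
    }

    // The notify step may only start once every task acked phase 1.
    boolean readyToNotify() {
        return checkpointAcks.equals(tasks);
    }

    // Phase 2: a task confirms that it processed the notification.
    // Acks that arrive before phase 1 is complete are ignored.
    void ackNotification(String task) {
        if (readyToNotify() && tasks.contains(task)) notifyAcks.add(task);
    }

    // Evaluated at shutdown: missing phase-2 acks lead to a warning state.
    Result result() {
        return notifyAcks.equals(tasks) ? Result.CLEAN : Result.POSSIBLY_INCONSISTENT;
    }
}

class FinalCheckpointTrackerDemo {
    public static void main(String[] args) {
        Set<String> tasks = new HashSet<>(Arrays.asList("source", "sink"));
        FinalCheckpointTracker tracker = new FinalCheckpointTracker(tasks);
        tracker.ackCheckpoint("source");
        tracker.ackCheckpoint("sink");
        tracker.ackNotification("source");
        System.out.println(tracker.result());  // sink has not acked yet
        tracker.ackNotification("sink");
        System.out.println(tracker.result());  // all notification acks received
    }
}
```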

Let me know what you think!

Cheers,
Gyula

Re: [DISCUSSION] Consistent shutdown of streaming jobs

Stephan Ewen
I think you are touching on something important here.

There is a discussion/pull request about graceful shutdown of streaming
jobs (i.e. stop the sources and let the remainder of the streams run out).

With the work in progress on drawing external checkpoints, it should be
easy to do checkpoint-and-close.
We may not even need the last ack in the "checkpoint -> ack -> notify ->
ack" sequence if the operators simply wait for the "notifyComplete()"
function to finish. Then the operators finish successfully only when the
"notifyComplete()" method succeeds; otherwise they go to the state
"failed".
That is good, because we need no extra mechanism (extra message type).
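
As a minimal sketch of this idea, the task's end state can be derived directly from the outcome of the notification callback. The names `ClosingTask`, `CheckpointCallback`, and `finishAfterNotification` are hypothetical and only illustrate the control flow; they are not Flink classes.

```java
// Hypothetical sketch: the task's final state depends on whether the
// notification callback succeeds, so no second ack message is needed.
class ClosingTask {
    enum State { RUNNING, FINISHED, FAILED }

    // Stand-in for the user function's "notifyComplete()" hook.
    interface CheckpointCallback {
        void notifyComplete() throws Exception;
    }

    private State state = State.RUNNING;

    // Runs the callback for the final checkpoint and derives the end state.
    State finishAfterNotification(CheckpointCallback callback) {
        try {
            callback.notifyComplete();   // e.g. commit pending output
            state = State.FINISHED;
        } catch (Exception e) {
            state = State.FAILED;        // visible through the task state
        }
        return state;
    }
}

class ClosingTaskDemo {
    public static void main(String[] args) {
        ClosingTask good = new ClosingTask();
        System.out.println(good.finishAfterNotification(() -> { }));

        ClosingTask bad = new ClosingTask();
        System.out.println(bad.finishAfterNotification(() -> {
            throw new IllegalStateException("commit failed");
        }));
    }
}
```

The coordinator then only has to look at the ordinary task states to learn whether every operator committed its output.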

What we do need anyway is a way to detect when the checkpoint did not
succeed globally, so that the functions where it succeeded do not wait
forever for the "notifySuccessful" message.
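
One simple way to avoid waiting forever is a bounded wait. The sketch below assumes a timeout is an acceptable way to detect the failure, which the thread does not settle; `NotificationWaiter` and its methods are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a function waits for the global success message
// but gives up after a timeout instead of blocking forever.
class NotificationWaiter {
    private final CountDownLatch notified = new CountDownLatch(1);

    // Called when the coordinator's notification arrives.
    void onNotifySuccessful() {
        notified.countDown();
    }

    // Returns true if the notification arrived in time, false on timeout
    // or if the waiting thread was interrupted.
    boolean awaitNotification(long timeoutMillis) {
        try {
            return notified.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt flag
            return false;
        }
    }
}

class NotificationWaiterDemo {
    public static void main(String[] args) {
        NotificationWaiter waiter = new NotificationWaiter();
        // No notification was sent, so this returns after about 100 ms.
        System.out.println("notified in time: " + waiter.awaitNotification(100));
        waiter.onNotifySuccessful();
        System.out.println("notified in time: " + waiter.awaitNotification(100));
    }
}
```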

We have two things here now:

1) Graceful shutdown should trigger an "internal" checkpoint (which is
    immediately discarded), in order to commit pending data for cases
    where data is staged between checkpoints.

2) An option to shut down with an external checkpoint would also be
    important, to stop and resume from exactly there.
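
The difference between the two variants can be sketched as follows. Everything here is hypothetical: `GracefulShutdown`, the `Mode` values, and the `checkpoint-<id>` handle format are made up for illustration and do not correspond to an existing Flink API.

```java
import java.util.Optional;

// Hypothetical sketch of the two shutdown variants listed above.
class GracefulShutdown {
    enum Mode { INTERNAL_CHECKPOINT, EXTERNAL_CHECKPOINT }

    // In both modes a final checkpoint would run here so that staged data
    // gets committed. A handle to resume from is returned only if the
    // checkpoint is kept externally.
    static Optional<String> stop(Mode mode, long checkpointId) {
        String handle = "checkpoint-" + checkpointId;  // made-up handle format
        if (mode == Mode.EXTERNAL_CHECKPOINT) {
            return Optional.of(handle);   // retained: the job can resume from it
        }
        return Optional.empty();          // internal: discarded immediately
    }
}

class GracefulShutdownDemo {
    public static void main(String[] args) {
        System.out.println(GracefulShutdown.stop(GracefulShutdown.Mode.INTERNAL_CHECKPOINT, 7));
        System.out.println(GracefulShutdown.stop(GracefulShutdown.Mode.EXTERNAL_CHECKPOINT, 7));
    }
}
```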


Stephan


On Wed, Nov 11, 2015 at 3:19 PM, Gyula Fóra <[hidden email]> wrote:

> [...]

Re: [DISCUSSION] Consistent shutdown of streaming jobs

Gyula Fóra
Yes, I agree with you.

Once we have the graceful shutdown we can make this happen fairly simply
with the mechanism you described :)

Gyula

On Wed, Nov 11, 2015 at 3:43 PM, Stephan Ewen <[hidden email]> wrote:

> [...]

Re: [DISCUSSION] Consistent shutdown of streaming jobs

mxm
+1 for the proposed changes. But why not always create a snapshot on
shutdown? Does that break any assumptions about the checkpointing
interval? I see that if the user has checkpointing disabled, we can
just create a fake snapshot.

On Thu, Nov 12, 2015 at 9:56 AM, Gyula Fóra <[hidden email]> wrote:

> [...]

Re: [DISCUSSION] Consistent shutdown of streaming jobs

Matthias J. Sax-2
I was thinking about this issue too and wanted to include it in my
current PR (I am just about to rebase it onto the current master...
https://github.com/apache/flink/pull/750).

Or should we open a new JIRA for it and address it after the Stop signal is
available?


-Matthias

On 11/12/2015 11:47 AM, Maximilian Michels wrote:

> [...]

Re: [DISCUSSION] Consistent shutdown of streaming jobs

Stephan Ewen
Let's do a separate JIRA and not overload this already tough pull request...

On Fri, Nov 13, 2015 at 10:44 AM, Matthias J. Sax <[hidden email]> wrote:

> [...]