Checkpoint metrics.

Checkpoint metrics.

Jamie Grier
Hey all,

I need to make sense of this behavior.  Any help would be appreciated.

Here’s an example of a set of Flink checkpoint metrics I don’t understand.  This is the first operator in a job and, as you can see, the end-to-end time for the checkpoint is long, but it isn’t explained by the sync, async, or alignment times.  I’m not sure what to make of this.  It makes me think I don’t understand the meaning of the metrics themselves.  In my interpretation the end-to-end time should always be, roughly, the sum of the other components, certainly in the case of a source task such as this.

Any thoughts or clarifications anyone can provide on this?  We have many jobs with slow checkpoints that suffer from this sort of thing with metrics that look similar.

-Jamie


Re: Checkpoint metrics.

Seth Wiesman
Great timing, I just debugged this on Monday. E2e time is measured checkpoint coordinator to checkpoint coordinator, so it includes the RPC to the source and the RPC from the operator back to the JM.

Seth

Re: Checkpoint metrics.

Stephan Ewen
Hi Jamie!

Did you mean to attach a screenshot? If so, you need to share it through
a different channel; the mailing list does not support attachments,
unfortunately.

Seth is right about how the time is measured.
One important bit to add to the interpretation:
  - For non-source tasks, the time includes the "travel of the barriers",
which can take a long time under backpressure.
  - For source tasks, it includes the "time to acquire the checkpoint
lock", which can be long if the source is blocked trying to emit data
(again, backpressure).

As part of FLIP-27 we will eliminate the checkpoint lock (and use a mailbox
instead), which should make triggering the checkpoint at the source much faster.

The "unaligned checkpoints" discussion is looking at ways to make
checkpoints much less susceptible to back pressure.

https://lists.apache.org/thread.html/fd5b6cceb4bffb635e26e7ec0787a8db454ddd64aadb40a0d08a90a8@%3Cdev.flink.apache.org%3E
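
Once that work lands, enabling it should be a small change on the checkpoint
config. A rough sketch of what that could look like (the feature is still
under discussion, so the enableUnalignedCheckpoints() switch below is an
assumption about a future API, not something available today):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Checkpoint every 60s; with aligned checkpoints the barriers can still
    // travel slowly under backpressure.
    env.enableCheckpointing(60_000L);

    // Assumed future switch: barriers would overtake buffered records instead of
    // waiting for them to be processed, so alignment no longer dominates e2e time.
    env.getCheckpointConfig().enableUnalignedCheckpoints();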

Hope that helps in understanding what is going on.

Best,
Stephan


Re: Checkpoint metrics.

Jamie Grier
Thanks Seth and Stephan,

Yup, I had intended to upload an image.  Here it is:
https://pasteboard.co/Ixg0YP2.png

This one is very simple and I suppose can be explained by heavy
backpressure.  The more complex version of this problem I run into
frequently is where a single sub-task (or a couple of them) in a highly
parallel job takes up to an order of magnitude longer than the others in
end-to-end time, and usually ends up causing checkpoint timeouts.  This
often occurs in the sink task, but that's not the only case I've seen.

The only metric that shows a high value is still the end-to-end time.
Sync, async, and alignment times are all negligible.  This, to me, is very
hard to understand, especially when such a task takes 10+ minutes to
complete and everything else takes seconds.

Rather than speak hypothetically about this, I'll post some data to this
thread when the situation occurs again.  Maybe we can make sense of it together.

Thanks a lot for the help.

-Jamie


Re: Checkpoint metrics.

Jamie Grier
Alright, here's another case where this is very pronounced.  Here's a link
to a couple of screenshots showing the overall stats for a slow task as
well as a zoom in on the slowest of them:  https://pasteboard.co/IxhGWXz.png

This is the sink stage of a pipeline with 3 upstream tasks.  All the
upstream subtasks complete their checkpoints end-to-end in under 10
seconds.  Most of the sink subtasks also complete end-to-end in under a few
seconds.  There are a few that take a minute or so (which is also
indicative of a problem) but then there is one that takes 29 minutes.

The sink here is the StreamingFileSink.
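
For context, the sink is wired up roughly like the sketch below (the bucket
path, encoder, and rolling thresholds are placeholders rather than our real
config, and "stream" stands for the upstream DataStream<String>):

    import java.util.concurrent.TimeUnit;
    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

    StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("s3://some-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(
            DefaultRollingPolicy.create()
                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                .withMaxPartSize(128L * 1024 * 1024)
                .build())
        .build();

    // The sink's own checkpoint work (flushing and persisting in-progress part
    // files) is what shows up as sync/async time, and that is negligible here,
    // so the long end-to-end time is spent before the barrier even reaches this subtask.
    stream.addSink(sink);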

It does seem that each subtask with a high end-to-end time also has a
substantially higher alignment time, but the end-to-end time is still much
larger than the alignment alone.

I suppose the correct interpretation of this is that the high end-to-end
time alone indicates heavy backpressure / slow progress on the slow
subtasks, and since they are moving so slowly, that also explains how there
could be a relatively high alignment time as well.  The skew in the barrier
arrival times is essentially amplified because the subtasks are making their
way through the data so darn slowly.

It's still very hard to understand how this sink could be taking so long to
make progress.  I mean, unless I misunderstand something, the total amount of
data that has to be worked through before receiving a barrier can't be more
than what is buffered in Flink's network stack in the worst case, right?  How
could it take 29 minutes to consume this data in the sink?
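
As a rough back-of-the-envelope on that worst case (assuming the default
network settings of 32 KB memory segments, 2 exclusive buffers per input
channel, and 8 floating buffers per gate, and ignoring the upstream output
queues), the buffered data per input gate is only a few megabytes:

    // Hypothetical numbers, just to get an order of magnitude; the real values
    // depend on taskmanager.memory.segment-size and the network buffer settings.
    long segmentSize = 32 * 1024;   // default network buffer (memory segment) size
    int channels = 128;             // e.g. 128 upstream subtasks feeding this sink subtask
    int exclusivePerChannel = 2;    // taskmanager.network.memory.buffers-per-channel default
    int floatingPerGate = 8;        // taskmanager.network.memory.floating-buffers-per-gate default

    long maxBuffered = (long) (channels * exclusivePerChannel + floatingPerGate) * segmentSize;
    System.out.println(maxBuffered / (1024 * 1024) + " MB");   // ~8 MB for these numbers

Working through single-digit megabytes should not take anywhere near 29
minutes unless the sink itself is making progress extremely slowly.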

Anyway, I'd appreciate any feedback or insights.

Thanks :)

-Jamie


Re: Checkpoint metrics.

Jamie Grier
Here's the second screenshot I forgot to include:
https://pasteboard.co/IxhNIhc.png

Re: Checkpoint metrics.

Konstantin Knauf
Hi Jamie,

I think your interpretation is correct. It takes a long time until the
first barrier reaches the "slow" subtask and, in the case of the screenshot,
another 3m 22s until the last barrier reaches it. Regarding the total amount
of data: depending on your checkpoint configuration (especially the minimum
time between checkpoints), there might also still be alignment buffers left
over from the previous checkpoint that need to be processed while the next
checkpoint has already started.
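
For reference, the settings I mean live on the checkpoint config, roughly like
this (the concrete values below are only placeholders):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Trigger a checkpoint every minute.
    env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

    // Minimum pause between the end of one checkpoint and the start of the next.
    // With a small pause, a backpressured subtask may still be draining buffers
    // from the previous alignment when the next barrier arrives.
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);

    // Expire checkpoints that run longer than this.
    env.getCheckpointConfig().setCheckpointTimeout(30 * 60_000L);

    // Only one checkpoint in flight at a time.
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);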

Cheers,

Konstantin



Re: Checkpoint metrics.

Jamie Grier
Thanks Konstantin,

Refining this a little bit: all the checkpoints for all the subtasks
upstream of the sink complete in seconds.  Most of the subtasks of the sink
itself also complete in seconds, other than these very few "slow" ones.  So
somehow we are taking, at worst, 29 minutes to clear the data between the
slow sink subtask and the tasks just upstream.  It's very hard to see how
this can be the case.  I think I need to dig further into the operation of
the StreamingFileSink to see if I can make sense of how this can happen.  I
don't see any signs of major data/partition skew in the metrics, so this is
pretty strange.

Has anyone seen this sort of behavior with the StreamingFileSink before?

-Jamie




Re: Checkpoint metrics.

Stephan Ewen
Hi Jamie!

(and adding Klou)

I think the StreamingFileSink has a limit on the number of concurrent
uploads. Could it be that too many uploads are enqueued and at some point the
checkpoint blocks for a long time until that queue is worked off?
Klou, do you have more insights here?

Best,
Stephan

