[DISCUSS] Detection Flink Backpressure

[DISCUSS] Detection Flink Backpressure

裴立平
Recently I have been trying to optimize the way we find the positions where backpressure occurs.

I have read some blogs about Flink backpressure and have a rough idea of how it works.

The method Flink currently adopts is thread stack sampling, which is heavy and does not run continuously.

The positions where backpressure occurs are very important to developers.

They should be exposed as monitoring metrics.

Is there any other approach we could take to detect Flink backpressure?

Re: [DISCUSS] Detection Flink Backpressure

Yun Gao
Hello liping,

       Thank you for proposing to optimize the backpressure detection! From our previous experience, we think the InputBufferPoolUsageGauge and OutputBufferPoolUsageGauge may be useful for detecting backpressure: for a chain of tasks A ---> B ---> C, if we find that the OutputBufferPoolUsage of task A and the InputBufferPoolUsage of task B are at 100%, but the OutputBufferPoolUsage of task B is below 100%, then it should be task B that causes the backpressure.

      However, we currently think that the InputBufferPoolUsage and OutputBufferPoolUsage require some modifications to be more accurate:

         1. When there are multiple inputs or outputs, the InputBufferPoolUsage and OutputBufferPoolUsage should show the maximum usage instead of the average usage [1].
         2. Currently the sender side reports the backlog right before filling the output buffer. Together with the pre-allocation logic on the receiver side, the InputBufferPoolUsage may be 100% even if the data has not been received yet [2].

     We may need to address these problems before adopting the InputBufferPoolUsage and OutputBufferPoolUsage as the backpressure indicator.

     Besides, a similar thought is that we could also add new InputBufferUsage and OutputBufferUsage metrics that show (number of queued buffers / number of all buffers) instead.
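As an illustration only, here is a minimal sketch of how this heuristic could be applied to usage values already collected from the metrics system; the TaskBufferUsage class and all names below are made up for the example and are not Flink APIs:

import java.util.Arrays;
import java.util.List;

public class BackpressureHeuristic {

    // Hypothetical per-task snapshot of the two gauges discussed above (values in 0.0 .. 1.0).
    static class TaskBufferUsage {
        final String name;
        final double inPoolUsage;
        final double outPoolUsage;

        TaskBufferUsage(String name, double inPoolUsage, double outPoolUsage) {
            this.name = name;
            this.inPoolUsage = inPoolUsage;
            this.outPoolUsage = outPoolUsage;
        }
    }

    // For a linear chain: a task whose upstream output pool and own input pool are
    // exhausted, while its own output pool still has room, is the likely bottleneck.
    static String findLikelySource(List<TaskBufferUsage> chain) {
        for (int i = 1; i < chain.size(); i++) {
            TaskBufferUsage up = chain.get(i - 1);
            TaskBufferUsage task = chain.get(i);
            if (up.outPoolUsage >= 1.0 && task.inPoolUsage >= 1.0 && task.outPoolUsage < 1.0) {
                return task.name;
            }
        }
        return null; // no obvious bottleneck in this chain
    }

    public static void main(String[] args) {
        List<TaskBufferUsage> chain = Arrays.asList(
                new TaskBufferUsage("A", 0.2, 1.0),
                new TaskBufferUsage("B", 1.0, 0.4),
                new TaskBufferUsage("C", 0.3, 0.1));
        System.out.println(findLikelySource(chain)); // prints B
    }
}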


    Best,
   Yun Gao


    [1] https://issues.apache.org/jira/browse/FLINK-10981
    [2] https://issues.apache.org/jira/browse/FLINK-11082



Re: [DISCUSS] Detection Flink Backpressure

裴立平
I have an idea about detecting backpressure (i.e. the blocking operators) via the checkpoint barrier.

I have some Flink jobs with checkpointing enabled, but their checkpoints take a long time to complete.

I need to find the blocking operators, which is essentially the same problem as backpressure detection.

From a checkpoint object I can get a timestamp that marks the checkpoint's start time, and then I compute a metric in org.apache.flink.streaming.runtime.tasks.StreamTask.executeCheckpointing().

The metric is the delta between checkpoint.timestamp and the time when StreamTask.executeCheckpointing() is invoked; I named it checkpoint-delay-time.

It is similar to the checkpoint's end-to-end-time metric, but it does not include the async handles.

For example, take a chain of tasks A (parallelism: 2) ---> B (parallelism: 3) ---> C (parallelism: 1):

Checkpoint-delay-value-A: the max checkpoint-delay-time over A's 2 instances

Checkpoint-delay-value-B: the max checkpoint-delay-time over B's 3 instances

Checkpoint-delay-value-C: the max checkpoint-delay-time over C's 1 instance

Then I can compute 3 further deltas from these checkpoint-delay-values:

result-0-->A = Checkpoint-delay-value-A - 0

result-A-->B = Checkpoint-delay-value-B - Checkpoint-delay-value-A

result-B-->C = Checkpoint-delay-value-C - Checkpoint-delay-value-B

Any result-X-->Y that is longer than 5s (or some other threshold) points at the black sheep.
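To make the computation concrete, here is a minimal sketch of the delta calculation, assuming the per-instance checkpoint-delay-time values have already been collected elsewhere (e.g. in StreamTask.executeCheckpointing()); all class and method names are illustrative only:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CheckpointDelayAnalysis {

    // For each stage take the max checkpoint-delay-time over its parallel instances,
    // then compute the delta between consecutive stages; a delta above the threshold
    // points at the downstream stage as the likely blocking operator.
    static Map<String, Long> suspiciousEdges(LinkedHashMap<String, List<Long>> delaysPerStage,
                                             long thresholdMillis) {
        Map<String, Long> suspects = new LinkedHashMap<>();
        long previousMax = 0L;
        String previousStage = "0";
        for (Map.Entry<String, List<Long>> e : delaysPerStage.entrySet()) {
            long max = 0L;
            for (long d : e.getValue()) {
                max = Math.max(max, d);
            }
            long delta = max - previousMax;
            if (delta > thresholdMillis) {
                suspects.put("result-" + previousStage + "-->" + e.getKey(), delta);
            }
            previousMax = max;
            previousStage = e.getKey();
        }
        return suspects;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, List<Long>> delays = new LinkedHashMap<>();
        delays.put("A", Arrays.asList(300L, 450L));           // parallelism 2
        delays.put("B", Arrays.asList(6200L, 5900L, 6100L));  // parallelism 3
        delays.put("C", Arrays.asList(6800L));                // parallelism 1
        // With a 5s threshold, only A-->B exceeds it, so B is the black sheep.
        System.out.println(suspiciousEdges(delays, 5000L));   // {result-A-->B=5750}
    }
}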






Re: [DISCUSS] Detection Flink Backpressure

Piotr Nowojski-2
Hi all,

peiliping: I think your idea could be problematic for a couple of reasons. Probably the minor concern is that the checkpoint time can be affected not only by back pressure, but also by how long it takes to actually perform the checkpoint. The bigger issues are that this bottleneck detection would be limited to the time during checkpointing (what if one has checkpoints only once every hour? Or none at all?) AND that performance/bottlenecks may change significantly during checkpointing (for example, writing the state of the first operator to a DFS can indirectly affect downstream operators).

The idea of detecting back pressure/bottlenecks using output/input buffers is much more natural, because in the end, almost by definition, if the output buffers are full, the given task is back pressured.

Both the input and output queue lengths are already exposed via metrics, so developers have access to the raw data to manually calculate/detect bottlenecks. It would actually be nice to automatically aggregate those metrics and provide ready-to-use metrics: boolean flags for whether a task/stage/job is back pressured or not.
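Purely as a sketch of that aggregation (the class and method names are assumptions for the example, not existing Flink metrics or APIs), assuming the per-subtask output pool usages are available:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BackpressureFlags {

    // A task is considered back pressured if any of its subtasks has a full output pool.
    static boolean taskBackPressured(List<Double> outPoolUsagePerSubtask) {
        for (double usage : outPoolUsagePerSubtask) {
            if (usage >= 1.0) {
                return true;
            }
        }
        return false;
    }

    // A job is considered back pressured if any of its tasks is.
    static boolean jobBackPressured(Map<String, List<Double>> outPoolUsagePerTask) {
        return outPoolUsagePerTask.values().stream().anyMatch(BackpressureFlags::taskBackPressured);
    }

    public static void main(String[] args) {
        Map<String, List<Double>> job = new LinkedHashMap<>();
        job.put("source", Arrays.asList(1.0, 0.9));
        job.put("map",    Arrays.asList(0.2, 0.3));
        job.put("sink",   Arrays.asList(0.1));
        System.out.println(taskBackPressured(job.get("source"))); // true
        System.out.println(jobBackPressured(job));                // true
    }
}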

Replacing the current back pressure detection mechanism, which probes the threads and checks which of them are waiting for buffers, is another issue. Functionally it is equivalent to monitoring whether the output queues are full. It might be more hacky, but it gives the same results, so it wasn’t high on my priority list to change/refactor. It would be nice to clean this up a little and unify it, but using metrics can also mean some additional work, since there are some known metrics-related performance issues.

Piotrek


Re: [DISCUSS] Detection Flink Backpressure

Jamie Grier-3
One unfortunate problem with the current back-pressure detection mechanism
is that it doesn't work well with all of our sources.  The problem is that
some sources (Kinesis for sure) emit elements from threads Flink knows
nothing about and therefore those stack traces aren't sampled.  The result
is that you never see back-pressure detected in the first chain of a Flink
job containing that source.


Re: [DISCUSS] Detection Flink Backpressure

Jamie Grier-3
Maybe we should add a way to register those threads so that they are also sampled. Thoughts?


Re: [DISCUSS] Detection Flink Backpressure

Ken Krugler
There’s the related issue of Async I/O not showing up in back pressure reporting, also due to the same issue of threads not being sampled.

— Ken


--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: [DISCUSS] Detection Flink Backpressure

Piotr Nowojski-2
Hi,

In that case I think that instead of fixing the current back pressure monitoring mechanism, it would be better to replace it with a new one based on output queue lengths. I haven’t thought it through, especially with respect to the performance implications, but my gut feeling is that it should be solvable one way or another.

Piotrek


Re: [DISCUSS] Detection Flink Backpressure

Zhijiang(wangzhijiang999)
Hi all,

The current thread stack sampling cannot cover all the cases. And the output queue length might not be very accurate sometimes, because the last BufferConsumer in each subpartition might not be full and still be writable even though the output queue length is already equal to or greater than the buffer pool size (considering event buffers), and that case is not backpressure.

I think a direct and accurate way to monitor backpressure is to count how many times "availableMemorySegments.wait(2000)" is triggered during "LocalBufferPool#requestMemorySegment", and report that ratio as backpressure, provided it does not affect performance.
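A rough sketch of what such counting might look like; this is not the actual LocalBufferPool code, just an illustration of counting blocked waits and turning them into a ratio per reporting interval:

import java.util.concurrent.atomic.AtomicLong;

public class BlockedRequestRatio {

    private final AtomicLong totalRequests = new AtomicLong();
    private final AtomicLong blockedRequests = new AtomicLong();

    // Called from the (hypothetical) buffer request path; 'hadToWait' would be true
    // whenever the request had to enter availableMemorySegments.wait(2000) because
    // no segment was available.
    void onRequest(boolean hadToWait) {
        totalRequests.incrementAndGet();
        if (hadToWait) {
            blockedRequests.incrementAndGet();
        }
    }

    // Read and reset by a periodic reporter; the returned value is the fraction of
    // requests that had to block in the last interval (approximate under concurrency).
    double sampleAndReset() {
        long total = totalRequests.getAndSet(0);
        long blocked = blockedRequests.getAndSet(0);
        return total == 0 ? 0.0 : (double) blocked / total;
    }
}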

Best,
Zhijiang



Re: [DISCUSS] Detection Flink Backpressure

Piotr Nowojski-2
Hi,

> I think a direct and accurate way to monitor backpressure is to count how many times "availableMemorySegments.wait(2000)" is triggered during "LocalBufferPool#requestMemorySegment", and report that ratio as backpressure, provided it does not affect performance.

This could be an interesting idea; however, it might be non-trivial or impossible to define a time window for calculating the ratio / tuning the moving-average resolution so that it reliably handles different workloads. I would lean towards something like this:

backPressureMonitor.backPressured(this.id);
availableMemorySegments.wait(2000);

Where `backPressureMonitor` is some global/job-level entity that collects the "back pressured" events from one side. There could be some periodic thread (from the metrics system?) that queries the backPressureMonitor and processes the back pressured events, for example reporting a task as back pressured if there was even one "back pressured" event in the reporting interval.

From a performance perspective, this could be dirt cheap. The `backPressureMonitor.backPressured(id)` call (backed for example by `AtomicIntegerArray#incrementAndGet(id)`) would probably be negligible compared to the synchronisation and waiting on `availableMemorySegments`. On the other hand, the report processing could be done by one thread per job per task manager.
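A minimal sketch of such a monitor, assuming task ids are small dense integers on one TaskManager and that some periodic reporter thread drains the flags; all names are illustrative, not existing Flink classes:

import java.util.concurrent.atomic.AtomicIntegerArray;

public class BackPressureMonitor {

    // One slot per task on this TaskManager; a non-zero value means the task reported
    // at least one "back pressured" event since the last drain.
    private final AtomicIntegerArray events;

    public BackPressureMonitor(int numTasks) {
        this.events = new AtomicIntegerArray(numTasks);
    }

    // Called from the hot path right before blocking on the buffer pool; very cheap
    // compared to the synchronisation and waiting that follows.
    public void backPressured(int taskId) {
        events.incrementAndGet(taskId);
    }

    // Called periodically by a reporter thread (e.g. one per job per TaskManager);
    // returns and clears the per-task flags for the last reporting interval.
    public boolean[] drainBackPressuredFlags() {
        boolean[] flags = new boolean[events.length()];
        for (int i = 0; i < events.length(); i++) {
            flags[i] = events.getAndSet(i, 0) > 0;
        }
        return flags;
    }
}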

Piotrek
