Flink cluster crashing going from 1.4.0 -> 1.5.3

Jozef Vilcek
Hello,

I am trying to run my Beam application on a newer version of Flink
(1.5.3) but am having trouble with it. When I submit the application,
everything works fine, but after a few minutes (as soon as 2 minutes after
the job starts) the cluster just goes bad. Logs are full of heartbeat
timeouts, JobManager lost leadership, TaskExecutor timed out, etc.

At that time the WebUI is also unusable. Looking into the job manager, I
noticed that all of the "flink-akka.actor.default-dispatcher" threads are
busy or blocked. Most blocks are on metrics:

=======================================
java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:84)
        - waiting to lock <0x000000053df75510> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher.lambda$queryMetrics$5(MetricFetcher.java:205)
        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher$$Lambda$201/995076607.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        ...
=======================================

I tried to increase memory, as the MetricStore seems to hold quite a lot
of data, but it is not helping. On 1.4.0 the job manager was running with
a 4GB heap; now this behaviour also occurs with 10GB.

Any suggestions?

Best,
Jozef

P.S.: The Beam app hits this problem in a setup with parallelism 100, 100
task slots, and 2100 running tasks in streaming mode. A smaller job runs
without problems.

Re: Flink cluster crashing going from 1.4.0 -> 1.5.3

Aljoscha Krettek
Hi,

So with Flink 1.5.3 but a smaller parallelism the job works fine?

Best,
Aljoscha

Re: Flink cluster crashing going from 1.4.0 -> 1.5.3

Jozef Vilcek
Yes, with smaller data, and therefore smaller resources and parallelism,
exactly the same job runs fine.

Re: Flink cluster crashing going from 1.4.0 -> 1.5.3

Piotr Nowojski
Hi,

How many task slots do you have in the cluster and per machine, and what parallelism are you using?

Piotrek

Re: Flink cluster crashing going from 1.4.0 -> 1.5.3

Jozef Vilcek
Parallelism is 100. I tried clusters with 1 and 2 slots per TM, yielding
100 or 50 TMs in the cluster.

I did notice that the URL http://jobmanager:port/jobs/job_id/metrics in
1.5.x returns a huge list of "latency.source_id. ...." IDs. A heap dump
shows the hash map takes 1.6GB for me. I am guessing that is the one the
dispatcher threads keep updating. Not sure what those are. In 1.4.0 that
URL returns something else, a very short list.
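
A minimal sketch of pulling that list and counting the latency IDs; the
host, port and job id below are placeholders, only the endpoint itself is
from this thread:

=======================================
// Sketch only: placeholder host, port and job id. Fetches the job-level
// metric list from the REST endpoint mentioned above and counts how many
// of the returned IDs are latency metrics.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class LatencyMetricCounter {
    public static void main(String[] args) throws Exception {
        String jobId = "00000000000000000000000000000000"; // placeholder
        URL url = new URL("http://jobmanager:8081/jobs/" + jobId + "/metrics");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        StringBuilder body = new StringBuilder();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line);
            }
        }
        // The response is a JSON array of {"id": "..."} objects; a crude
        // substring count is enough to gauge the number of latency IDs.
        int count = 0;
        for (int i = body.indexOf("latency.source_id"); i >= 0;
                i = body.indexOf("latency.source_id", i + 1)) {
            count++;
        }
        System.out.println("latency metric ids: " + count);
    }
}
=======================================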

Re: Flink cluster crashing going from 1.4.0 -> 1.5.3

Chesnay Schepler
In 1.5 the latency metric was changed to be reported at the job level;
that's why you see it under /jobs/.../metrics now, but not in 1.4.
In 1.4 you would see something similar under
/jobs/.../vertices/.../metrics, for each vertex.

Additionally, it is now a proper histogram, which significantly increases
the number of accesses to the ConcurrentHashMaps that store metrics for
the UI. It could be that this code is just too slow for the amount of
metrics.
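
As an illustration of where that contention bites, here is a stand-in
sketch (not Flink's actual implementation); a coarse synchronized merge
like this is consistent with the object-monitor BLOCKED traces above:

=======================================
// Illustrative stand-in only, not Flink's actual code: every dispatcher
// thread in the traces above queues on the MetricStore monitor, which is
// what happens with a coarse synchronized merge like this one.
import java.util.HashMap;
import java.util.Map;

class MetricStoreSketch {
    private final Map<String, Object> metrics = new HashMap<>();

    // Each fetch round merges a full metric dump under a single lock;
    // with hundreds of thousands of entries the merge holds the lock
    // long enough to stall every other caller.
    synchronized void addAll(Map<String, Object> dump) {
        metrics.putAll(dump);
    }
}
=======================================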

Re: Flink cluster crashing going from 1.4.0 -> 1.5.3

Jozef Vilcek
For my small job, I see ~24k of those latency metrics at
'/jobs/.../metrics'. That job runs with much smaller parallelism than the
production one.

Are there any options here? Can it be turned off, the histogram metrics
reduced, the update frequency reduced, ...?
Also, keeping it flat seems to use quite a bit of JM memory:

{"id":"latency.source_id.2f6436c1f4f0c70e401663acf945a822.source_subtask_index.2.operator_id.4060d9664a78e1d82671ac80921843cd.operator_subtask_index.1.latency_stddev"}


Re: Flink cluster crashing going from 1.4.0 -> 1.5.3

Chesnay Schepler
I believe the only thing you can do is disable latency tracking, by
setting the `latencyTrackingInterval` in `env.getExecutionConfig()` to 0
or a negative value.
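
For a plain DataStream program, a minimal sketch of that (the interval is
in milliseconds):

=======================================
// Minimal sketch, assuming a plain DataStream program; 0 (or any
// negative value) disables the periodic latency markers entirely.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableLatencyTracking {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setLatencyTrackingInterval(0L);
        // ... build and execute the job as usual ...
    }
}
=======================================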

The update frequency is not configurable and currently set to 10 seconds.

Latency metrics are tracked as the cross-product of all subtasks of all
operators and all subtasks of all sources.
That is, if you have 2 sources with 2 other operators and a parallelism
of 10, you can end up with 400 latency metrics:
10 (subtasks per source) * 10 (subtasks per operator) * 2 (# operators)
* 2 (# sources)

Re: Flink cluster crashing going from 1.4.0 -> 1.5.3

Jozef Vilcek
With `latencyTrackingInterval` set to `0` the cluster runs fine.
So, is this something that makes sense to improve? Is there a JIRA I can
track, or should I file one?

Re: Flink cluster crashing going from 1.4.0 -> 1.5.3

Chesnay Schepler
There are a few separate issues in here that we should
tackle/investigate in parallel.


        Improve storage of latency metrics

Given how absurdly the number of latency metrics scales with the number
of operators / parallelism, it makes sense to introduce a special case
here. I'm not quite sure yet how easily this can be done though, as this
isn't just about storage but also about transmission from the TM to the
JM, which is just as inefficient as the storage.


        Configurable granularity for latency metrics

The main reason for the scaling issue above is that we track latency
from each operator subtask to each source subtask. If we only accounted
for the source ID instead, we would significantly reduce the number of
metrics while still providing some insight into latency.
Here's a comparison of the number of individual data points in the
MetricStore, for 2 sources, 6 subsequent operators, parallelism=100:

Current: 1,320,000
SourceID-only: 13,200
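
A back-of-the-envelope check of those totals; the factor of 11 flattened
data points per latency histogram (count, min, max, mean, stddev plus the
percentiles) is an assumption, chosen because it reproduces the numbers:

=======================================
// Back-of-the-envelope check of the totals above. The factor of 11
// flattened data points per latency histogram is an assumption that
// reproduces the reported numbers.
public class LatencyMetricCount {
    public static void main(String[] args) {
        int sources = 2, operators = 6, parallelism = 100;
        int pointsPerHistogram = 11;

        // Current: every operator subtask tracks every source subtask.
        int current = (sources * parallelism)      // source subtasks
                * (operators * parallelism)        // operator subtasks
                * pointsPerHistogram;
        System.out.println(current);               // 1,320,000

        // SourceID-only: operator subtasks track just the source IDs.
        int sourceIdOnly = sources
                * (operators * parallelism)
                * pointsPerHistogram;
        System.out.println(sourceIdOnly);          // 13,200
    }
}
=======================================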


        Separate dispatcher thread-pool from REST API / metrics

We currently use the same thread-pool for inserting metrics / processing
REST requests
that is also used for the Dispatcher RPC, i.e., intra-cluster communication.
To better isolate the Dispatcher we should provide separate thread-pools
to both
components to prevent worst-case scenarios in the future.


        Find the bottleneck

I've run some preliminary benchmarks and the MetricStore itself appears
to be fast enough
to handle these loads, so the search continues...
