Flink TM Heartbeat Timeout

Lu Niu
Hi, Flink User

Several of our applications occasionally hit heartbeat timeouts. We see no GC
pauses and no OOM:
```
- realtime conversion event filter (49/120) (16e3d3e7608eed0a30e0508eb52065fd) switched from RUNNING to FAILED on container_e05_1599158866703_129001_01_000111 @ xenon-pii-prod-001-20191210-data-slave-prod-0a01bbdd.ec2.pin220.com (dataPort=39013).
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e05_1599158866703_129001_01_000111 timed out.
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1193)
    at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```
Should we try increasing `heartbeat.timeout`? Are there any side effects? E.g.,
would that lead to slower detection when a TM container is killed by YARN?

We use Flink 1.11.

Best
Lu
Pinterest, Inc.

Re: Flink TM Heartbeat Timeout

JING ZHANG
Hi Lu,
Xintong gave a thorough analysis of TM heartbeat timeouts in an earlier
thread [1]; please check whether it helps.

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Heartbeat-of-TaskManager-timed-out-td36228.html

Best regards,
JING ZHANG

Lu Niu <[hidden email]> wrote on Fri, Jun 11, 2021 at 1:33 AM:


Re: Flink TM Heartbeat Timeout

Till Rohrmann
Hi Lu,

Longer heartbeat timeouts mean that the loss of a component (e.g. a
TaskManager) takes longer to detect, which slows down the recovery of your
application in such a situation. On the upside, longer heartbeat timeouts
let you run on less reliable infrastructure without falsely detecting that a
component has died.
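
To put rough numbers on this trade-off, here is a minimal sketch. It assumes the timeout is counted from the last heartbeat that was received and ignores network latency, so treat the results as estimates only. The class and method names are made up for illustration and are not part of Flink. The first call uses what I believe are the Flink 1.11 defaults (`heartbeat.interval` = 10000 ms, `heartbeat.timeout` = 50000 ms); the 120000 ms in the second call is an arbitrary example value, not a recommendation.

```java
final class HeartbeatTradeoff {

    // Worst case: the last heartbeat arrived just before the TaskManager died,
    // so the full timeout has to elapse before the loss is noticed.
    static long maxDetectionDelayMs(long timeoutMs) {
        return timeoutMs;
    }

    // Best case: the last heartbeat arrived almost one interval before the
    // TaskManager died, so that part of the timeout had already elapsed.
    static long minDetectionDelayMs(long intervalMs, long timeoutMs) {
        return Math.max(0L, timeoutMs - intervalMs);
    }

    // How many heartbeat intervals fit into one timeout. A larger value means
    // more heartbeat rounds can be late or lost in a row before the component
    // is declared dead.
    static long intervalsPerTimeout(long intervalMs, long timeoutMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        return timeoutMs / intervalMs;
    }

    static String describe(long intervalMs, long timeoutMs) {
        return String.format(
                "interval=%d ms, timeout=%d ms: loss detected after %d to %d ms, %d intervals per timeout",
                intervalMs,
                timeoutMs,
                minDetectionDelayMs(intervalMs, timeoutMs),
                maxDetectionDelayMs(timeoutMs),
                intervalsPerTimeout(intervalMs, timeoutMs));
    }

    public static void main(String[] args) {
        // Assumed Flink 1.11 defaults.
        System.out.println(describe(10_000L, 50_000L));
        // Hypothetical raised timeout, chosen only for illustration.
        System.out.println(describe(10_000L, 120_000L));
    }
}
```

With the assumed defaults, a dead TaskManager would be noticed roughly 40 to 50 seconds after it stops responding; with the example value of 120000 ms that window moves to roughly 110 to 120 seconds, in exchange for tolerating more delayed heartbeats.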

What I would suggest is to figure out why the heartbeats are not sent in
time. You could check the logs or metrics of your container to see whether
it was under such high load that it could not send the heartbeats in time.
Network issues can also delay heartbeats.

Cheers,
Till

On Fri, Jun 11, 2021 at 3:41 AM JING ZHANG <[hidden email]> wrote:
