How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?


Elkhan Dadashov
Dear Flink developers,

I am having difficulty getting a Flink job started.

The job's uberjar/fat jar is around 400MB, and I need to kick off 800+
containers.

The default HDFS replication is 3.

*The YARN queue is empty, and 800 containers are allocated almost
immediately by the YARN RM.*

It takes a very long time until all 800 nodes (node managers) download the
uberjar from HDFS to their local machines.

*Q1:*

a) Do all those 800 nodes download in batches of 3 at a time? (batch
size = HDFS replication factor)

b) Or can Flink TMs replicate the jar from each other? Or do already-started
TMs replicate it to not-yet-started nodes?

The answer is most probably (a), but I want to confirm.

*Q2:*

What is the recommended way of handling a 400MB+ uberjar with 800+
containers?

Any specific params to tune?

Thanks.

Because downloading the uberjar takes a really long time, around 15
minutes after the job is kicked off I am facing this exception:

org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request
to start container.
This token is expired. current time is 1567116179193 found 1567116001610
Note: System times on machines may be out of sync. Check system time
and time zones.
        at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
        at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
        at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
        at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

SHI Xiaogang
Hi Dadashov,

We faced similar problems in our production clusters.

Currently, both launching and stopping of containers are performed in the main
thread of YarnResourceManager. As containers are launched and stopped one
after another, it usually takes a long time to bootstrap large jobs. Things
get worse when some node managers are lost: YARN will retry many times to
communicate with them, leading to heartbeat timeouts of TaskManagers.

Following are some efforts we made to help Flink deal with large jobs.

1. We provision some common jars on all cluster nodes and ask our users not
to include these jars in their uberjar. When containers bootstrap, these
jars are added to the classpath via JVM options. That way, we can
effectively reduce the size of the uberjars.

2. We use asynchronous threads to launch and stop containers in
YarnResourceManager. The bootstrap time can be significantly reduced when
launching a large number of containers. We'd like to contribute this to the
community very soon.

3. We deploy a timeout timer for each launching container. If a task
manager does not register in time after its container has been launched, a
new container will be allocated and launched (a rough sketch of this idea is
included below). This leads to some waste of resources, but reduces the
effects caused by slow or problematic nodes.

The community is now considering a refactoring of the ResourceManager. I
think that will be the time to improve its efficiency.

Regards,
Xiaogang


Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

Till Rohrmann
For point 2 there already exists a JIRA issue [1] and a PR. I hope that we
can merge it during this release cycle.

[1] https://issues.apache.org/jira/browse/FLINK-13184

Cheers,
Till


Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

Jeff Zhang
I can think of 2 approaches:

1. Allow Flink to specify the replication of the submitted uber jar (see the
sketch below for a manual way to do this today).
2. Allow Flink to specify a config flink.yarn.lib pointing to all the
Flink-related jars hosted on HDFS. This way users don't need to build and
submit a fat uber jar every time, and the replication of those jars on HDFS
can also be specified separately.
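Until approach 1 exists as a Flink option, a manual stopgap is to raise the
replication of the already-staged jar through the Hadoop FileSystem API. This
is only a sketch; the jar path is an assumption and typically lives under the
user's .flink/<application-id>/ staging directory on HDFS.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RaiseJarReplication {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();       // picks up HADOOP_CONF_DIR
        FileSystem fs = FileSystem.get(conf);
        Path uberJar = new Path(args[0]);               // e.g. the staged uber jar
        short replication = 10;                         // spread blocks over more datanodes
        boolean requested = fs.setReplication(uberJar, replication);
        System.out.println("replication change requested: " + requested);
    }
}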




--
Best Regards

Jeff Zhang

Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

Jörn Franke
In reply to this post by Elkhan Dadashov
Increase the replication factor and/or use the HDFS cache: https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html
Try to reduce the size of the jar, e.g. the Flink libraries do not need to be included.
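For the HDFS cache suggestion, a rough sketch via the Java HdfsAdmin API is
below (the equivalent can be done with the hdfs cacheadmin CLI from the linked
docs). The namenode URI, pool name and jar path are assumptions, and HDFS
admin rights are needed to create the pool; caching pins the jar's blocks in
datanode memory so mass localization is served from memory rather than disk.

import java.net.URI;
import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;

public class CacheUberJar {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        HdfsAdmin admin = new HdfsAdmin(new URI("hdfs://namenode:8020"), conf);
        admin.addCachePool(new CachePoolInfo("flink-jobs"));             // one-time pool setup
        admin.addCacheDirective(
                new CacheDirectiveInfo.Builder()
                        .setPath(new Path("/user/flink/staging/job-uber.jar"))
                        .setPool("flink-jobs")
                        .setReplication((short) 3)                       // cached (in-memory) replicas
                        .build(),
                EnumSet.noneOf(CacheFlag.class));
    }
}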


Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

Zhu Zhu
One optimization that we use is letting YARN reuse the flink-dist jar that
was localized when running previous jobs.

Thanks,
Zhu Zhu


Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

Elkhan Dadashov
In reply to this post by Jeff Zhang
Thanks everyone for the valuable input and for sharing your experience
tackling this issue.

Regarding the suggestions:
- Provisioning some common jars on all cluster nodes *-->* this requires
depending on the Infra Team's schedule for handling/updating the common jars.
- Making the uberjar slimmer *-->* tried even with a 200 MB uberjar (half the
size); it did not improve much. Only 100 containers could start in time, and
then I receive:

org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request
to start container.
This token is expired. current time is 1566422713305 found 1566422560552
Note: System times on machines may be out of sync. Check system time
and time zones.


- It would be nice to see FLINK-13184
<https://issues.apache.org/jira/browse/FLINK-13184>, but the expected version
it will get into is 1.10.
- Increasing the replication factor --> it would be nice to have a Flink conf
for setting the replication factor only for Flink job jars, but not the
output. It is also challenging to set a replication factor for a
not-yet-existing directory; the new files will have the default replication
factor (a small sketch of the per-file angle is below). Will explore the HDFS
cache option.
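As a small sketch of the per-file angle mentioned above: HDFS replication is a
property of each file, chosen when the file is written (directories have
none), so a client could upload the job jar with a higher factor without
changing the cluster default or affecting the job output. The paths and the
factor of 10 are made-up examples.

import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class UploadJarWithHighReplication {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path localJar = new Path("file:///tmp/job-uber.jar");           // assumed local jar
        Path stagedJar = new Path("/user/flink/staging/job-uber.jar");  // assumed target path
        short replication = 10;
        try (FSDataOutputStream out = fs.create(stagedJar, true, 4096, replication,
                     fs.getDefaultBlockSize(stagedJar));
             InputStream in = localJar.getFileSystem(conf).open(localJar)) {
            IOUtils.copyBytes(in, out, conf, false);                    // upload with replication 10
        }
    }
}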

Maybe another option could be:
- Letting yet-to-be-started TaskManagers (or NodeManagers) download the jars
from already-started TaskManagers in a P2P fashion, so as not to be blocked
on HDFS replication.

A Spark job with the exact same jar size and 800 executors, without any
tuning, can start without any issue on the same cluster in less than a minute.

*Further questions:*

*@ SHI Xiaogang <[hidden email]>:*

I see that all 800 requests are sent concurrently:

2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
 org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
container with resources <memory:16384, vCores:1>. Number pending requests
793.
2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
 org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
nativeMemoryInMB=0, networkMemoryInMB=0} for job
e908cb4700d5127a0b67be035e4494f7 with allocation id
AllocationID{cb016f7ce1eac1342001ccdb1427ba07}.

2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
 org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
container with resources <memory:16384, vCores:1>. Number pending requests
794.
2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
 org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
nativeMemoryInMB=0, networkMemoryInMB=0} for job
e908cb4700d5127a0b67be035e4494f7 with allocation id
AllocationID{71bbb917374ade66df4c058c41b81f4e}.
...

Can you please elaborate on the part "As containers are launched and stopped
one after another"? Any pointer to the class/method in Flink?

*@ Zhu Zhu <[hidden email]>:*

Regarding "One optimization that we take is letting yarn to reuse the
flink-dist jar which was localized when running previous jobs."

We intend to use the Flink real-time pipeline for replay from Hive/HDFS (an
offline source), to have a single pipeline for both batch and real-time. So
for the batch Flink job, the containers will be released once the job is done.
I guess your job is a real-time Flink job, so you can share the jars from
already long-running jobs.

Thanks.



Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

SHI Xiaogang
Hi Dadashov,

You may have a look at the method YarnResourceManager#onContainersAllocated,
which launches containers (via NMClient#startContainer) after they are
allocated. The launching is performed in the main thread of
YarnResourceManager and is synchronous/blocking. Consequently, the containers
are launched one by one.
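As a minimal sketch of what a non-blocking variant could look like using
YARN's NMClientAsync (this is not Flink's current code, only an illustration
of the idea; the surrounding class is hypothetical):

import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;

public class AsyncContainerLauncher {

    private final NMClientAsync nmClientAsync;

    public AsyncContainerLauncher(Configuration conf) {
        this.nmClientAsync = NMClientAsync.createNMClientAsync(new LaunchHandler());
        this.nmClientAsync.init(conf);
        this.nmClientAsync.start();
    }

    /** Returns immediately; the NodeManager round trip happens on NMClientAsync's own threads. */
    public void launch(Container container, ContainerLaunchContext ctx) {
        nmClientAsync.startContainerAsync(container, ctx);
    }

    private static class LaunchHandler implements NMClientAsync.CallbackHandler {
        @Override public void onContainerStarted(ContainerId id, Map<String, ByteBuffer> resp) { }
        @Override public void onContainerStatusReceived(ContainerId id, ContainerStatus status) { }
        @Override public void onContainerStopped(ContainerId id) { }
        @Override public void onStartContainerError(ContainerId id, Throwable t) { /* request a replacement */ }
        @Override public void onGetContainerStatusError(ContainerId id, Throwable t) { }
        @Override public void onStopContainerError(ContainerId id, Throwable t) { }
    }
}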

Regards,
Xiaogang


Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

Zhu Zhu
Hi Elkhan,

>>Regarding "One optimization that we take is letting yarn to reuse the
flink-dist jar which was localized when running previous jobs."
>>We are intending to use Flink Real-time pipeline for Replay from
Hive/HDFS (from offline source), to have 1 single pipeline for both batch
and real-time. So for batch Flink job, the ?>>containers will be released
once the job is done.
>>I guess your job is real-time flink, so  you can share the  jars from
already long-running jobs.

This optimization is achieved by making the flink-dist jar a public
distributed cache entry in YARN.
In this way, the localized dist jar can be shared by different YARN
applications, and it will not be removed when the YARN application that
localized it terminates.
This requires some changes in Flink though.
We will open an issue to contribute this optimization to the community.
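As an illustration of the idea, a minimal sketch of registering flink-dist
with PUBLIC visibility through the plain YARN APIs is below. This is not what
Flink does out of the box; the jar path is an assumption, and the file and its
parent directories must be world-readable on HDFS for the public cache to
accept it.

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;

public class PublicDistCacheResource {

    /** Builds a LocalResource entry for flink-dist that YARN may share across applications. */
    public static LocalResource flinkDistAsPublicResource(FileSystem fs, Path flinkDistJar)
            throws Exception {
        FileStatus status = fs.getFileStatus(flinkDistJar);
        LocalResource resource = Records.newRecord(LocalResource.class);
        resource.setResource(ConverterUtils.getYarnUrlFromPath(flinkDistJar));
        resource.setSize(status.getLen());
        resource.setTimestamp(status.getModificationTime());
        resource.setType(LocalResourceType.FILE);
        resource.setVisibility(LocalResourceVisibility.PUBLIC); // shared node-wide across apps
        return resource;
    }
}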

Thanks,
Zhu Zhu


Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

Yang Wang
Hi Dadashov,


Regarding your questions.


> Q1 Do all those 800 nodes download of batch of  3  at a time

The 800+ containers will be allocated on different YARN nodes. By default,
the LocalResourceVisibility is APPLICATION, so the jar is downloaded only
once per node and shared by all TaskManager containers of the same
application on that node. And there is no batch of 3. Even though the
replication factor of your jar is 3 (its HDFS blocks sit on 3 different
datanodes), a datanode can serve many downloads at once; the limiting factor
is the bandwidth of the datanodes. I guess the bandwidth of your HDFS
datanodes is not very good, so increasing the replication factor of the fat
jar will help reduce the download time. A JIRA ticket has been created for this.[1]
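
For example, the replication factor of the jar the client uploaded can be
inspected and raised with standard HDFS shell commands. A minimal sketch,
assuming the usual Flink-on-YARN staging layout (the path below is purely
illustrative; substitute your own user, application id and jar name):

    # check the current replication factor of the localized uberjar
    hadoop fs -stat %r /user/<user>/.flink/<application-id>/my-job-uber.jar

    # raise it so that more datanodes can serve the 800 concurrent downloads
    hadoop fs -setrep -w 10 /user/<user>/.flink/<application-id>/my-job-uber.jar

Note that this only helps if it happens before the node managers start
localizing the jar, so setting dfs.replication on the client side (see the
workaround below) is usually the more practical route.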


> Q2 What is the recommended way of handling 400MB+ Uberjar with 800+
containers ?

From our production experience, there are at least three ways to optimize
this.

   1. Increase the replication factor of the jars in the YARN distributed cache.[1]
   2. Launch containers with more threads, or use NMClientAsync, so that the
   allocated containers can be started as soon as possible. Even though
   startContainer on the YARN NodeManager side is asynchronous, launching a
   container in Flink's YarnResourceManager is a blocking call, so containers
   have to be started one by one.[2] (See the sketch after this list.)
   3. Use the YARN public cache to eliminate unnecessary jar downloads. For
   example, flink-dist.jar would not have to be uploaded and then localized for
   each application.[3]
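
To make point 2 more concrete, here is a minimal, illustrative sketch of
asynchronous container launching with Hadoop's NMClientAsync API. The class
name and the callback bodies are hypothetical and only show the idea; this is
not Flink's actual implementation:

    import java.nio.ByteBuffer;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    import org.apache.hadoop.yarn.api.records.ContainerStatus;
    import org.apache.hadoop.yarn.client.api.async.NMClientAsync;

    // Illustrative only: launch already-allocated containers without blocking the caller.
    public class AsyncContainerLauncher implements NMClientAsync.CallbackHandler {

        private final NMClientAsync nmClientAsync;

        public AsyncContainerLauncher(Configuration conf) {
            // Start/stop requests are queued and executed by NMClientAsync's own
            // thread pool, so the calling thread (e.g. the resource manager's
            // main thread) is never blocked.
            this.nmClientAsync = NMClientAsync.createNMClientAsync(this);
            this.nmClientAsync.init(conf);
            this.nmClientAsync.start();
        }

        public void launch(Container container, ContainerLaunchContext launchContext) {
            // Returns immediately; the outcome arrives via the callbacks below.
            nmClientAsync.startContainerAsync(container, launchContext);
        }

        @Override
        public void onContainerStarted(ContainerId containerId,
                                       Map<String, ByteBuffer> allServiceResponse) {
            // e.g. arm a registration-timeout timer for the TaskManager expected here
        }

        @Override
        public void onStartContainerError(ContainerId containerId, Throwable t) {
            // e.g. release this container and request a replacement
        }

        @Override
        public void onContainerStatusReceived(ContainerId containerId, ContainerStatus status) {}

        @Override
        public void onContainerStopped(ContainerId containerId) {}

        @Override
        public void onGetContainerStatusError(ContainerId containerId, Throwable t) {}

        @Override
        public void onStopContainerError(ContainerId containerId, Throwable t) {}
    }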


Unfortunately, the three features above are still under development. As a
workaround, you could set dfs.replication=10 in the hdfs-site.xml under
HADOOP_CONF_DIR on the Flink client machine.
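
For reference, a minimal client-side hdfs-site.xml carrying just that property
could look like the snippet below; the value 10 is simply the figure suggested
above, so pick whatever suits your cluster size and the number of downloads
you need to serve:

    <?xml version="1.0"?>
    <configuration>
      <!-- Files created by this client, including the uploaded uberjar,
           are written with 10 replicas instead of the cluster default of 3. -->
      <property>
        <name>dfs.replication</name>
        <value>10</value>
      </property>
    </configuration>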



[1] https://issues.apache.org/jira/browse/FLINK-12343

[2] https://issues.apache.org/jira/browse/FLINK-13184

[3] https://issues.apache.org/jira/browse/FLINK-13938



Best,

Yang

Zhu Zhu <[hidden email]> wrote on Mon, Sep 2, 2019 at 10:42 AM:

> Hi Elkhan,
>
> >>Regarding "One optimization that we take is letting yarn to reuse the
> flink-dist jar which was localized when running previous jobs."
> >>We are intending to use Flink Real-time pipeline for Replay from
> Hive/HDFS (from offline source), to have 1 single pipeline for both batch
> > and real-time. So for batch Flink job, the containers will be released
> once the job is done.
> >>I guess your job is real-time flink, so  you can share the  jars from
> already long-running jobs.
>
> This optimization works by registering the flink-dist jar as a public
> distributed cache entry in YARN.
> In this way, the localized dist jar can be shared by different YARN
> applications, and it will not be removed when the YARN application that
> localized it terminates.
> This requires some changes in Flink though.
> We will open an issue to contribute this optimization to the community.
>
> Thanks,
> Zhu Zhu
>
> SHI Xiaogang <[hidden email]> wrote on Sat, Aug 31, 2019 at 12:57 PM:
>
>> Hi Dadashov,
>>
>> You may have a look at method YarnResourceManager#onContainersAllocated
>> which will launch containers (via NMClient#startContainer) after containers
>> are allocated.
>> The launching is performed in the main thread of YarnResourceManager and
>> the launching is synchronous/blocking. Consequently, the containers will be
>> launched one by one.
>>
>> Regards,
>> Xiaogang
>>
>> Elkhan Dadashov <[hidden email]> wrote on Sat, Aug 31, 2019 at 2:37 AM:
>>
>>> Thanks everyone for the valuable input and for sharing your experience in
>>> tackling the issue.
>>>
>>> Regarding the suggestions:
>>> - We provision some common jars on all cluster nodes *-->* but this
>>> requires a dependency on the Infra Team's schedule for handling/updating the common jars
>>> - Making the Uberjar slimmer *-->* tried even with a 200 MB Uberjar (half
>>> the size); it did not improve much. Only 100 containers could be started in time,
>>> and then we received:
>>>
>>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to start container.
>>> This token is expired. current time is 1566422713305 found 1566422560552
>>> Note: System times on machines may be out of sync. Check system time and time zones.
>>>
>>>
>>> - It would be nice to see FLINK-13184
>>> <https://issues.apache.org/jira/browse/FLINK-13184>, but the expected
>>> version it will land in is 1.10
>>> - Increase the replication factor --> It would be nice to have a Flink config
>>> for setting the replication factor of only the Flink job jars, but not the output.
>>> It is also challenging to set replication for a not-yet-existing directory;
>>> the new files will get the default replication factor. Will explore the HDFS
>>> cache option.
>>>
>>> Maybe another option could be:
>>> - Letting not-yet-started TaskManagers (or NodeManagers) download the
>>> jars from already-started TaskManagers in a P2P fashion, so that HDFS
>>> replication is not the bottleneck.
>>>
>>> A Spark job with the exact same jar size and 800 executors, without any
>>> tuning, can start without any issue on the same cluster in less than a minute.
>>>
>>> *Further questions:*
>>>
>>> *@ SHI Xiaogang <[hidden email] <[hidden email]>> :*
>>>
>>> I see that all 800 requests are sent concurrently:
>>>
>>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>>  org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
>>> container with resources <memory:16384, vCores:1>. Number pending requests
>>> 793.
>>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>>  org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
>>> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
>>> nativeMemoryInMB=0, networkMemoryInMB=0} for job
>>> e908cb4700d5127a0b67be035e4494f7 with allocation id
>>> AllocationID{cb016f7ce1eac1342001ccdb1427ba07}.
>>>
>>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>>  org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
>>> container with resources <memory:16384, vCores:1>. Number pending requests
>>> 794.
>>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>>  org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
>>> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
>>> nativeMemoryInMB=0, networkMemoryInMB=0} for job
>>> e908cb4700d5127a0b67be035e4494f7 with allocation id
>>> AllocationID{71bbb917374ade66df4c058c41b81f4e}.
>>> ...
>>>
>>> Can you please elaborate on the part "As containers are launched and
>>> stopped one after another"? Any pointer to the class/method in Flink?
>>>
>>> *@ Zhu Zhu <[hidden email] <[hidden email]>> *:
>>>
>>> Regarding "One optimization that we take is letting yarn to reuse the
>>> flink-dist jar which was localized when running previous jobs."
>>>
>>> We are intending to use Flink Real-time pipeline for Replay from
>>> Hive/HDFS (from offline source), to have 1 single pipeline for both batch
>>> and real-time. So for batch Flink job, the containers will be released once
>>> the job is done.
>>> I guess your job is real-time flink, so  you can share the  jars from
>>> already long-running jobs.
>>>
>>> Thanks.
>>>
>>>
>>> On Fri, Aug 30, 2019 at 12:46 AM Jeff Zhang <[hidden email]> wrote:
>>>
>>>> I can think of 2 approaches:
>>>>
>>>> 1. Allow flink to specify the replication of the submitted uber jar.
>>>> 2. Allow flink to specify a config flink.yarn.lib pointing to all the flink
>>>> related jars that are hosted on hdfs. This way users don't need to build
>>>> and submit a fat uber jar every time. And those flink jars hosted on hdfs
>>>> can also have their replication set separately.
>>>>
>>>>
>>>>
>>>> Till Rohrmann <[hidden email]> wrote on Fri, Aug 30, 2019 at 3:33 PM:
>>>>
>>>>> For point 2 there already exists a JIRA issue [1] and a PR. I hope
>>>>> that we can merge it during this release cycle.
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-13184
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Fri, Aug 30, 2019 at 4:06 AM SHI Xiaogang <[hidden email]>
>>>>> wrote:
>>>>>
>>>>>> Hi Dadashov,
>>>>>>
>>>>>> We faced similar problems in our production clusters.
>>>>>>
>>>>>> Both launching and stopping of containers are currently performed in the
>>>>>> main thread of the YarnResourceManager. As containers are launched and
>>>>>> stopped one after another, it usually takes a long time to bootstrap large
>>>>>> jobs. Things get worse when some node managers get lost: YARN will retry
>>>>>> many times to communicate with them, leading to heartbeat timeouts of
>>>>>> TaskManagers.
>>>>>>
>>>>>> Following are some efforts we made to help Flink deal with large jobs.
>>>>>>
>>>>>> 1. We provision some common jars on all cluster nodes and ask our
>>>>>> users not to include these jars in their uberjars. When containers
>>>>>> bootstrap, these jars are added to the classpath via JVM options. That way,
>>>>>> we can effectively reduce the size of the uberjars.
>>>>>>
>>>>>> 2. We use asynchronous threads to launch and stop containers
>>>>>> in the YarnResourceManager. This noticeably reduces the bootstrap time when
>>>>>> launching a large number of containers. We'd like to contribute this to the
>>>>>> community very soon.
>>>>>>
>>>>>> 3. We set a timeout timer for each container being launched. If a task
>>>>>> manager does not register in time after its container has been launched, a
>>>>>> new container is allocated and launched. That wastes some resources, but it
>>>>>> reduces the impact of slow or problematic nodes.
>>>>>>
>>>>>> The community is now considering a refactoring of the ResourceManager.
>>>>>> I think that will be a good opportunity to improve its efficiency.
>>>>>>
>>>>>> Regards,
>>>>>> Xiaogang
>>>>>>
>>>>>> Elkhan Dadashov <[hidden email]> wrote on Fri, Aug 30, 2019 at 7:10 AM:
>>>>>>
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>>