Task manager isn't starting with the defined values in Flink 1.11 on EMR 6.1.0


Task manager isn't starting with the defined values in Flink 1.11 on EMR 6.1.0

DEEP NARAYAN Singh
Hi Guys,

I'm struggling to get the task managers to start with Flink 1.11.0 on AWS
EMR, although it worked with older versions. Let me give the full context here.

*When using Flink 1.9.1 and EMR 5.29.0*

To create a long-running session, we used the command below.

sudo flink-yarn-session -n <Number of TM> -s <Number of slots> -jm <memory> -tm <memory> -d

followed by the command below to run the final job.

flink run -m yarn-cluster -yid <Flink sessionId> -yn <Number of TM> -ys <Number of slots> -yjm <memory> -ytm <memory> -c <ClassName> <Jar Path>

If "-n" was set to 6, then 6 task managers were created to start the job;
whatever "-n" was configured to, the job started with that number of TMs.
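
For example, with illustrative values (a sketch, not our exact production numbers):

sudo flink-yarn-session -n 6 -s 4 -jm 2048m -tm 4096m -d

would bring up a session with 6 task managers, each with 4 slots and 4 GB of memory.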

But now that we have moved to the newer configuration (i.e. Flink 1.11.0 and
EMR 6.1.0), we are unable to get the desired number of TMs.

Here is the session command for the new configuration:

sudo flink-yarn-session -Djobmanager.memory.process.size=<Memory in GB> -Dtaskmanager.memory.process.size=<Memory in GB> -n <Number of TM> -s <Number of slots/cores> -d

And the final job command:

flink run -m yarn-cluster -yid <Flink sessionId> -c <ClassName> <Jar Path>

I have tried a lot of combinations, but nothing has worked so far. I would
appreciate your help here, as we plan to take this configuration to
PRODUCTION soon.

Thanks in advance.


Regards,

-Deep

Re: Task manager isn't starting with the defined values in Flink 1.11 on EMR 6.1.0

Till Rohrmann
Hi Deep,

Flink has dropped support for specifying the number of TMs via -n since the
introduction of FLIP-6. Since then, Flink automatically starts TMs
depending on the required resources, so there is no need to specify the
-n parameter anymore. Instead, you should specify the parallelism with
which you would like to run your job via the -p option.

Since Flink 1.11.0 there is the option slotmanager.number-of-slots.max to
set an upper limit on the number of slots a cluster is allowed to allocate [1].

[1] https://issues.apache.org/jira/browse/FLINK-16605
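
For example, the 1.11-style session setup and job submission could look
roughly like this (a sketch with placeholder memory values, slot count, and
parallelism; adjust to your cluster):

sudo flink-yarn-session -Djobmanager.memory.process.size=2048m -Dtaskmanager.memory.process.size=4096m -Dslotmanager.number-of-slots.max=24 -s 4 -d

flink run -m yarn-cluster -yid <Flink sessionId> -p 12 -c <ClassName> <Jar Path>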

Cheers,
Till


Re: Task manager isn't starting with the defined values in Flink 1.11 on EMR 6.1.0

DEEP NARAYAN Singh
Thanks, Till, for the detailed explanation. I tried it and it is working fine.

Once again thanks for your quick response.

Regards,
-Deep

Re: Task manager isn't starting with the defined values in Flink 1.11 on EMR 6.1.0

DEEP NARAYAN Singh
Hi Till,

Thanks for your help with the task managers. In continuation of the above
email: we increased the TM and JM memory to run the job with higher
parallelism, but the job fails within one day with the exception below.

Logs: org.apache.flink.client.program.ProgramInvocationException: The
main method caused an error:
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
complete the operation. Number of retries has been exhausted.

The new session configuration is below, and we submitted the job with
parallelism -p 28 to get more TMs to process the heavy load.

sudo flink-yarn-session -Djobmanager.memory.process.size=16000m -Dtaskmanager.memory.process.size=40000m -s 14 -d
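
(If we read the FLIP-6 resource model correctly, with -s 14 and a job
parallelism of -p 28 the session should request 28 / 14 = 2 TaskManagers of
40000m each, assuming all operators share a single slot-sharing group.)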

Note:
We have the retry logic below (imports shown for completeness):

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;

environment.setRestartStrategy(RestartStrategies.failureRateRestart(
        5,                             // max failures per interval
        Time.of(30, TimeUnit.MINUTES), // time interval for measuring failure rate
        Time.of(60, TimeUnit.SECONDS)  // delay between restart attempts
));

Below is the exception I am getting:

org.apache.flink.runtime.JobException: Recovery is suppressed by
FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=1800000,backoffTimeMS=60000,maxFailuresPerInterval=5)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
        at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1710)
        at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1287)
        at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1255)
        at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:954)
        at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173)
        at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165)
        at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732)
        at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
        at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149)
        at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.tryFailingAllocatedSlot(SlotPoolImpl.java:733)
        at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.failAllocation(SlotPoolImpl.java:713)
        at org.apache.flink.runtime.jobmaster.JobMaster.internalFailAllocation(JobMaster.java:562)
        at org.apache.flink.runtime.jobmaster.JobMaster.notifyAllocationFailure(JobMaster.java:700)
        at sun.reflect.GeneratedMethodAccessor99.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:517)
        at akka.actor.Actor.aroundReceive$(Actor.scala:515)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Container released on a *lost* node
        at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
        ... 22 more
2021-01-10 20:07:48,974 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
Stopping checkpoint coordinator for job
a7cffc31c4aeb01356c5132c908be314.
2021-01-10 20:07:48,974 INFO
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore
 - Shutting down
2021-01-10 20:07:48,978 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Job
a7cffc31c4aeb01356c5132c908be314 reached globally terminal state
FAILED.
2021-01-10 20:07:49,006 INFO
org.apache.flink.runtime.jobmaster.JobMaster                  -
Stopping the JobMaster for job Gas Job Runner
V2(a7cffc31c4aeb01356c5132c908be314).
2021-01-10 20:07:49,006 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      -
Suspending SlotPool.
2021-01-10 20:07:49,007 INFO
org.apache.flink.runtime.jobmaster.JobMaster                  - Close
ResourceManager connection 39a33f865ba12bac16dd21b834527750:
JobManager is shutting down..
2021-01-10 20:07:49,007 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      -
Stopping SlotPool.
2021-01-10 20:07:49,007 INFO
org.apache.flink.yarn.YarnResourceManager                     -
Disconnect job manager
00000000000000000000000000000000@akka.tcp://[hidden email]:39039/user/rpc/jobmanager_2
for job a7cffc31c4aeb01356c5132c908be314 from the resource manager.


Could you please help us understand why it is failing now that we have
increased the parallelism?


Thanks & Regards,

-Deep

Re: Task manager isn't starting with the defined values in Flink 1.11 on EMR 6.1.0

Till Rohrmann
Hi Deep,

Could you take a look at the logs of the TM which was lost? They might
help us to debug the root cause. My guess would be that we are exceeding
some memory threshold and that the kernel kills the process.
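
For example, the aggregated container logs (including the lost TM's) can
usually be fetched from YARN, and the kernel log on the lost node would show
whether the OOM killer was involved (the application ID is a placeholder,
and the second command assumes shell access to that node):

yarn logs -applicationId <application id> > flink-session.log

# on the node that was lost:
dmesg | grep -i -E 'killed process|out of memory'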

Cheers,
Till
