Hi Guys,
I'm struggling to get the expected number of task managers with Flink 1.11.0 on AWS EMR, although it worked with older versions. Let me give the full context here.

*When using Flink 1.9.1 and EMR 5.29.0*

To create a long-running session, we used the command below:

*sudo flink-yarn-session -n <Number of TM> -s <Number of slots> -jm <memory> -tm <memory> -d*

followed by the command below to run the final job:

*flink run -m yarn-cluster -yid <flink sessionId> -yn <Number of TM> -ys <Number of slots> -yjm <memory> -ytm <memory> -c <ClassName> <Jar Path>*

If "n" was 6, the job started with 6 task managers; whatever "n" was configured to, that was the number of TMs the job started with.

But now, after scaling up to the new configuration (*i.e. Flink 1.11.0 and EMR 6.1.0*), we are unable to get the desired number of TMs. The new session command is:

*sudo flink-yarn-session -Djobmanager.memory.process.size=<Memory in GB> -Dtaskmanager.memory.process.size=<Memory in GB> -n <no of TM> -s <No of slots/core> -d*

And the final job command:

*flink run -m yarn-cluster -yid <Flink sessionId> -c <ClassName> <Jar Path>*

I have tried a lot of combinations, but nothing has worked so far. I would appreciate your help here, as we plan to take this configuration to *PRODUCTION* soon.

Thanks in advance.
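To make the old behavior concrete, a minimal instance (illustrative numbers only, not our real sizing): running

*sudo flink-yarn-session -n 6 -s 4 -jm 1024m -tm 4096m -d*

on EMR 5.29.0 brought up exactly 6 TMs with 4 slots each, every time.

Regards,

-Deep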
Hi Deep,
Flink has dropped support for specifying the number of TMs via -n since the introduction of FLIP-6. Since then, Flink automatically starts TMs depending on the required resources, so there is no need to specify the -n parameter anymore. Instead, you should specify the parallelism with which you would like to run your job via the -p option.

Since Flink 1.11.0 there is the option slotmanager.number-of-slots.max, which sets an upper limit on the number of slots a cluster is allowed to allocate [1].

[1] https://issues.apache.org/jira/browse/FLINK-16605
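For example, a session plus job submission without -n could look like this (the memory sizes, slot count, and parallelism below are placeholders, not a recommendation):

sudo flink-yarn-session -Djobmanager.memory.process.size=2g -Dtaskmanager.memory.process.size=4g -Dslotmanager.number-of-slots.max=24 -s 4 -d

flink run -m yarn-cluster -yid <Flink sessionId> -p 12 -c <ClassName> <Jar Path>

With -p 12 and 4 slots per TM, Flink should request ceil(12 / 4) = 3 TMs on demand, while slotmanager.number-of-slots.max=24 caps the session at 24 slots (6 TMs) in total.

Cheers,
Till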
Thanks Till, for the detailed explanation. I tried it and it is working fine.
Once again, thanks for your quick response.

Regards,
-Deep
Hi Till,
Thanks for your support on the task manager issue. In continuation of the email above: we increased the TM and JM memory and ran the job with increased parallelism, but the job is now failing within *one day* with the exception below.

Logs:

*org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.*

The new session configuration is below; we submitted the job with parallelism *-p 28* to get more TMs to process the heavy load.

*sudo flink-yarn-session -Djobmanager.memory.process.size=16000m -Dtaskmanager.memory.process.size=40000m -s 14 -d*

*Note:* we have the retry logic below configured:

// RestartStrategies from org.apache.flink.api.common.restartstrategy,
// Time from org.apache.flink.api.common.time
environment.setRestartStrategy(RestartStrategies.failureRateRestart(
        5,                             // max failures per interval
        Time.of(30, TimeUnit.MINUTES), // time interval for measuring the failure rate
        Time.of(60, TimeUnit.SECONDS)  // delay between restart attempts
));
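As a rough sanity check on the sizing (back-of-envelope math, assuming slots fill evenly): with -p 28 and 14 slots per TM, Flink should request ceil(28 / 14) = 2 TMs of 40000m each, so together with the 16000m JM the session needs about 2 x 40000m + 16000m = 96000m (~94 GiB) of YARN memory.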
*Below is the exception which I am getting:*

org.apache.flink.runtime.JobException: Recovery is suppressed by FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=1800000,backoffTimeMS=60000,maxFailuresPerInterval=5)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
    at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1710)
    at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1287)
    at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1255)
    at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:954)
    at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173)
    at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165)
    at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732)
    at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
    at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149)
    at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.tryFailingAllocatedSlot(SlotPoolImpl.java:733)
    at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.failAllocation(SlotPoolImpl.java:713)
    at org.apache.flink.runtime.jobmaster.JobMaster.internalFailAllocation(JobMaster.java:562)
    at org.apache.flink.runtime.jobmaster.JobMaster.notifyAllocationFailure(JobMaster.java:700)
    at sun.reflect.GeneratedMethodAccessor99.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Container released on a *lost* node
    at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
    ... 22 more

2021-01-10 20:07:48,974 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job a7cffc31c4aeb01356c5132c908be314.
2021-01-10 20:07:48,974 INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
2021-01-10 20:07:48,978 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job a7cffc31c4aeb01356c5132c908be314 reached globally terminal state FAILED.
2021-01-10 20:07:49,006 INFO org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job Gas Job Runner V2(a7cffc31c4aeb01356c5132c908be314).
2021-01-10 20:07:49,006 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Suspending SlotPool.
2021-01-10 20:07:49,007 INFO org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 39a33f865ba12bac16dd21b834527750: JobManager is shutting down.
2021-01-10 20:07:49,007 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Stopping SlotPool.
2021-01-10 20:07:49,007 INFO org.apache.flink.yarn.YarnResourceManager - Disconnect job manager 00000000000000000000000000000000@akka.tcp://[hidden email]:39039/user/rpc/jobmanager_2 for job a7cffc31c4aeb01356c5132c908be314 from the resource manager.

Could you please help us understand why the job is failing now that we have increased the parallelism?

Thanks & Regards,

-Deep
Hi Deep,
Could you take a look at the logs of the TM that was lost? They might help us debug the root cause. My guess would be that the process exceeded some memory threshold and the kernel killed it.
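If it helps, one way to pull those logs after the fact (a suggestion only; the application and container IDs are placeholders you would take from the YARN UI or yarn application -list):

yarn logs -applicationId <application id> -containerId <container id of the lost TM>

Since the node itself was reported as lost, it is also worth checking the kernel log on that machine for OOM-killer activity:

dmesg | grep -i 'killed process'

Cheers,
Till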