Hey,
I have a very specific use case. I have a history of records stored as Parquet in S3. I would like to read and process them with Flink. The issue is that the number of files is quite large (>100k). If I provide the full list of files to the HadoopInputFormat that I am using, it fails with an AskTimeoutException. This is weird, since I am using YARN and setting -yD akka.ask.timeout=600s; even though according to the logs the setting is processed properly, the job execution still fails with an AskTimeoutException after 10s, which seems strange to me. I have managed to work around this by grouping the files and reading them in a loop, so that I finally have a Seq[DataSet[Record]]. But if I try to union those datasets, I receive the AskTimeoutException again. So my question is: what can be the reason behind this exception being thrown, and why is the setting ignored even though it is parsed properly?

I will be glad for any help.

Best Regards,
Dom.
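A minimal sketch of the batching workaround described above, i.e. splitting a large file listing into fixed-size groups so each input format only sees a manageable number of paths. All names, the bucket path, and the batch size are illustrative, and the actual Flink calls are only indicated in comments:

```scala
// Illustrative file listing; in practice this would come from an S3 listing.
val filePaths: Seq[String] =
  (1 to 10).map(i => s"s3://bucket/records/part-$i.parquet")

val batchSize = 4 // illustrative; tune to keep per-format path counts small

// Each group would back one DataSet[Record] via env.createInput(...);
// the resulting datasets can then be combined with DataSet#union.
val batches: Seq[Seq[String]] = filePaths.grouped(batchSize).toSeq

// 10 paths in groups of 4 -> 3 batches, with no path lost or duplicated.
assert(batches.size == 3)
assert(batches.flatten == filePaths)
```

Note that unioning many such datasets still produces one large job graph, which is why the union can hit the same timeout as the single large file list.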
Hi Dominik,
Would you check the JM GC status? One possible cause is that the large number of file metas in HadoopInputFormat is burdening the JM memory.

`akka.ask.timeout` is the default RPC timeout, but some RPCs override this timeout for their own purposes; e.g. the RPCs from the web layer usually use `web.timeout`. Providing the detailed call stack of the AskTimeoutException may help to identify where this timeout happened.

Thanks,
Zhu Zhu
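For reference, the timeout options discussed in this thread can be raised together in flink-conf.yaml (or via the equivalent -yD flags). The values below are illustrative, not recommendations:

```yaml
# Default RPC timeout (what -yD akka.ask.timeout=600s sets)
akka.ask.timeout: 600 s
# Client-side submission timeout
akka.client.timeout: 600 s
# Timeout used by the REST/web layer (e.g. job submission), which
# overrides akka.ask.timeout for those requests; value in milliseconds
web.timeout: 600000
```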
I suspect you suffer from Client submission failure which also throws
an AskTimeoutException.

The related configuration option is `akka.client.timeout`, which you can increase. However, there have also been cases where the problem was resolved by upgrading Java to at least version 8u212.

Best,
tison.
Hey,
I have increased `akka.client.timeout`, but it has not helped at all. Here is the log with the call stack for the AskTimeoutException:

2019-11-12 10:19:17,425 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 81fbbc3f41ad5e08ac832d0e656478bc (Flink Java Job at Tue Nov 12 10:04:19 UTC 2019).
2019-11-12 10:19:17,425 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job 81fbbc3f41ad5e08ac832d0e656478bc (Flink Java Job at Tue Nov 12 10:04:19 UTC 2019).
2019-11-12 10:19:17,444 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_0 .
2019-11-12 10:19:17,452 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Initializing job Flink Java Job at Tue Nov 12 10:04:19 UTC 2019 (81fbbc3f41ad5e08ac832d0e656478bc).
2019-11-12 10:19:17,477 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Using restart strategy NoRestartStrategy for Flink Java Job at Tue Nov 12 10:04:19 UTC 2019 (81fbbc3f41ad5e08ac832d0e656478bc).
2019-11-12 10:19:17,495 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job recovers via failover strategy: full graph restart
2019-11-12 10:19:17,513 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Running initialization on master for job Flink Java Job at Tue Nov 12 10:04:19 UTC 2019 (81fbbc3f41ad5e08ac832d0e656478bc).
2019-11-12 10:19:17,777 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 264 ms.
2019-11-12 10:19:27,442 ERROR org.apache.flink.runtime.rest.handler.job.JobSubmitHandler - Unhandled exception.
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-1507147991]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
    at java.lang.Thread.run(Thread.java:748)

Thanks in advance,

Best Regards,
Dom
Hi Dominik:
I found a problem that may be your root cause. [1]
The JobConf in HadoopInputSplit can be very big, containing hundreds of configuration entries. If it is serialized for every split, that will significantly reduce performance. With thousands of splits, the akka threads of the JobMaster will all be busy serializing the conf, which can also lead to various akka timeouts.
So the reason for your job failure may be that the JobMaster is busy serializing the configuration.
I have created a patch to solve it; you can take a look and try it. [2]

[1] https://issues.apache.org/jira/browse/FLINK-14722
[2] https://github.com/JingsongLi/flink/commit/90c021ab8e7a175c6644c345e63383d828c415d7

Best,
Jingsong Lee
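To illustrate the idea behind the linked patch (not Flink's actual implementation): serialize the large job configuration once and share the resulting bytes across all splits, instead of re-serializing it inside every split. `Split` and all sizes here are stand-ins for illustration:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for a split carrying a reference to pre-serialized conf bytes;
// not Flink's actual HadoopInputSplit.
case class Split(path: String, sharedConfBytes: Array[Byte])

// Serialize a (large) configuration map exactly once.
def serializeOnce(conf: java.util.HashMap[String, String]): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(conf)
  oos.close()
  bos.toByteArray
}

// A conf with hundreds of entries, mimicking a heavyweight JobConf.
val conf = new java.util.HashMap[String, String]()
(1 to 500).foreach(i => conf.put(s"key.$i", s"value.$i"))

val confBytes = serializeOnce(conf) // one serialization pass for all splits
val splits = (1 to 1000).map(i => Split(s"s3://bucket/part-$i.parquet", confBytes))

// Every split references the same byte array, so building 1000 splits
// costs one conf serialization instead of 1000.
assert(splits.forall(_.sharedConfBytes eq confBytes))
```

With per-split serialization, the JobMaster would instead pay that serialization cost once per split, which is the bottleneck described above.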
Hey Jingsong,
I will try the patch to verify. In the meantime, I have run the job with -D akka.ask.timeout and -D akka.client.timeout, both equal to 600s, but the stack trace is the same:

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 6f0adc8c58263ef83cb770285094bcba)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
    at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
    at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1862)
    at ai.humn.RawDataDrivingStateTransitionAnalysis$.addAnalysisPipeline(RawDataDrivingStateTransitionAnalysis.scala:59)
    at ai.humn.RawDataDrivingStateTransitionAnalysis$.main(RawDataDrivingStateTransitionAnalysis.scala:42)
    at ai.humn.RawDataDrivingStateTransitionAnalysis.main(RawDataDrivingStateTransitionAnalysis.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:388)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:207)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-1403343453]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
    at java.lang.Thread.run(Thread.java:748)
End of exception on server side>]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
In reply to this post by Jingsong Li
I have managed to locate the issue with the timeout: changing `web.timeout` was the solution. However, now I am getting the following error:

2019-11-12 16:58:00,741 INFO  org.apache.parquet.hadoop.ParquetInputFormat - Total input paths to process : 671
2019-11-12 16:58:04,878 INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input files to process : 519
2019-11-12 16:58:04,878 INFO  org.apache.parquet.hadoop.ParquetInputFormat - Total input paths to process : 519
2019-11-12 16:58:08,017 INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input files to process : 382
2019-11-12 16:58:08,017 INFO  org.apache.parquet.hadoop.ParquetInputFormat - Total input paths to process : 382
2019-11-12 16:58:12,277 INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input files to process : 551
2019-11-12 16:58:12,277 INFO  org.apache.parquet.hadoop.ParquetInputFormat - Total input paths to process : 551
2019-11-12 16:58:16,530 INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input files to process : 507
2019-11-12 16:58:16,530 INFO  org.apache.parquet.hadoop.ParquetInputFormat - Total input paths to process : 507
2019-11-12 16:58:20,080 INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input files to process : 478
2019-11-12 16:58:20,080 INFO  org.apache.parquet.hadoop.ParquetInputFormat - Total input paths to process : 478
2019-11-12 16:58:23,736 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission f0cd12c8e5e9d7e95cfc2f685c451089 (Flink Java Job at Tue Nov 12 16:40:19 UTC 2019).
2019-11-12 16:58:23,738 ERROR org.apache.flink.runtime.rest.handler.job.JobSubmitHandler - Unhandled exception.
org.apache.flink.runtime.client.JobSubmissionException: Job has already been submitted.
    at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:268)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:274)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:189)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

This also seems weird, since I only submit the job once.