Flink Read thousands of files with batch

Flink Read thousands of files with batch

Dominik Wosiński
Hey,
I have a very specific use case. I have a history of records stored as
Parquet in S3 that I would like to read and process with Flink. The issue
is that the number of files is quite large (>100k). If I provide the full
list of files to the HadoopInputFormat that I am using, the job fails with an
AskTimeoutException, which is weird since I am running on YARN and setting
-yD akka.ask.timeout=600s. Even though, according to the logs, the setting
is processed properly, the job still fails with an AskTimeoutException
after 10s, which seems strange to me. I have managed to work around this by
grouping the files and reading them in a loop, so that I end up with a
Seq[DataSet<Record>]. But if I try to union those datasets, I
receive the AskTimeoutException again. So my question is: what can be the
reason behind this exception being thrown, and why is the setting ignored
even though it is parsed properly?

I will be glad for any help.

Best Regards,
Dom.
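The grouping workaround described above amounts to plain batching of the file listing. The sketch below is a hypothetical illustration: `groupPaths` is a made-up helper name, and the Flink-specific reading and union wiring is only indicated in comments, since it depends on the job's setup.

```java
import java.util.ArrayList;
import java.util.List;

public class PathBatcher {
    // Split a very large file listing into fixed-size batches so that a
    // single input format never carries the metadata of all >100k files.
    public static List<List<String>> groupPaths(List<String> paths, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < paths.size(); i += batchSize) {
            // Copy the subList view so each batch is an independent list.
            batches.add(new ArrayList<>(
                    paths.subList(i, Math.min(i + batchSize, paths.size()))));
        }
        return batches;
    }

    // Hypothetical Flink wiring: one DataSet per batch, then a union.
    //
    //   List<DataSet<Record>> sets = new ArrayList<>();
    //   for (List<String> batch : groupPaths(allS3Paths, 1000)) {
    //       sets.add(readParquetBatch(env, batch));  // wraps HadoopInputFormat
    //   }
    //   DataSet<Record> all = sets.stream().reduce(DataSet::union).get();
}
```

Note that unioning thousands of per-batch datasets still produces one large job graph on submission, so the same timeout can reappear at the union step, which matches what the thread reports.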

Re: Flink Read thousands of files with batch

Zhu Zhu
Hi Dominik,

Would you check the JM GC status?
One possible cause is that the large amount of file metadata
in HadoopInputFormat is burdening the JM memory.

`akka.ask.timeout` is the default RPC timeout, but some RPCs may override
this timeout for their own purposes; e.g., RPCs from the web frontend usually
use `web.timeout`.
Providing the detailed call stack of the AskTimeoutException may help to
identify where this timeout happened.

Thanks,
Zhu Zhu
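The per-RPC overrides Zhu Zhu mentions can be raised together when submitting on YARN. A sketch of the submission command (values illustrative, jar name hypothetical; `web.timeout` is specified in milliseconds):

```shell
# Raise both the default RPC timeout and the REST/web timeout used by
# the job-submission handler. 600s / 600000ms are illustrative values.
flink run -m yarn-cluster \
  -yD akka.ask.timeout=600s \
  -yD web.timeout=600000 \
  your-job.jar
```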

Dominik Wosiński <[hidden email]> wrote on Mon, Nov 11, 2019, at 3:35 AM:

Re: Flink Read thousands of files with batch

tison
I suspect you suffer from a client submission failure, which also throws
AskTimeoutException.

The related configuration option is `akka.client.timeout`, which you can
increase. However, there have been cases where the problem was resolved by
upgrading Java to at least version 8u212.

Best,
tison.


Zhu Zhu <[hidden email]> wrote on Mon, Nov 11, 2019, at 6:03 PM:

Re: Flink Read thousands of files with batch

Dominik Wosiński
Hey,
I have increased the `akka.client.timeout` but it has not helped at all.
Here is the log with the call stack for the AskTimeoutException:

2019-11-12 10:19:17,425 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 81fbbc3f41ad5e08ac832d0e656478bc (Flink Java Job at Tue Nov 12 10:04:19 UTC 2019).
2019-11-12 10:19:17,425 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job 81fbbc3f41ad5e08ac832d0e656478bc (Flink Java Job at Tue Nov 12 10:04:19 UTC 2019).
2019-11-12 10:19:17,444 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_0 .
2019-11-12 10:19:17,452 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Initializing job Flink Java Job at Tue Nov 12 10:04:19 UTC 2019 (81fbbc3f41ad5e08ac832d0e656478bc).
2019-11-12 10:19:17,477 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Using restart strategy NoRestartStrategy for Flink Java Job at Tue Nov 12 10:04:19 UTC 2019 (81fbbc3f41ad5e08ac832d0e656478bc).
2019-11-12 10:19:17,495 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job recovers via failover strategy: full graph restart
2019-11-12 10:19:17,513 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Running initialization on master for job Flink Java Job at Tue Nov 12 10:04:19 UTC 2019 (81fbbc3f41ad5e08ac832d0e656478bc).
2019-11-12 10:19:17,777 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 264 ms.
2019-11-12 10:19:27,442 ERROR org.apache.flink.runtime.rest.handler.job.JobSubmitHandler - Unhandled exception.
akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher#-1507147991]] after [10000 ms].
Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
        at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
        at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
        at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
        at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
        at java.lang.Thread.run(Thread.java:748)


Thanks in Advance,

Best Regards,

Dom


On Tue, Nov 12, 2019, at 3:26 AM, tison <[hidden email]> wrote:

Re: Flink Read thousands of files with batch

Jingsong Li
Hi Dominik:

I found another problem that may be your root cause. [1]
The JobConf in HadoopInputSplit may be very big, containing hundreds of
configuration entries; if it is serialized for every split, that will
significantly reduce performance. With thousands of splits, the akka thread
of the JobMaster will spend all its time serializing the conf. That may
lead to various akka timeouts too.
So the reason for your job failure may be that the JobMaster is busy
serializing the configuration.
I have created a patch to solve it; you can take a look and try it. [2]

[1] https://issues.apache.org/jira/browse/FLINK-14722
[2]
https://github.com/JingsongLi/flink/commit/90c021ab8e7a175c6644c345e63383d828c415d7

Best,
Jingsong Lee
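The cost Jingsong describes can be seen with a small, self-contained sketch: a configuration with hundreds of entries, serialized once per split versus once overall. Plain `java.io` serialization and a `HashMap` stand in for Flink's serializer and the Hadoop JobConf; all names here are illustrative, not Flink's.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ConfSerializationCost {
    // Serialize an object to bytes with plain Java serialization.
    static byte[] serialize(Serializable o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    // Bytes the JobMaster would produce if every split embeds its own
    // serialized copy of the conf.
    public static long perSplitCost(Serializable conf, int splits) throws IOException {
        return (long) serialize(conf).length * splits;
    }

    // Bytes produced if one shared copy is serialized and referenced
    // by every split (the idea behind the patch).
    public static long sharedCost(Serializable conf) throws IOException {
        return serialize(conf).length;
    }
}
```

With a shared copy, growing the split count no longer multiplies the serialization work, which appears to be the essence of the FLINK-14722 change.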

On Tue, Nov 12, 2019, at 6:49 PM, Dominik Wosiński <[hidden email]> wrote:


--
Best, Jingsong Lee

Re: Flink Read thousands of files with batch

Dominik Wosiński
Hey Jingsong,
I will try the patch and verify.
In the meantime, I have run the job with -D akka.ask.timeout and -D
akka.client.timeout, both set to 600s, but the stack trace is the same:

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 6f0adc8c58263ef83cb770285094bcba)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)
        at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
        at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
        at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1862)
        at ai.humn.RawDataDrivingStateTransitionAnalysis$.addAnalysisPipeline(RawDataDrivingStateTransitionAnalysis.scala:59)
        at ai.humn.RawDataDrivingStateTransitionAnalysis$.main(RawDataDrivingStateTransitionAnalysis.scala:42)
        at ai.humn.RawDataDrivingStateTransitionAnalysis.main(RawDataDrivingStateTransitionAnalysis.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:388)
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:207)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-1403343453]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
        at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
        at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
        at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
        at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
        at java.lang.Thread.run(Thread.java:748)

End of exception on server side>]
        at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
        at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)


On Tue, Nov 12, 2019, at 12:15 PM, Jingsong Li <[hidden email]> wrote:

Re: Flink Read thousands of files with batch

Dominik Wosiński
In reply to this post by Jingsong Li
I have managed to locate the issue with the timeout; changing `web.timeout` was
the solution. However, now I am getting the following error:

2019-11-12 16:58:00,741 INFO  org.apache.parquet.hadoop.ParquetInputFormat - Total input paths to process : 671
2019-11-12 16:58:04,878 INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input files to process : 519
2019-11-12 16:58:04,878 INFO  org.apache.parquet.hadoop.ParquetInputFormat - Total input paths to process : 519
2019-11-12 16:58:08,017 INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input files to process : 382
2019-11-12 16:58:08,017 INFO  org.apache.parquet.hadoop.ParquetInputFormat - Total input paths to process : 382
2019-11-12 16:58:12,277 INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input files to process : 551
2019-11-12 16:58:12,277 INFO  org.apache.parquet.hadoop.ParquetInputFormat - Total input paths to process : 551
2019-11-12 16:58:16,530 INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input files to process : 507
2019-11-12 16:58:16,530 INFO  org.apache.parquet.hadoop.ParquetInputFormat - Total input paths to process : 507
2019-11-12 16:58:20,080 INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input files to process : 478
2019-11-12 16:58:20,080 INFO  org.apache.parquet.hadoop.ParquetInputFormat - Total input paths to process : 478
2019-11-12 16:58:23,736 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission f0cd12c8e5e9d7e95cfc2f685c451089 (Flink Java Job at Tue Nov 12 16:40:19 UTC 2019).
2019-11-12 16:58:23,738 ERROR org.apache.flink.runtime.rest.handler.job.JobSubmitHandler - Unhandled exception.
org.apache.flink.runtime.client.JobSubmissionException: Job has already been submitted.
        at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:268)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:274)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:189)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



This also seems weird, since I only submit the job once.