[jira] [Created] (FLINK-22211) DataStream.collect() logs warnings if job is not initialized yet

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-22211) DataStream.collect() logs warnings if job is not initialized yet

Shang Yuanchun (Jira)
Chesnay Schepler created FLINK-22211:
----------------------------------------

             Summary: DataStream.collect() logs warnings if job is not initialized yet
                 Key: FLINK-22211
                 URL: https://issues.apache.org/jira/browse/FLINK-22211
             Project: Flink
          Issue Type: Sub-task
          Components: Client / Job Submission
            Reporter: Chesnay Schepler
            Assignee: Chesnay Schepler
             Fix For: 1.13.0


When using {{DataStream.collect()}} we always have an excpetion in the log for the first fetch attempt, before the JM is ready.
The loop retries and the program succeeds, but the exception in the log raises confusion about whether there is a swallowed but impactful error.

{code}
7199 [main] WARN  org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An exception occurs when fetching query results
java.util.concurrent.ExecutionException: org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException: Unable to get JobMasterGateway for initializing job. The requested operation is not available while the JobManager is initializing.
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:155) ~[classes/:?]
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126) [classes/:?]
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) [classes/:?]
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) [classes/:?]
        at org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1320) [classes/:?]
        at org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1303) [classes/:?]
        at org.apache.flink.runtime.operators.coordination.OperatorEventSendingCheckpointITCase.testLostOperatorEventsLeadsToRecovery(OperatorEventSendingCheckpointITCase.java:88) [test-classes/:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) [junit-4.12.jar:4.12]
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) [junit-4.12.jar:4.12]
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) [junit-4.12.jar:4.12]
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) [junit-4.12.jar:4.12]
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) [junit-4.12.jar:4.12]
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) [junit-4.12.jar:4.12]
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) [junit-4.12.jar:4.12]
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) [junit-4.12.jar:4.12]
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) [junit-4.12.jar:4.12]
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) [junit-4.12.jar:4.12]
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) [junit-4.12.jar:4.12]
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) [junit-4.12.jar:4.12]
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) [junit-4.12.jar:4.12]
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) [junit-4.12.jar:4.12]
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363) [junit-4.12.jar:4.12]
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137) [junit-4.12.jar:4.12]
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) [junit-rt.jar:?]
        at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) [junit-rt.jar:?]
        at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220) [junit-rt.jar:?]
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53) [junit-rt.jar:?]
Caused by: org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException: Unable to get JobMasterGateway for initializing job. The requested operation is not available while the JobManager is initializing.
        at org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:892) ~[classes/:?]
        at org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:902) ~[classes/:?]
        at org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordinator(Dispatcher.java:724) ~[classes/:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[classes/:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[classes/:?]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[classes/:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[classes/:?]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[scala-library-2.11.12.jar:?]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[scala-library-2.11.12.jar:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[scala-library-2.11.12.jar:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[scala-library-2.11.12.jar:?]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[akka-actor_2.11-2.5.21.jar:2.5.21]
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)