[jira] [Created] (FLINK-10721) kafkaFetcher runFetchLoop throw exception will cause follow-up code not execute in FlinkKafkaConsumerBase run method

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-10721) kafkaFetcher runFetchLoop throw exception will cause follow-up code not execute in FlinkKafkaConsumerBase run method

Shang Yuanchun (Jira)
zhaoshijie created FLINK-10721:
----------------------------------

             Summary: kafkaFetcher runFetchLoop throw exception will cause follow-up code not execute in FlinkKafkaConsumerBase run method
                 Key: FLINK-10721
                 URL: https://issues.apache.org/jira/browse/FLINK-10721
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector
    Affects Versions: 1.6.2
            Reporter: zhaoshijie


In FlinkKafkaConsumerBase run method on line 721(master branch), if kafkaFetcher.runFetchLoop() throw exception(by discoveryLoopThread throw exception then finally execute cancel method, cancel method will execute kafkaFetcher.cancel, this implemented Kafka09Fetcher will execute handover.close, then result in handover.pollNext throw ClosedException),then next code will not execute,especially discoveryLoopError not be throwed,so, real culprit exception will be Swallowed.
failed log like this:

{code:java}
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

  at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply$mcV$sp(JobManager.scala:1229)
  at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
  at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
  at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:180)
  at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.cancel(Kafka09Fetcher.java:174)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:753)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:695)
  at java.lang.Thread.run(Thread.java:745)
{code}

Shoud we modify it as follows?
{code:java}

{code}

try {
                                kafkaFetcher.runFetchLoop();
                        } catch (Exception e) {
                                // if discoveryLoopErrorRef not null ,we should throw real culprit exception
                                if (discoveryLoopErrorRef.get() != null){
                                        throw new RuntimeException(discoveryLoopErrorRef.get());
                                } else {
                                        throw e;
                                }
                        }




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)