[jira] [Created] (FLINK-3101) Flink Kafka consumer crashes with NPE when it sees deleted record


Shang Yuanchun (Jira)
Sanjar Akhmedov created FLINK-3101:
--------------------------------------

             Summary: Flink Kafka consumer crashes with NPE when it sees deleted record
                 Key: FLINK-3101
                 URL: https://issues.apache.org/jira/browse/FLINK-3101
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector
    Affects Versions: 0.10.1, 1.0.0
         Environment: Apache Flink 0.10.1 binary for Hadoop 2.6.0 with Scala 2.10.
            Reporter: Sanjar Akhmedov


Kafka allows a record to be deleted from the log by sending a record with the same key and a null payload. Consumers can still see those null values (deletes) until they are compacted away (the delete retention point).
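To make the delete semantics concrete, here is a minimal sketch in plain Java. It is a toy in-memory model, not Kafka's implementation: the class `TombstoneDemo`, its `Entry` type, and both methods are hypothetical names introduced for illustration. It shows that a reader scanning the log before compaction still encounters the null-valued entry, while a simplified compaction pass drops the key entirely.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a keyed log with delete markers. Hypothetical, for illustration only.
class TombstoneDemo {

    // One log entry. A null value marks a delete ("tombstone") for the key.
    static final class Entry {
        final String key;
        final byte[] value; // may be null

        Entry(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // What a consumer sees before compaction: every entry, tombstones included.
    static int countTombstones(List<Entry> log) {
        int count = 0;
        for (Entry e : log) {
            if (e.value == null) {
                count++;
            }
        }
        return count;
    }

    // Simplified compaction: keep the latest value per key and drop any key
    // whose latest entry is a tombstone.
    static Map<String, byte[]> compact(List<Entry> log) {
        Map<String, byte[]> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            if (e.value == null) {
                latest.remove(e.key);
            } else {
                latest.put(e.key, e.value);
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Entry> log = new ArrayList<>();
        log.add(new Entry("user-1", "a".getBytes(StandardCharsets.UTF_8)));
        log.add(new Entry("user-2", "b".getBytes(StandardCharsets.UTF_8)));
        log.add(new Entry("user-1", null)); // delete marker for user-1

        System.out.println(countTombstones(log));   // prints 1
        System.out.println(compact(log).keySet());  // prints [user-2]
    }
}
```

Any consumer that reads the uncompacted log therefore has to be prepared for a null payload.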

The Flink Kafka consumer crashes with an NPE when it sees such a record:
{noformat}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception
        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:443)
{noformat}
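The trace alone does not show which expression at LegacyFetcher.java:443 dereferences null, so the following is only a sketch of one possible mitigation, not the actual fix. It assumes the fetched payload arrives as a `byte[]` that may be null, and it skips such records instead of decoding them. `NullSafeDecode`, `decode`, and `decodeAll` are hypothetical names and are not part of the Flink or Kafka APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical helper illustrating a null guard around payload decoding.
class NullSafeDecode {

    // Returns an empty Optional for a delete marker (null payload),
    // otherwise the payload decoded as UTF-8 text.
    static Optional<String> decode(byte[] payload) {
        if (payload == null) {
            return Optional.empty();
        }
        return Optional.of(new String(payload, StandardCharsets.UTF_8));
    }

    // Decodes a batch of payloads, silently skipping delete markers.
    static List<String> decodeAll(List<byte[]> payloads) {
        List<String> out = new ArrayList<>();
        for (byte[] payload : payloads) {
            decode(payload).ifPresent(out::add);
        }
        return out;
    }
}
```

Skipping is one design choice among several; a connector could instead pass the null through to user code so that downstream operators can react to deletes.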



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)