Hi ,
I am always facing this issue with Flink job on yarn. Basically I am reading data from kafka, transforming it & putting in kafka only. My build.sbt is: val flinkVersion = "1.3.2" val flinkKafkaConnect = "0.10.2" libraryDependencies ++= Seq( "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion, "org.apache.flink" %% "flink-table" % flinkVersion, "org.json4s" %% "json4s-native" % "3.5.3", "org.json4s" %% "json4s-jackson" % "3.5.3" ) *Note: One of the node in our kafka Cluster goes down.* java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor. java:289) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve. java:173) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve. java:108) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor. java:188) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask. java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask. java:263) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.streaming.runtime.tasks. ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. java:530) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. java:503) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. java:483) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. java:891) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. java:869) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector. java:51) at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector. scala:58) at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction. scala:75) at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction. scala:65) at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction. scala:36) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction. java:44) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator. java:597) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator. java:504) at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService. java:275) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager. java:107) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator. java:946) at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor. java:286) ... 7 more Caused by: org.apache.flink.streaming.runtime.tasks. ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. java:530) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. java:503) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. java:483) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. java:891) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. java:869) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector. java:51) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector. scala:37) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector. scala:28) at DataStreamCalcRule$127.processElement(Unknown Source) at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner. scala:67) at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner. scala:35) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator. java:66) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. java:528) ... 23 more Caused by: org.apache.flink.streaming.runtime.tasks. ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. java:530) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. java:503) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. java:483) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. java:891) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. java:869) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap. java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. java:528) ... 35 more Caused by: org.apache.flink.streaming.runtime.tasks. ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. java:530) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. java:503) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. java:483) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. java:891) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. java:869) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector. java:51) at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream. scala:622) at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream. scala:622) at scala.collection.immutable.List.foreach(List.scala:381) at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream. scala:622) at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap. java:50) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. java:528) ... 41 more Caused by: java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition. at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase. java:373) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010 .invokeInternal(FlinkKafkaProducer010.java:302) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010 .invoke(FlinkKafkaProducer010.java:407) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink. java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. java:528) ... 52 more Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 2017-12-20 05:42:16,008 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Persuasion (b65f972af4866ff0548bf7f0438caf8f) switched from state RUNNING to FAILING. java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor. java:289) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve. java:173) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve. java:108) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor. java:188) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask. java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask. java:263) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.streaming.runtime.tasks. ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. java:530) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. java:503) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. java:483) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. java:891) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. java:869) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector. java:51) at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector. scala:58) at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction. scala:75) at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction. scala:65) at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction. scala:36) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction. java:44) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator. java:597) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator. java:504) at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService. java:275) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager. java:107) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator. java:946) at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor. java:286) ... 7 more Caused by: org.apache.flink.streaming.runtime.tasks. ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. java:530) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. java:503) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. java:483) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. java:891) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. java:869) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector. java:51) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector. scala:37) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector. scala:28) at DataStreamCalcRule$127.processElement(Unknown Source) at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner. scala:67) at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner. scala:35) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator. java:66) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. java:528) ... 23 more Caused by: org.apache.flink.streaming.runtime.tasks. ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. java:530) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. java:503) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. java:483) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. java:891) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. java:869) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap. java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. java:528) ... 35 more Caused by: org.apache.flink.streaming.runtime.tasks. ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. java:530) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. java:503) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. java:483) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. java:891) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. java:869) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector. java:51) at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream. scala:622) at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream. scala:622) at scala.collection.immutable.List.foreach(List.scala:381) at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream. scala:622) at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap. java:50) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. java:528) ... 41 more Caused by: java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition. at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase. java:373) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010 .invokeInternal(FlinkKafkaProducer010.java:302) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010 .invoke(FlinkKafkaProducer010.java:407) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink. java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. java:528) ... 52 more Thanks -- Shivam Sharma Data Engineer @ Goibibo Indian Institute Of Information Technology, Design and Manufacturing Jabalpur Mobile No- (+91) 8882114744 Email:- [hidden email] LinkedIn:-*https://www.linkedin.com/in/28shivamsharma <https://www.linkedin.com/in/28shivamsharma>* |
Hi,
did you see that the problem starts from a Kafka exception „Failed to send data to Kafka: This server is not the leader for that topic-partition.“? Is it possible that you had a network issue and the producer could not find the leader broker? Best, Stefan > Am 20.12.2017 um 10:57 schrieb Shivam Sharma <[hidden email]>: > > Hi , > > I am always facing this issue with Flink job on yarn. > Basically I am reading data from kafka, transforming it & putting in kafka > only. > > My build.sbt is: > > val flinkVersion = "1.3.2" > val flinkKafkaConnect = "0.10.2" > > libraryDependencies ++= Seq( > "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, > "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion, > "org.apache.flink" %% "flink-table" % flinkVersion, > "org.json4s" %% "json4s-native" % "3.5.3", > "org.json4s" %% "json4s-jackson" % "3.5.3" > > ) > > *Note: One of the node in our kafka Cluster goes down.* > > > java.lang.RuntimeException: Exception occurred while processing valve > output watermark: > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor. > java:289) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve. > java:173) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve. > java:108) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor. > java:188) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask. > java:69) > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask. > java:263) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.streaming.runtime.tasks. > ExceptionInChainedOperatorException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. > java:530) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. > java:503) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. > java:483) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. > java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. > java:869) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector. > java:51) > at > org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector. > scala:58) > at > org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction. > scala:75) > at > org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction. > scala:65) > at > org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction. > scala:36) > at > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction. > java:44) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator. > java:597) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator. > java:504) > at > org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService. > java:275) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager. > java:107) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator. > java:946) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor. > java:286) > ... 7 more > Caused by: org.apache.flink.streaming.runtime.tasks. > ExceptionInChainedOperatorException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. > java:530) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. > java:503) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. > java:483) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. > java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. > java:869) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector. > java:51) > at > org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector. > scala:37) > at > org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector. > scala:28) > at DataStreamCalcRule$127.processElement(Unknown Source) > at > org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner. > scala:67) > at > org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner. > scala:35) > at > org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator. > java:66) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. > java:528) > ... 23 more > Caused by: org.apache.flink.streaming.runtime.tasks. > ExceptionInChainedOperatorException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. > java:530) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. > java:503) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. > java:483) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. > java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. > java:869) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap. > java:41) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. > java:528) > ... 35 more > Caused by: org.apache.flink.streaming.runtime.tasks. > ExceptionInChainedOperatorException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. > java:530) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. > java:503) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. > java:483) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. > java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. > java:869) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector. > java:51) > at > org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream. > scala:622) > at > org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream. > scala:622) > at scala.collection.immutable.List.foreach(List.scala:381) > at > org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream. > scala:622) > at > org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap. > java:50) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. > java:528) > ... 41 more > Caused by: java.lang.Exception: Failed to send data to Kafka: This server > is not the leader for that topic-partition. > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase. > java:373) > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010 > .invokeInternal(FlinkKafkaProducer010.java:302) > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010 > .invoke(FlinkKafkaProducer010.java:407) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink. > java:41) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. > java:528) > ... 52 more > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: > This server is not the leader for that topic-partition. > 2017-12-20 05:42:16,008 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Job > Persuasion (b65f972af4866ff0548bf7f0438caf8f) switched from state RUNNING > to FAILING. > java.lang.RuntimeException: Exception occurred while processing valve > output watermark: > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor. > java:289) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve. > java:173) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve. > java:108) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor. > java:188) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask. > java:69) > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask. > java:263) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.streaming.runtime.tasks. > ExceptionInChainedOperatorException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. > java:530) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. > java:503) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. > java:483) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. > java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. > java:869) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector. > java:51) > at > org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector. > scala:58) > at > org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction. > scala:75) > at > org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction. > scala:65) > at > org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction. > scala:36) > at > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction. > java:44) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator. > java:597) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator. > java:504) > at > org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService. > java:275) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager. > java:107) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator. > java:946) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor. > java:286) > ... 7 more > Caused by: org.apache.flink.streaming.runtime.tasks. > ExceptionInChainedOperatorException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. > java:530) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. > java:503) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. > java:483) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. > java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. > java:869) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector. > java:51) > at > org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector. > scala:37) > at > org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector. > scala:28) > at DataStreamCalcRule$127.processElement(Unknown Source) > at > org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner. > scala:67) > at > org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner. > scala:35) > at > org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator. > java:66) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. > java:528) > ... 23 more > Caused by: org.apache.flink.streaming.runtime.tasks. > ExceptionInChainedOperatorException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. > java:530) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. > java:503) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. > java:483) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. > java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. > java:869) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap. > java:41) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. > java:528) > ... 35 more > Caused by: org.apache.flink.streaming.runtime.tasks. > ExceptionInChainedOperatorException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. > java:530) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. > java:503) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain. > java:483) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. > java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator. > java:869) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector. > java:51) > at > org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream. > scala:622) > at > org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream. > scala:622) > at scala.collection.immutable.List.foreach(List.scala:381) > at > org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream. > scala:622) > at > org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap. > java:50) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. > java:528) > ... 41 more > Caused by: java.lang.Exception: Failed to send data to Kafka: This server > is not the leader for that topic-partition. > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase. > java:373) > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010 > .invokeInternal(FlinkKafkaProducer010.java:302) > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010 > .invoke(FlinkKafkaProducer010.java:407) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink. > java:41) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain. > java:528) > ... 52 more > > > Thanks > > -- > Shivam Sharma > Data Engineer @ Goibibo > Indian Institute Of Information Technology, Design and Manufacturing > Jabalpur > Mobile No- (+91) 8882114744 > Email:- [hidden email] > LinkedIn:-*https://www.linkedin.com/in/28shivamsharma > <https://www.linkedin.com/in/28shivamsharma>* |
Hi Stefan,
Kafka one node was down. But I want it to restart automatically . How can I solve it? Thanks On Wed, Dec 20, 2017 at 4:24 PM, Stefan Richter <[hidden email] > wrote: > Hi, > > did you see that the problem starts from a Kafka exception „Failed to send > data to Kafka: This server is not the leader for that topic-partition.“? Is > it possible that you had a network issue and the producer could not find > the leader broker? > > Best, > Stefan > > > Am 20.12.2017 um 10:57 schrieb Shivam Sharma <[hidden email]>: > > > > Hi , > > > > I am always facing this issue with Flink job on yarn. > > Basically I am reading data from kafka, transforming it & putting in > kafka > > only. > > > > My build.sbt is: > > > > val flinkVersion = "1.3.2" > > val flinkKafkaConnect = "0.10.2" > > > > libraryDependencies ++= Seq( > > "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, > > "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion, > > "org.apache.flink" %% "flink-table" % flinkVersion, > > "org.json4s" %% "json4s-native" % "3.5.3", > > "org.json4s" %% "json4s-jackson" % "3.5.3" > > > > ) > > > > *Note: One of the node in our kafka Cluster goes down.* > > > > > > java.lang.RuntimeException: Exception occurred while processing valve > > output watermark: > > at > > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ > ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor. > > java:289) > > at > > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve. > findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve. > > java:173) > > at > > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve. > inputWatermark(StatusWatermarkValve. > > java:108) > > at > > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput( > StreamInputProcessor. > > java:188) > > at > > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run( > OneInputStreamTask. > > java:69) > > at org.apache.flink.streaming.runtime.tasks.StreamTask. > invoke(StreamTask. > > java:263) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > > at java.lang.Thread.run(Thread.java:748) > > Caused by: org.apache.flink.streaming.runtime.tasks. > > ExceptionInChainedOperatorException: Could not forward element to next > > operator > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.pushToOperator(OperatorChain. > > java:530) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.collect(OperatorChain. > > java:503) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.collect(OperatorChain. > > java:483) > > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator$ > CountingOutput.collect(AbstractStreamOperator. > > java:891) > > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator$ > CountingOutput.collect(AbstractStreamOperator. > > java:869) > > at > > org.apache.flink.streaming.api.operators.TimestampedCollector.collect( > TimestampedCollector. > > java:51) > > at > > org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector. > collect(TimeWindowPropertyCollector. > > scala:58) > > at > > org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunc > tion.apply(IncrementalAggregateWindowFunction. > > scala:75) > > at > > org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindow > Function.apply(IncrementalAggregateTimeWindowFunction. > > scala:65) > > at > > org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindow > Function.apply(IncrementalAggregateTimeWindowFunction. > > scala:36) > > at > > org.apache.flink.streaming.runtime.operators.windowing.functions. > InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunct > ion. > > java:44) > > at > > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator. > emitWindowContents(WindowOperator. > > java:597) > > at > > org.apache.flink.streaming.runtime.operators.windowing. > WindowOperator.onEventTime(WindowOperator. > > java:504) > > at > > org.apache.flink.streaming.api.operators.HeapInternalTimerService. > advanceWatermark(HeapInternalTimerService. > > java:275) > > at > > org.apache.flink.streaming.api.operators.InternalTimeServiceManager. > advanceWatermark(InternalTimeServiceManager. > > java:107) > > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator. > processWatermark(AbstractStreamOperator. > > java:946) > > at > > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ > ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor. > > java:286) > > ... 7 more > > Caused by: org.apache.flink.streaming.runtime.tasks. > > ExceptionInChainedOperatorException: Could not forward element to next > > operator > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.pushToOperator(OperatorChain. > > java:530) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.collect(OperatorChain. > > java:503) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.collect(OperatorChain. > > java:483) > > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator$ > CountingOutput.collect(AbstractStreamOperator. > > java:891) > > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator$ > CountingOutput.collect(AbstractStreamOperator. > > java:869) > > at > > org.apache.flink.streaming.api.operators.TimestampedCollector.collect( > TimestampedCollector. > > java:51) > > at > > org.apache.flink.table.runtime.CRowWrappingCollector. > collect(CRowWrappingCollector. > > scala:37) > > at > > org.apache.flink.table.runtime.CRowWrappingCollector. > collect(CRowWrappingCollector. > > scala:28) > > at DataStreamCalcRule$127.processElement(Unknown Source) > > at > > org.apache.flink.table.runtime.CRowProcessRunner.processElement( > CRowProcessRunner. > > scala:67) > > at > > org.apache.flink.table.runtime.CRowProcessRunner.processElement( > CRowProcessRunner. > > scala:35) > > at > > org.apache.flink.streaming.api.operators.ProcessOperator.processElement( > ProcessOperator. > > java:66) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.pushToOperator(OperatorChain. > > java:528) > > ... 23 more > > Caused by: org.apache.flink.streaming.runtime.tasks. > > ExceptionInChainedOperatorException: Could not forward element to next > > operator > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.pushToOperator(OperatorChain. > > java:530) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.collect(OperatorChain. > > java:503) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.collect(OperatorChain. > > java:483) > > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator$ > CountingOutput.collect(AbstractStreamOperator. > > java:891) > > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator$ > CountingOutput.collect(AbstractStreamOperator. > > java:869) > > at > > org.apache.flink.streaming.api.operators.StreamMap. > processElement(StreamMap. > > java:41) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.pushToOperator(OperatorChain. > > java:528) > > ... 35 more > > Caused by: org.apache.flink.streaming.runtime.tasks. > > ExceptionInChainedOperatorException: Could not forward element to next > > operator > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.pushToOperator(OperatorChain. > > java:530) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.collect(OperatorChain. > > java:503) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.collect(OperatorChain. > > java:483) > > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator$ > CountingOutput.collect(AbstractStreamOperator. > > java:891) > > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator$ > CountingOutput.collect(AbstractStreamOperator. > > java:869) > > at > > org.apache.flink.streaming.api.operators.TimestampedCollector.collect( > TimestampedCollector. > > java:51) > > at > > org.apache.flink.streaming.api.scala.DataStream$$anon$6$$ > anonfun$flatMap$1.apply(DataStream. > > scala:622) > > at > > org.apache.flink.streaming.api.scala.DataStream$$anon$6$$ > anonfun$flatMap$1.apply(DataStream. > > scala:622) > > at scala.collection.immutable.List.foreach(List.scala:381) > > at > > org.apache.flink.streaming.api.scala.DataStream$$anon$6. > flatMap(DataStream. > > scala:622) > > at > > org.apache.flink.streaming.api.operators.StreamFlatMap. > processElement(StreamFlatMap. > > java:50) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.pushToOperator(OperatorChain. > > java:528) > > ... 41 more > > Caused by: java.lang.Exception: Failed to send data to Kafka: This server > > is not the leader for that topic-partition. > > at > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase. > checkErroneous(FlinkKafkaProducerBase. > > java:373) > > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010 > > .invokeInternal(FlinkKafkaProducer010.java:302) > > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010 > > .invoke(FlinkKafkaProducer010.java:407) > > at > > org.apache.flink.streaming.api.operators.StreamSink. > processElement(StreamSink. > > java:41) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.pushToOperator(OperatorChain. > > java:528) > > ... 52 more > > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException > : > > This server is not the leader for that topic-partition. > > 2017-12-20 05:42:16,008 INFO > > org.apache.flink.runtime.executiongraph.ExecutionGraph - Job > > Persuasion (b65f972af4866ff0548bf7f0438caf8f) switched from state > RUNNING > > to FAILING. > > java.lang.RuntimeException: Exception occurred while processing valve > > output watermark: > > at > > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ > ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor. > > java:289) > > at > > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve. > findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve. > > java:173) > > at > > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve. > inputWatermark(StatusWatermarkValve. > > java:108) > > at > > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput( > StreamInputProcessor. > > java:188) > > at > > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run( > OneInputStreamTask. > > java:69) > > at org.apache.flink.streaming.runtime.tasks.StreamTask. > invoke(StreamTask. > > java:263) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > > at java.lang.Thread.run(Thread.java:748) > > Caused by: org.apache.flink.streaming.runtime.tasks. > > ExceptionInChainedOperatorException: Could not forward element to next > > operator > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.pushToOperator(OperatorChain. > > java:530) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.collect(OperatorChain. > > java:503) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.collect(OperatorChain. > > java:483) > > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator$ > CountingOutput.collect(AbstractStreamOperator. > > java:891) > > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator$ > CountingOutput.collect(AbstractStreamOperator. > > java:869) > > at > > org.apache.flink.streaming.api.operators.TimestampedCollector.collect( > TimestampedCollector. > > java:51) > > at > > org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector. > collect(TimeWindowPropertyCollector. > > scala:58) > > at > > org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunc > tion.apply(IncrementalAggregateWindowFunction. > > scala:75) > > at > > org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindow > Function.apply(IncrementalAggregateTimeWindowFunction. > > scala:65) > > at > > org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindow > Function.apply(IncrementalAggregateTimeWindowFunction. > > scala:36) > > at > > org.apache.flink.streaming.runtime.operators.windowing.functions. > InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunct > ion. > > java:44) > > at > > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator. > emitWindowContents(WindowOperator. > > java:597) > > at > > org.apache.flink.streaming.runtime.operators.windowing. > WindowOperator.onEventTime(WindowOperator. > > java:504) > > at > > org.apache.flink.streaming.api.operators.HeapInternalTimerService. > advanceWatermark(HeapInternalTimerService. > > java:275) > > at > > org.apache.flink.streaming.api.operators.InternalTimeServiceManager. > advanceWatermark(InternalTimeServiceManager. > > java:107) > > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator. > processWatermark(AbstractStreamOperator. > > java:946) > > at > > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ > ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor. > > java:286) > > ... 7 more > > Caused by: org.apache.flink.streaming.runtime.tasks. > > ExceptionInChainedOperatorException: Could not forward element to next > > operator > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.pushToOperator(OperatorChain. > > java:530) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.collect(OperatorChain. > > java:503) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.collect(OperatorChain. > > java:483) > > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator$ > CountingOutput.collect(AbstractStreamOperator. > > java:891) > > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator$ > CountingOutput.collect(AbstractStreamOperator. > > java:869) > > at > > org.apache.flink.streaming.api.operators.TimestampedCollector.collect( > TimestampedCollector. > > java:51) > > at > > org.apache.flink.table.runtime.CRowWrappingCollector. > collect(CRowWrappingCollector. > > scala:37) > > at > > org.apache.flink.table.runtime.CRowWrappingCollector. > collect(CRowWrappingCollector. > > scala:28) > > at DataStreamCalcRule$127.processElement(Unknown Source) > > at > > org.apache.flink.table.runtime.CRowProcessRunner.processElement( > CRowProcessRunner. > > scala:67) > > at > > org.apache.flink.table.runtime.CRowProcessRunner.processElement( > CRowProcessRunner. > > scala:35) > > at > > org.apache.flink.streaming.api.operators.ProcessOperator.processElement( > ProcessOperator. > > java:66) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.pushToOperator(OperatorChain. > > java:528) > > ... 23 more > > Caused by: org.apache.flink.streaming.runtime.tasks. > > ExceptionInChainedOperatorException: Could not forward element to next > > operator > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.pushToOperator(OperatorChain. > > java:530) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.collect(OperatorChain. > > java:503) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.collect(OperatorChain. > > java:483) > > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator$ > CountingOutput.collect(AbstractStreamOperator. > > java:891) > > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator$ > CountingOutput.collect(AbstractStreamOperator. > > java:869) > > at > > org.apache.flink.streaming.api.operators.StreamMap. > processElement(StreamMap. > > java:41) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.pushToOperator(OperatorChain. > > java:528) > > ... 35 more > > Caused by: org.apache.flink.streaming.runtime.tasks. > > ExceptionInChainedOperatorException: Could not forward element to next > > operator > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.pushToOperator(OperatorChain. > > java:530) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.collect(OperatorChain. > > java:503) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.collect(OperatorChain. > > java:483) > > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator$ > CountingOutput.collect(AbstractStreamOperator. > > java:891) > > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator$ > CountingOutput.collect(AbstractStreamOperator. > > java:869) > > at > > org.apache.flink.streaming.api.operators.TimestampedCollector.collect( > TimestampedCollector. > > java:51) > > at > > org.apache.flink.streaming.api.scala.DataStream$$anon$6$$ > anonfun$flatMap$1.apply(DataStream. > > scala:622) > > at > > org.apache.flink.streaming.api.scala.DataStream$$anon$6$$ > anonfun$flatMap$1.apply(DataStream. > > scala:622) > > at scala.collection.immutable.List.foreach(List.scala:381) > > at > > org.apache.flink.streaming.api.scala.DataStream$$anon$6. > flatMap(DataStream. > > scala:622) > > at > > org.apache.flink.streaming.api.operators.StreamFlatMap. > processElement(StreamFlatMap. > > java:50) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.pushToOperator(OperatorChain. > > java:528) > > ... 41 more > > Caused by: java.lang.Exception: Failed to send data to Kafka: This server > > is not the leader for that topic-partition. > > at > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase. > checkErroneous(FlinkKafkaProducerBase. > > java:373) > > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010 > > .invokeInternal(FlinkKafkaProducer010.java:302) > > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010 > > .invoke(FlinkKafkaProducer010.java:407) > > at > > org.apache.flink.streaming.api.operators.StreamSink. > processElement(StreamSink. > > java:41) > > at > > org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.pushToOperator(OperatorChain. > > java:528) > > ... 52 more > > > > > > Thanks > > > > -- > > Shivam Sharma > > Data Engineer @ Goibibo > > Indian Institute Of Information Technology, Design and Manufacturing > > Jabalpur > > Mobile No- (+91) 8882114744 > > Email:- [hidden email] > > LinkedIn:-*https://www.linkedin.com/in/28shivamsharma > > <https://www.linkedin.com/in/28shivamsharma>* > > -- Shivam Sharma Data Engineer @ Goibibo Indian Institute Of Information Technology, Design and Manufacturing Jabalpur Mobile No- (+91) 8882114744 Email:- [hidden email] LinkedIn:-*https://www.linkedin.com/in/28shivamsharma <https://www.linkedin.com/in/28shivamsharma>* |
Hi,
at this point, this seems more like a Kafka question than a Flink question. You think you need to configure high availability for Kafka with Zookeeper, you can probably find more about this here: https://www.cloudera.com/documentation/kafka/latest/topics/kafka_ha.html <https://www.cloudera.com/documentation/kafka/latest/topics/kafka_ha.html>. Best, Stefan > Am 20.12.2017 um 12:06 schrieb Shivam Sharma <[hidden email]>: > > Hi Stefan, > > Kafka one node was down. But I want it to restart automatically . How can I > solve it? > > Thanks > > On Wed, Dec 20, 2017 at 4:24 PM, Stefan Richter <[hidden email] >> wrote: > >> Hi, >> >> did you see that the problem starts from a Kafka exception „Failed to send >> data to Kafka: This server is not the leader for that topic-partition.“? Is >> it possible that you had a network issue and the producer could not find >> the leader broker? >> >> Best, >> Stefan >> >>> Am 20.12.2017 um 10:57 schrieb Shivam Sharma <[hidden email]>: >>> >>> Hi , >>> >>> I am always facing this issue with Flink job on yarn. >>> Basically I am reading data from kafka, transforming it & putting in >> kafka >>> only. >>> >>> My build.sbt is: >>> >>> val flinkVersion = "1.3.2" >>> val flinkKafkaConnect = "0.10.2" >>> >>> libraryDependencies ++= Seq( >>> "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, >>> "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion, >>> "org.apache.flink" %% "flink-table" % flinkVersion, >>> "org.json4s" %% "json4s-native" % "3.5.3", >>> "org.json4s" %% "json4s-jackson" % "3.5.3" >>> >>> ) >>> >>> *Note: One of the node in our kafka Cluster goes down.* >>> >>> >>> java.lang.RuntimeException: Exception occurred while processing valve >>> output watermark: >>> at >>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ >> ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor. >>> java:289) >>> at >>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve. >> findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve. >>> java:173) >>> at >>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve. >> inputWatermark(StatusWatermarkValve. >>> java:108) >>> at >>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput( >> StreamInputProcessor. >>> java:188) >>> at >>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run( >> OneInputStreamTask. >>> java:69) >>> at org.apache.flink.streaming.runtime.tasks.StreamTask. >> invoke(StreamTask. >>> java:263) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) >>> at java.lang.Thread.run(Thread.java:748) >>> Caused by: org.apache.flink.streaming.runtime.tasks. >>> ExceptionInChainedOperatorException: Could not forward element to next >>> operator >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.pushToOperator(OperatorChain. >>> java:530) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.collect(OperatorChain. >>> java:503) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.collect(OperatorChain. >>> java:483) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$ >> CountingOutput.collect(AbstractStreamOperator. >>> java:891) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$ >> CountingOutput.collect(AbstractStreamOperator. >>> java:869) >>> at >>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect( >> TimestampedCollector. >>> java:51) >>> at >>> org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector. >> collect(TimeWindowPropertyCollector. >>> scala:58) >>> at >>> org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunc >> tion.apply(IncrementalAggregateWindowFunction. >>> scala:75) >>> at >>> org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindow >> Function.apply(IncrementalAggregateTimeWindowFunction. >>> scala:65) >>> at >>> org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindow >> Function.apply(IncrementalAggregateTimeWindowFunction. >>> scala:36) >>> at >>> org.apache.flink.streaming.runtime.operators.windowing.functions. >> InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunct >> ion. >>> java:44) >>> at >>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator. >> emitWindowContents(WindowOperator. >>> java:597) >>> at >>> org.apache.flink.streaming.runtime.operators.windowing. >> WindowOperator.onEventTime(WindowOperator. >>> java:504) >>> at >>> org.apache.flink.streaming.api.operators.HeapInternalTimerService. >> advanceWatermark(HeapInternalTimerService. >>> java:275) >>> at >>> org.apache.flink.streaming.api.operators.InternalTimeServiceManager. >> advanceWatermark(InternalTimeServiceManager. >>> java:107) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator. >> processWatermark(AbstractStreamOperator. >>> java:946) >>> at >>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ >> ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor. >>> java:286) >>> ... 7 more >>> Caused by: org.apache.flink.streaming.runtime.tasks. >>> ExceptionInChainedOperatorException: Could not forward element to next >>> operator >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.pushToOperator(OperatorChain. >>> java:530) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.collect(OperatorChain. >>> java:503) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.collect(OperatorChain. >>> java:483) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$ >> CountingOutput.collect(AbstractStreamOperator. >>> java:891) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$ >> CountingOutput.collect(AbstractStreamOperator. >>> java:869) >>> at >>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect( >> TimestampedCollector. >>> java:51) >>> at >>> org.apache.flink.table.runtime.CRowWrappingCollector. >> collect(CRowWrappingCollector. >>> scala:37) >>> at >>> org.apache.flink.table.runtime.CRowWrappingCollector. >> collect(CRowWrappingCollector. >>> scala:28) >>> at DataStreamCalcRule$127.processElement(Unknown Source) >>> at >>> org.apache.flink.table.runtime.CRowProcessRunner.processElement( >> CRowProcessRunner. >>> scala:67) >>> at >>> org.apache.flink.table.runtime.CRowProcessRunner.processElement( >> CRowProcessRunner. >>> scala:35) >>> at >>> org.apache.flink.streaming.api.operators.ProcessOperator.processElement( >> ProcessOperator. >>> java:66) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.pushToOperator(OperatorChain. >>> java:528) >>> ... 23 more >>> Caused by: org.apache.flink.streaming.runtime.tasks. >>> ExceptionInChainedOperatorException: Could not forward element to next >>> operator >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.pushToOperator(OperatorChain. >>> java:530) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.collect(OperatorChain. >>> java:503) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.collect(OperatorChain. >>> java:483) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$ >> CountingOutput.collect(AbstractStreamOperator. >>> java:891) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$ >> CountingOutput.collect(AbstractStreamOperator. >>> java:869) >>> at >>> org.apache.flink.streaming.api.operators.StreamMap. >> processElement(StreamMap. >>> java:41) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.pushToOperator(OperatorChain. >>> java:528) >>> ... 35 more >>> Caused by: org.apache.flink.streaming.runtime.tasks. >>> ExceptionInChainedOperatorException: Could not forward element to next >>> operator >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.pushToOperator(OperatorChain. >>> java:530) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.collect(OperatorChain. >>> java:503) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.collect(OperatorChain. >>> java:483) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$ >> CountingOutput.collect(AbstractStreamOperator. >>> java:891) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$ >> CountingOutput.collect(AbstractStreamOperator. >>> java:869) >>> at >>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect( >> TimestampedCollector. >>> java:51) >>> at >>> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$ >> anonfun$flatMap$1.apply(DataStream. >>> scala:622) >>> at >>> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$ >> anonfun$flatMap$1.apply(DataStream. >>> scala:622) >>> at scala.collection.immutable.List.foreach(List.scala:381) >>> at >>> org.apache.flink.streaming.api.scala.DataStream$$anon$6. >> flatMap(DataStream. >>> scala:622) >>> at >>> org.apache.flink.streaming.api.operators.StreamFlatMap. >> processElement(StreamFlatMap. >>> java:50) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.pushToOperator(OperatorChain. >>> java:528) >>> ... 41 more >>> Caused by: java.lang.Exception: Failed to send data to Kafka: This server >>> is not the leader for that topic-partition. >>> at >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase. >> checkErroneous(FlinkKafkaProducerBase. >>> java:373) >>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010 >>> .invokeInternal(FlinkKafkaProducer010.java:302) >>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010 >>> .invoke(FlinkKafkaProducer010.java:407) >>> at >>> org.apache.flink.streaming.api.operators.StreamSink. >> processElement(StreamSink. >>> java:41) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.pushToOperator(OperatorChain. >>> java:528) >>> ... 52 more >>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException >> : >>> This server is not the leader for that topic-partition. >>> 2017-12-20 05:42:16,008 INFO >>> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job >>> Persuasion (b65f972af4866ff0548bf7f0438caf8f) switched from state >> RUNNING >>> to FAILING. >>> java.lang.RuntimeException: Exception occurred while processing valve >>> output watermark: >>> at >>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ >> ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor. >>> java:289) >>> at >>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve. >> findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve. >>> java:173) >>> at >>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve. >> inputWatermark(StatusWatermarkValve. >>> java:108) >>> at >>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput( >> StreamInputProcessor. >>> java:188) >>> at >>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run( >> OneInputStreamTask. >>> java:69) >>> at org.apache.flink.streaming.runtime.tasks.StreamTask. >> invoke(StreamTask. >>> java:263) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) >>> at java.lang.Thread.run(Thread.java:748) >>> Caused by: org.apache.flink.streaming.runtime.tasks. >>> ExceptionInChainedOperatorException: Could not forward element to next >>> operator >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.pushToOperator(OperatorChain. >>> java:530) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.collect(OperatorChain. >>> java:503) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.collect(OperatorChain. >>> java:483) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$ >> CountingOutput.collect(AbstractStreamOperator. >>> java:891) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$ >> CountingOutput.collect(AbstractStreamOperator. >>> java:869) >>> at >>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect( >> TimestampedCollector. >>> java:51) >>> at >>> org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector. >> collect(TimeWindowPropertyCollector. >>> scala:58) >>> at >>> org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunc >> tion.apply(IncrementalAggregateWindowFunction. >>> scala:75) >>> at >>> org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindow >> Function.apply(IncrementalAggregateTimeWindowFunction. >>> scala:65) >>> at >>> org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindow >> Function.apply(IncrementalAggregateTimeWindowFunction. >>> scala:36) >>> at >>> org.apache.flink.streaming.runtime.operators.windowing.functions. >> InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunct >> ion. >>> java:44) >>> at >>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator. >> emitWindowContents(WindowOperator. >>> java:597) >>> at >>> org.apache.flink.streaming.runtime.operators.windowing. >> WindowOperator.onEventTime(WindowOperator. >>> java:504) >>> at >>> org.apache.flink.streaming.api.operators.HeapInternalTimerService. >> advanceWatermark(HeapInternalTimerService. >>> java:275) >>> at >>> org.apache.flink.streaming.api.operators.InternalTimeServiceManager. >> advanceWatermark(InternalTimeServiceManager. >>> java:107) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator. >> processWatermark(AbstractStreamOperator. >>> java:946) >>> at >>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ >> ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor. >>> java:286) >>> ... 7 more >>> Caused by: org.apache.flink.streaming.runtime.tasks. >>> ExceptionInChainedOperatorException: Could not forward element to next >>> operator >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.pushToOperator(OperatorChain. >>> java:530) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.collect(OperatorChain. >>> java:503) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.collect(OperatorChain. >>> java:483) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$ >> CountingOutput.collect(AbstractStreamOperator. >>> java:891) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$ >> CountingOutput.collect(AbstractStreamOperator. >>> java:869) >>> at >>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect( >> TimestampedCollector. >>> java:51) >>> at >>> org.apache.flink.table.runtime.CRowWrappingCollector. >> collect(CRowWrappingCollector. >>> scala:37) >>> at >>> org.apache.flink.table.runtime.CRowWrappingCollector. >> collect(CRowWrappingCollector. >>> scala:28) >>> at DataStreamCalcRule$127.processElement(Unknown Source) >>> at >>> org.apache.flink.table.runtime.CRowProcessRunner.processElement( >> CRowProcessRunner. >>> scala:67) >>> at >>> org.apache.flink.table.runtime.CRowProcessRunner.processElement( >> CRowProcessRunner. >>> scala:35) >>> at >>> org.apache.flink.streaming.api.operators.ProcessOperator.processElement( >> ProcessOperator. >>> java:66) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.pushToOperator(OperatorChain. >>> java:528) >>> ... 23 more >>> Caused by: org.apache.flink.streaming.runtime.tasks. >>> ExceptionInChainedOperatorException: Could not forward element to next >>> operator >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.pushToOperator(OperatorChain. >>> java:530) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.collect(OperatorChain. >>> java:503) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.collect(OperatorChain. >>> java:483) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$ >> CountingOutput.collect(AbstractStreamOperator. >>> java:891) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$ >> CountingOutput.collect(AbstractStreamOperator. >>> java:869) >>> at >>> org.apache.flink.streaming.api.operators.StreamMap. >> processElement(StreamMap. >>> java:41) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.pushToOperator(OperatorChain. >>> java:528) >>> ... 35 more >>> Caused by: org.apache.flink.streaming.runtime.tasks. >>> ExceptionInChainedOperatorException: Could not forward element to next >>> operator >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.pushToOperator(OperatorChain. >>> java:530) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.collect(OperatorChain. >>> java:503) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.collect(OperatorChain. >>> java:483) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$ >> CountingOutput.collect(AbstractStreamOperator. >>> java:891) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$ >> CountingOutput.collect(AbstractStreamOperator. >>> java:869) >>> at >>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect( >> TimestampedCollector. >>> java:51) >>> at >>> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$ >> anonfun$flatMap$1.apply(DataStream. >>> scala:622) >>> at >>> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$ >> anonfun$flatMap$1.apply(DataStream. >>> scala:622) >>> at scala.collection.immutable.List.foreach(List.scala:381) >>> at >>> org.apache.flink.streaming.api.scala.DataStream$$anon$6. >> flatMap(DataStream. >>> scala:622) >>> at >>> org.apache.flink.streaming.api.operators.StreamFlatMap. >> processElement(StreamFlatMap. >>> java:50) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.pushToOperator(OperatorChain. >>> java:528) >>> ... 41 more >>> Caused by: java.lang.Exception: Failed to send data to Kafka: This server >>> is not the leader for that topic-partition. >>> at >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase. >> checkErroneous(FlinkKafkaProducerBase. >>> java:373) >>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010 >>> .invokeInternal(FlinkKafkaProducer010.java:302) >>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010 >>> .invoke(FlinkKafkaProducer010.java:407) >>> at >>> org.apache.flink.streaming.api.operators.StreamSink. >> processElement(StreamSink. >>> java:41) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ >> CopyingChainingOutput.pushToOperator(OperatorChain. >>> java:528) >>> ... 52 more >>> >>> >>> Thanks >>> >>> -- >>> Shivam Sharma >>> Data Engineer @ Goibibo >>> Indian Institute Of Information Technology, Design and Manufacturing >>> Jabalpur >>> Mobile No- (+91) 8882114744 >>> Email:- [hidden email] >>> LinkedIn:-*https://www.linkedin.com/in/28shivamsharma >>> <https://www.linkedin.com/in/28shivamsharma>* >> >> > > > -- > Shivam Sharma > Data Engineer @ Goibibo > Indian Institute Of Information Technology, Design and Manufacturing > Jabalpur > Mobile No- (+91) 8882114744 > Email:- [hidden email] > LinkedIn:-*https://www.linkedin.com/in/28shivamsharma > <https://www.linkedin.com/in/28shivamsharma>* |
Free forum by Nabble | Edit this page |