indraneel r created FLINK-11942:
----------------------------------- Summary: Flink kinesis connector throws kinesis producer daemon fatalError Key: FLINK-11942 URL: https://issues.apache.org/jira/browse/FLINK-11942 Project: Flink Issue Type: Bug Components: Connectors / FileSystem Affects Versions: 1.7.2 Reporter: indraneel r Flink connector crashes repeatedly with following error: ``` 437062 [kpl-callback-pool-28-thread-0] WARN org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer - An exception occurred while processing a record java.lang.RuntimeException: Unexpected error at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:533) at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:513) at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.add(Daemon.java:183) at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:536) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction$$anonfun$process$2.apply(SessionProcessingFunction.scala:37) at com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction$$anonfun$process$2.apply(SessionProcessingFunction.scala:33) at scala.collection.immutable.Stream.foreach(Stream.scala:594) at com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction.process(SessionProcessingFunction.scala:33) at com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction.process(SessionProcessingFunction.scala:13) at org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454) at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775) at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220) at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335) at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:339) at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.add(Daemon.java:181) ... 30 more 448476 [Window(EventTimeSessionWindows(1800000), ContinuousEventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (7/8)] INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer - Closing producer 448477 [Window(EventTimeSessionWindows(1800000), ContinuousEventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (7/8)] INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer - Flushing outstanding 294 records ``` -- This message was sent by Atlassian JIRA (v7.6.3#76005) |
Free forum by Nabble | Edit this page |