mzz created FLINK-18575:
---------------------------

             Summary: Failed to send data to Kafka
                 Key: FLINK-18575
                 URL: https://issues.apache.org/jira/browse/FLINK-18575
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.10.0
            Reporter: mzz


Flink version: 1.10.0
Kafka version: 2.2

*code:*
{code:java}
private def producerKafka(aggs_result: DataStream[String], topic: String, parallelism: Int) = {
  val kafkaPro = new Properties()
  kafkaPro.setProperty("bootstrap.servers", SINK_BROKERS)
  kafkaPro.setProperty("zookeeper.connect", SINK_ZK)
  kafkaPro.setProperty("request.timeout.ms", "10000")
  kafkaPro.setProperty("compression.type", "snappy")
  kafkaPro.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "")
  // With retries set, Flink does not restart when a Kafka partition's leader changes; it retries 5 times instead:
  kafkaPro.setProperty(ProducerConfig.RETRIES_CONFIG, "5")
  val kafka = new FlinkKafkaProducer[String](topic, new ResultDtSerialization(topic), kafkaPro, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
  aggs_result.addSink(kafka).setParallelism(parallelism)
}
{code}

*When I use this code to produce to Kafka, it reports an error:*
{code:java}
2020-07-13 10:25:47,624 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Pending record count must be zero at this point: 1
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:651)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:562)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:480)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Pending record count must be zero at this point: 1
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
	... 8 more
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)