Spongebob created FLINK-22190:
---------------------------------

             Summary: no guarantee on Flink exactly_once sink to Kafka
                 Key: FLINK-22190
                 URL: https://issues.apache.org/jira/browse/FLINK-22190
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.12.2
         Environment: *flink: 1.12.2*
                      *kafka: 2.7.0*
            Reporter: Spongebob

When I tested Flink's exactly_once sink to Kafka, I found that it does not behave as expected. Here is the pipeline of the Flink applications:

raw data (flink app0) -> kafka topic1 -> flink app1 -> kafka topic2 -> flink app2

The Flink tasks may randomly fail with a "/ by zero" exception (ArithmeticException). The code is shown below:

{code:java}
// code placeholder
import java.util.Properties
import scala.util.Random
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic

// ---- raw data, flink app0 ----
// Emits "<random student>,<counter>" every 5 seconds.
class SimpleSource1 extends SourceFunction[String] {
  var switch = true
  val students: Array[String] = Array("Tom", "Jerry", "Gory")

  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    var i = 0
    while (switch) {
      sourceContext.collect(s"${students(Random.nextInt(students.length))},$i")
      i += 1
      Thread.sleep(5000)
    }
  }

  override def cancel(): Unit = switch = false
}

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = streamEnv.addSource(new SimpleSource1)
dataStream.addSink(new FlinkKafkaProducer[String]("xfy:9092", "single-partition-topic-2", new SimpleStringSchema()))
streamEnv.execute("sink kafka")

// ---- flink-app1 ----
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)

val prop = new Properties()
prop.setProperty("bootstrap.servers", "xfy:9092")
prop.setProperty("group.id", "test")

val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String](
  "single-partition-topic-2",
  new SimpleStringSchema,
  prop
))

val resultStream = dataStream.map(x => {
  val data = x.split(",")
  // Random.nextInt(5) can return 0, so this division randomly throws "/ by zero"
  (data(0), data(1), data(1).toInt / Random.nextInt(5)).toString()
})
resultStream.print().setParallelism(1)

val propProducer = new Properties()
propProducer.setProperty("bootstrap.servers", "xfy:9092")
propProducer.setProperty("transaction.timeout.ms", s"${1000 * 60 * 5}")

// MyKafkaSerializationSchema is the reporter's own class; its definition is not included in this report.
resultStream.addSink(new FlinkKafkaProducer[String](
  "single-partition-topic",
  new MyKafkaSerializationSchema("single-partition-topic"),
  propProducer,
  Semantic.EXACTLY_ONCE))
streamEnv.execute("sink kafka")

// ---- flink-app2 ----
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

val prop = new Properties()
prop.setProperty("bootstrap.servers", "xfy:9092")
prop.setProperty("group.id", "test")
prop.setProperty("isolation_level", "read_committed")

val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String](
  "single-partition-topic",
  new SimpleStringSchema,
  prop
))
dataStream.print().setParallelism(1)
streamEnv.execute("consumer kafka")
{code}

flink app1 prints some duplicate numbers. I expected flink app2 to deduplicate them, but in fact it does not.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
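
One thing that may be worth checking in flink app2: Kafka's consumer setting for reading only committed transactional records is spelled `isolation.level` (with a dot), and the consumer default is `read_uncommitted`. A key written as `isolation_level` is not a recognized consumer setting, so it would likely leave the default in place. The sketch below only builds the properties object and does not touch Flink or Kafka; the object name `ReadCommittedConsumerProps` and the helper `build` are hypothetical names introduced for illustration, and whether this explains the behaviour in the report is an assumption, not something the report confirms.

```scala
import java.util.Properties

// Hypothetical helper (not part of Flink or Kafka): builds consumer
// properties for a job that should only see committed transactional records.
object ReadCommittedConsumerProps {
  // Kafka consumer config key; note the dot rather than an underscore.
  val IsolationLevelKey = "isolation.level"

  def build(bootstrapServers: String, groupId: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", groupId)
    // Without this entry the consumer falls back to read_uncommitted.
    props.setProperty(IsolationLevelKey, "read_committed")
    props
  }

  def main(args: Array[String]): Unit = {
    // Broker address and group id are taken from the report above.
    val props = build("xfy:9092", "test")
    println(props.getProperty(IsolationLevelKey))
  }
}
```

The resulting `Properties` could be passed to the `FlinkKafkaConsumer` constructor in place of `prop` in flink app2. Note that this only affects what app2 reads from Kafka; the `print()` output of app1 is not transactional, so records replayed after a restart would still be printed again there.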