[jira] [Created] (FLINK-14719) Making Semantic configurable in FlinkKafkaProducer to support exactly-once semantics in Table API


Shang Yuanchun (Jira)
chaiyongqiang created FLINK-14719:
-------------------------------------

             Summary: Making Semantic configurable in FlinkKafkaProducer to support exactly-once semantics in Table API
                 Key: FLINK-14719
                 URL: https://issues.apache.org/jira/browse/FLINK-14719
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
    Affects Versions: 1.8.0
            Reporter: chaiyongqiang


Flink supports Kafka transactions through FlinkKafkaProducer and FlinkKafkaProducer011. With the DataStream API it is possible to achieve exactly-once semantics, but with the Table API things are different.

The createKafkaProducer method in KafkaTableSink creates the FlinkKafkaProducer that writes messages to the Kafka broker. It looks like this:


{code:java}
protected SinkFunction<Row> createKafkaProducer(
        String topic,
        Properties properties,
        SerializationSchema<Row> serializationSchema,
        Optional<FlinkKafkaPartitioner<Row>> partitioner) {
    return new FlinkKafkaProducer<>(
            topic,
            new KeyedSerializationSchemaWrapper<>(serializationSchema),
            properties,
            partitioner);
}
{code}

Following the FlinkKafkaProducer constructor that is invoked, we can see that this always results in an AT_LEAST_ONCE producer:


{code:java}
public FlinkKafkaProducer(
        String defaultTopicId,
        KeyedSerializationSchema<IN> serializationSchema,
        Properties producerConfig,
        Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
    this(
            defaultTopicId,
            serializationSchema,
            producerConfig,
            customPartitioner,
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
            DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
{code}
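
The six-argument constructor that this delegates to does accept a Semantic, and the DataStream API lets users pass EXACTLY_ONCE to it directly. A minimal sketch for comparison (the topic name, serializationSchema, partitioner and stream below are placeholders, not part of this issue):

{code:java}
// Sketch only: DataStream API usage of the same six-argument constructor,
// but with EXACTLY_ONCE instead of the hard-coded AT_LEAST_ONCE.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// The transaction timeout must be large enough to cover checkpoint intervals.
props.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducer<Row> producer = new FlinkKafkaProducer<>(
        "my-topic",
        new KeyedSerializationSchemaWrapper<>(serializationSchema),
        props,
        Optional.of(partitioner),
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
        FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);

stream.addSink(producer);
{code}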

As a result, users cannot achieve exactly-once semantics when using the Table API.
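
One possible direction, as a rough sketch only: let KafkaTableSink pass a configurable semantic into the six-argument constructor instead of relying on the four-argument one. The "sink.semantic" property key below is purely hypothetical and only used for illustration; the actual way of exposing the option is open for discussion.

{code:java}
// Sketch of a possible change to KafkaTableSink#createKafkaProducer.
// Reading the semantic from the producer properties is only one option;
// the "sink.semantic" key is hypothetical.
protected SinkFunction<Row> createKafkaProducer(
        String topic,
        Properties properties,
        SerializationSchema<Row> serializationSchema,
        Optional<FlinkKafkaPartitioner<Row>> partitioner) {
    FlinkKafkaProducer.Semantic semantic = FlinkKafkaProducer.Semantic.valueOf(
            properties.getProperty("sink.semantic", "AT_LEAST_ONCE"));
    return new FlinkKafkaProducer<>(
            topic,
            new KeyedSerializationSchemaWrapper<>(serializationSchema),
            properties,
            partitioner,
            semantic,
            FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
{code}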




--
This message was sent by Atlassian Jira
(v8.3.4#803005)