chaiyongqiang created FLINK-14719:
-------------------------------------
Summary: Making Semantic configurable in FlinkKafkaProducer to support exactly-once semantic in Table API
Key: FLINK-14719
URL: https://issues.apache.org/jira/browse/FLINK-14719
Project: Flink
Issue Type: Improvement
Components: Connectors / Kafka
Affects Versions: 1.8.0
Reporter: chaiyongqiang
Flink supports Kafka transactions with FlinkKafkaProducer and FlinkKafkaProducer011. When we use the DataStream API, it is possible to achieve exactly-once semantics, but when we use the Table API, things are different.
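For comparison, with the DataStream API the caller can choose the semantic directly, because FlinkKafkaProducer also offers constructors that take a Semantic argument (the six-argument variant that the constructor shown further below delegates to). A minimal sketch, assuming the topic name, serializationSchema, partitioner and stream are placeholders provided by the surrounding job:
{code:java}
// Sketch only: topic, schema, partitioner and stream are assumed placeholders.
Properties producerConfig = new Properties();
producerConfig.setProperty("bootstrap.servers", "localhost:9092");
// For EXACTLY_ONCE the producer transaction timeout must fit within the broker's
// transaction.max.timeout.ms; 15 minutes is used here purely as an example value.
producerConfig.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducer<Row> producer = new FlinkKafkaProducer<>(
    "my-topic",
    new KeyedSerializationSchemaWrapper<>(serializationSchema),
    producerConfig,
    Optional.of(partitioner),
    FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
    FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);

stream.addSink(producer);
{code}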
The createKafkaProducer method in KafkaTableSink is used to create the FlinkKafkaProducer that sends messages to the Kafka server. It looks like this:
{code:java}
protected SinkFunction<Row> createKafkaProducer(
        String topic,
        Properties properties,
        SerializationSchema<Row> serializationSchema,
        Optional<FlinkKafkaPartitioner<Row>> partitioner) {
    return new FlinkKafkaProducer<>(
        topic,
        new KeyedSerializationSchemaWrapper<>(serializationSchema),
        properties,
        partitioner);
}
{code}
When we step into the FlinkKafkaProducer constructor that is called here, we can see that it always results in a producer with AT_LEAST_ONCE semantic:
{code:java}
public FlinkKafkaProducer(
        String defaultTopicId,
        KeyedSerializationSchema<IN> serializationSchema,
        Properties producerConfig,
        Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
    this(
        defaultTopicId,
        serializationSchema,
        producerConfig,
        customPartitioner,
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
        DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
{code}
As a result, users cannot achieve exactly-once semantics when using the Table API.
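One possible direction, sketched here only for illustration (the semantic field and how it would be configured, e.g. via a connector property, are assumptions rather than existing Flink API): KafkaTableSink could forward a configurable Semantic to the six-argument constructor instead of relying on the hard-coded default.
{code:java}
// Illustration only: 'semantic' would be a new, configurable field of the table sink.
protected SinkFunction<Row> createKafkaProducer(
        String topic,
        Properties properties,
        SerializationSchema<Row> serializationSchema,
        Optional<FlinkKafkaPartitioner<Row>> partitioner) {
    return new FlinkKafkaProducer<>(
        topic,
        new KeyedSerializationSchemaWrapper<>(serializationSchema),
        properties,
        partitioner,
        semantic,
        FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
{code}
How the semantic is exposed to users (builder method, table descriptor property, etc.) would be part of the design discussion for this issue.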