[jira] [Created] (FLINK-22452) Support specifying custom transactional.id prefix in FlinkKafkaProducer

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-22452) Support specifying custom transactional.id prefix in FlinkKafkaProducer

Shang Yuanchun (Jira)
Wenhao Ji created FLINK-22452:
---------------------------------

             Summary: Support specifying custom transactional.id prefix in FlinkKafkaProducer
                 Key: FLINK-22452
                 URL: https://issues.apache.org/jira/browse/FLINK-22452
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
    Affects Versions: 1.12.2
            Reporter: Wenhao Ji


Currently, the "transactional.id"s of the Kafka producers in FlinkKafkaProducer are generated based on the task name. This mechanism has some limitations:
 * It will exceed Kafka's limitation if the task name is too long. (resolved in FLINK-17691)
 * They will very likely clash each other if the job topologies are similar. (discussed in FLINK-11654)
 * Only certain "transactional.id" may be authorized by [Prefixed ACLs|https://docs.confluent.io/platform/current/kafka/authorization.html#prefixed-acls] on the target Kafka cluster.

Besides, the spring community has introduced the [setTransactionIdPrefix|https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/DefaultKafkaProducerFactory.html#setTransactionIdPrefix(java.lang.String)] method to their Kafka client.

Therefore, I think it will be necessary to have this feature in the Flink Kafka connector. 

 

As discussed in FLINK-11654, the possible solution will be,
 * either introduce an additional method called setTransactionalIdPrefix(String) in the FlinkKafkaProducer,
 * or use the existing "transactional.id" properties as the prefix.

 And the behavior of the "transactional.id" generation will be
 * keep the behavior as it was if absent,
 * use the one if present as the prefix for the TransactionalIdsGenerator.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)