[jira] [Created] (FLINK-14528) Add a Constructor for FlinkKafkaProducer


Shang Yuanchun (Jira)
Peng created FLINK-14528:
----------------------------

             Summary: Add a Constructor for FlinkKafkaProducer
                 Key: FLINK-14528
                 URL: https://issues.apache.org/jira/browse/FLINK-14528
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
    Affects Versions: 1.9.0
            Reporter: Peng


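In Flink 1.9.0, the defaultTopic parameter is required by the FlinkKafkaProducer constructor.

In fact, with the constructor below it is not necessary, because the KafkaSerializationSchema chooses the topic for each record. Furthermore, it is confusing for developers when the topic named in the constructor differs from the one the schema actually uses.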
{code:java}
public FlinkKafkaProducer( String defaultTopic,
                           KafkaSerializationSchema<IN> serializationSchema,
                           Properties producerConfig,
                           FlinkKafkaProducer.Semantic semantic)
{code}
For example, set the topic name to bar in the constructor:
{code:java}
input.addSink( new FlinkKafkaProducer<>(
                                     "bar",
                                     new KafkaSerializationSchemaImpl(),
                                     properties,
                                     FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
{code}
But the records are actually sent to topic baz, as chosen in the KafkaSerializationSchema:
{code:java}
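import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaSerializationSchemaImpl implements KafkaSerializationSchema<KafkaEvent> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(KafkaEvent event, @Nullable Long timestamp) {
        // The topic is chosen here, so records go to "baz" even though the constructor was given "bar".
        return new ProducerRecord<>("baz", event.toString().getBytes(StandardCharsets.UTF_8));
    }
}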
{code}
So I suggest adding a new constructor like the one below:
{code:java}
public FlinkKafkaProducer( KafkaSerializationSchema<IN> serializationSchema,
                           Properties producerConfig,
                           FlinkKafkaProducer.Semantic semantic)
{code}
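To illustrate the idea, here is a minimal, self-contained sketch. It does not use Flink or Kafka classes: Record, Schema and Sink are hypothetical stand-ins for ProducerRecord, KafkaSerializationSchema and FlinkKafkaProducer, and the sink only models the behavior described above (the schema decides the topic), not the connector's real implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TopicOverrideSketch {

    /** Hypothetical stand-in for ProducerRecord: a topic plus a payload. */
    static final class Record {
        final String topic;
        final byte[] value;

        Record(String topic, byte[] value) {
            this.topic = topic;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for KafkaSerializationSchema. */
    interface Schema<T> {
        Record serialize(T element);
    }

    /** Hypothetical stand-in for the producer sink, with both constructor shapes. */
    static final class Sink<T> {
        private final String defaultTopic; // kept only to mirror the existing signature
        private final Schema<T> schema;
        final List<Record> sent = new ArrayList<>();

        /** Existing shape: a default topic must be supplied. */
        Sink(String defaultTopic, Schema<T> schema) {
            this.defaultTopic = defaultTopic;
            this.schema = schema;
        }

        /** Proposed shape: no default topic, the schema names the topic. */
        Sink(Schema<T> schema) {
            this(null, schema);
        }

        String getDefaultTopic() {
            return defaultTopic;
        }

        /** The record, including its topic, comes entirely from the schema. */
        void invoke(T element) {
            sent.add(schema.serialize(element));
        }
    }

    public static void main(String[] args) {
        Schema<String> schema =
                e -> new Record("baz", e.getBytes(StandardCharsets.UTF_8));

        Sink<String> withDefault = new Sink<>("bar", schema);
        Sink<String> withoutDefault = new Sink<>(schema);

        withDefault.invoke("event-1");
        withoutDefault.invoke("event-1");

        System.out.println(withDefault.sent.get(0).topic);    // prints "baz"
        System.out.println(withoutDefault.sent.get(0).topic); // prints "baz"
    }
}
```

In this model both sinks behave the same, which is why the "bar" argument only adds confusion.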
This is just my humble opinion; please correct me if I am wrong. Thanks in advance.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)