Peng created FLINK-14528:
----------------------------
Summary: Add a Constructor for FlinkKafkaProducer
Key: FLINK-14528
URL:
https://issues.apache.org/jira/browse/FLINK-14528 Project: Flink
Issue Type: Improvement
Components: Connectors / Kafka
Affects Versions: 1.9.0
Reporter: Peng
In flink 1.9.0, defaultTopic param is required for FlinkKafkaProducer constructor.
In fact, if I use the below constructor, it is not necessary. Furthermore, it is confused for developer to specify different topic name.
{code:java}
public FlinkKafkaProducer( String defaultTopic,
KafkaSerializationSchema<IN> serializationSchema,
Properties producerConfig,
FlinkKafkaProducer.Semantic semantic)
{code}
For example, set topic name to bar in the constructor.
{code:java}
input.addSink( new FlinkKafkaProducer<>(
"bar",
new KafkaSerializationSchemaImpl(),
properties,
FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
{code}
But actually send records to topic baz from KafkaSerializationSchema.
{code:java}
public class KafkaSerializationSchemaImpl implements KafkaSerializationSchema<KafkaEvent>
{
@Override
public ProducerRecord<byte[], byte[]> serialize(KafkaEvent event, @Nullable Long timestamp) {
return new ProducerRecord<>("baz", event.toString().getBytes());
}
}
{code}
So I suggest add a new constructor like below.
{code:java}
public FlinkKafkaProducer( KafkaSerializationSchema<IN> serializationSchema,
Properties producerConfig,
FlinkKafkaProducer.Semantic semantic)
{code}
It is my humble opinion, please correct me, thanks in advance.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)