|
Is it possible to publish messages to Kafka serialized with Confluent's KafkaAvroSerializer? I'm using Flink 1.9.1 and have seen that some development is going on in a newer version of flink-avro (1.11.0), but I'm stuck with this version.
I would like to use the newly introduced KafkaSerializationSchema to serialize messages for the Confluent Schema Registry and Kafka.
Here is the class I currently have. It converts a type T to Avro, but I want to use the Confluent serialization instead.
```
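import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaMessageSerialization<T extends SpecificRecordBase> implements KafkaSerializationSchema<T> {
    public static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSerialization.class);
    private final String topic;

    public KafkaMessageSerialization(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T event, Long timestamp) {
        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        final Schema schema = event.getSchema();
        final DatumWriter<T> writer = new ReflectDatumWriter<>(schema);
        final BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(outputStream, null);
        try {
            // Plain Avro binary encoding: no schema-registry lookup and no Confluent header
            writer.write(event, binEncoder);
            binEncoder.flush();
        } catch (final Exception e) {
            LOG.error("serialization error", e);
            throw new RuntimeException(e);
        }
        // Value-only record; the key is left null
        return new ProducerRecord<>(topic, outputStream.toByteArray());
    }
}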
```
The usage is quite convenient: `.addSink(new FlinkKafkaProducer<>(SINK_TOPIC, new KafkaMessageSerialization<>(SINK_TOPIC), producerProps, Semantic.AT_LEAST_ONCE))`
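
To make clear what I mean by "Confluent serialization": as far as I understand, KafkaAvroSerializer does not write the bare Avro bytes that my class produces. It prepends a 5-byte header (a zero magic byte followed by the 4-byte big-endian schema ID from the registry) and then the Avro binary payload. Here is a minimal sketch of that framing only. The class and method names are hypothetical, not part of Flink or Confluent, and the schema ID is passed in rather than looked up in a registry.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration; it only shows the byte layout,
// it does not talk to a schema registry.
public class ConfluentWireFormat {
    private static final byte MAGIC_BYTE = 0x0;

    // Prefix an Avro-encoded payload with the magic byte and the schema ID.
    public static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buffer = ByteBuffer.allocate(1 + 4 + avroPayload.length);
        buffer.put(MAGIC_BYTE);
        buffer.putInt(schemaId); // ByteBuffer is big-endian by default
        buffer.put(avroPayload);
        return buffer.array();
    }

    // Read the schema ID back from a framed message.
    public static int schemaIdOf(byte[] framed) {
        ByteBuffer buffer = ByteBuffer.wrap(framed);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }
}
```

So the output of my `serialize` method above would be missing this header, which is why I assume Confluent consumers cannot read it as-is.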
|