serialize Kafka messages with confluent registry under Flink 1.9.1

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

serialize Kafka messages with confluent registry under Flink 1.9.1

Akhlaq Malik
is it possible to publish message to Kafka serialized with KafkaAvroSerializer by Confluent. I’m using Flink 1.9.1 have saw that some development is going on newer version of flink-avro (1.11.0) but I’m stick to the version.

I would like to use the newly introduced KafkaSerializationSchema for serializing the message to Confluent schema-registry and Kakfa.

Here I have currently a class that is converting a class type T to avro but I want to use the confluent serialization.

```

public class KafkaMessageSerialization<T extends SpecificRecordBase> implements KafkaSerializationSchema<T> {
public static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSerialization.class);

final private String topic;

public KafkaMessageSerialization(String topic) {
    this.topic = topic;
}

@Override
public ProducerRecord<byte[], byte[]> serialize(T event, Long timestamp) {
    final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    final Schema schema = event.getSchema();
    final DatumWriter<T> writer = new ReflectDatumWriter<>(schema);
    final BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(outputStream, null);

    try {
        writer.write(event, binEncoder);
        binEncoder.flush();
    } catch (final Exception e) {
        LOG.error("serialization error", e);
        throw new RuntimeException(e);
    }

    return new ProducerRecord<>(topic, outputStream.toByteArray());
}
}
```

The usage is quite convenient .addSink(new FlinkKafkaProducer<>(SINK_TOPIC, new KafkaMessageSerialization<>(SINK_TOPIC), producerProps, Semantic.AT_LEAST_ONCE))