Allen Wang created FLINK-11303:
----------------------------------
Summary: Utilizing Kafka headers for serialization and deserialization
Key: FLINK-11303
URL: https://issues.apache.org/jira/browse/FLINK-11303
Project: Flink
Issue Type: Improvement
Components: Kafka Connector
Reporter: Allen Wang
Kafka introduced headers in producer and consumer records in version 0.11. The high-level description is in KIP-82: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers]
However, the current Flink Kafka connector simply ignores the headers. This makes it hard to integrate with the Kafka ecosystem, where other Kafka clients make use of the headers.
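For illustration only (the topic name and header key below are made up), this is roughly how a plain Kafka producer/consumer attaches and reads the headers that the connector currently drops:
{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

public class KafkaHeaderExample {

    // Attach a header to an outgoing record; "events" and "trace-id" are made-up names.
    public static ProducerRecord<byte[], byte[]> withTraceId(byte[] key, byte[] value, String traceId) {
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("events", key, value);
        record.headers().add("trace-id", traceId.getBytes(StandardCharsets.UTF_8));
        return record;
    }

    // Read the header back on the consumer side.
    public static String readTraceId(ConsumerRecord<byte[], byte[]> record) {
        Header header = record.headers().lastHeader("trace-id");
        return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
    }
}
{code}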
I propose to support headers in Flink by extending the following interfaces:
* In KeyedSerializationSchema, add
{code:java}
List<Tuple2<String, byte[]>> getHeaders(T element)
{code}
* In KeyedDeserializationSchema, add
{code:java}
T deserialize(byte[] messageKey, byte[] message, List<Tuple2<String, byte[]>> headers, String topic, int partition, long offset) throws IOException
{code}
These new methods would be invoked by FlinkKafkaProducer and KafkaFetcher during serialization and deserialization. If backward compatibility is a concern, we can add default implementations of these methods that ignore the headers.
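As a sketch only, assuming the proposed methods above are added (the schema class name, the "schema-version" header key, and the payload handling are made up for illustration), a user-defined schema could then propagate a header through the connector like this:
{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;

// Hypothetical schema showing how the proposed methods could be implemented.
// In a real implementation these would sit next to the existing methods of
// KeyedSerializationSchema / KeyedDeserializationSchema.
public class VersionedStringSchema {

    // Proposed addition to KeyedSerializationSchema: headers to attach to the record.
    public List<Tuple2<String, byte[]>> getHeaders(String element) {
        return Collections.singletonList(
                Tuple2.of("schema-version", "1".getBytes(StandardCharsets.UTF_8)));
    }

    // Proposed addition to KeyedDeserializationSchema: deserialize with access to the headers.
    public String deserialize(byte[] messageKey, byte[] message, List<Tuple2<String, byte[]>> headers,
            String topic, int partition, long offset) throws IOException {
        String version = "unknown";
        for (Tuple2<String, byte[]> header : headers) {
            if ("schema-version".equals(header.f0)) {
                version = new String(header.f1, StandardCharsets.UTF_8);
            }
        }
        return "v" + version + ":" + new String(message, StandardCharsets.UTF_8);
    }
}
{code}
For backward compatibility, a default getHeaders() returning an empty list and a default header-aware deserialize() delegating to the existing header-less one would keep current user schemas working unchanged.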