[jira] [Created] (FLINK-11303) Utilizing Kafka headers for serialization and deserialization


Shang Yuanchun (Jira)
Allen Wang created FLINK-11303:
----------------------------------

             Summary: Utilizing Kafka headers for serialization and deserialization
                 Key: FLINK-11303
                 URL: https://issues.apache.org/jira/browse/FLINK-11303
             Project: Flink
          Issue Type: Improvement
          Components: Kafka Connector
            Reporter: Allen Wang


Kafka has supported headers in producer and consumer records since version 0.11. The high-level description is in KIP-82: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers]

However, the current Flink Kafka connector simply ignores the headers. This makes it hard to integrate with the Kafka ecosystem, where other Kafka clients make use of the headers.

 

I propose to support headers in Flink by modifying the following API:
 * In KeyedSerializationSchema, add
{code:java}
List<Tuple2<String, byte[]>> getHeaders(T element)
{code}

 * In KeyedDeserializationSchema, add
{code:java}
T deserialize(byte[] messageKey, byte[] message, List<Tuple2<String, byte[]>> headers, String topic, int partition, long offset) throws IOException
{code}

 

These new methods will be invoked by FlinkKafkaProducer and KafkaFetcher during serialization and deserialization, respectively. If backward compatibility is a concern, we can add default implementations of these methods that ignore the headers.
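
To illustrate the backward-compatibility idea, here is a minimal, self-contained sketch of how default methods could keep existing schemas working. It does not use Flink's real classes: the interface names HeaderAwareSerializationSchema and HeaderAwareDeserializationSchema, the serializeValue method as shown, and the "content-type" header are all hypothetical, and Map.Entry<String, byte[]> stands in for Flink's Tuple2<String, byte[]> so that the example compiles without any dependency.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for KeyedSerializationSchema (only the relevant parts).
interface HeaderAwareSerializationSchema<T> {
    byte[] serializeValue(T element);

    // Proposed addition: the default returns no headers, so old schemas are unaffected.
    default List<Map.Entry<String, byte[]>> getHeaders(T element) {
        return Collections.emptyList();
    }
}

// Hypothetical stand-in for KeyedDeserializationSchema.
interface HeaderAwareDeserializationSchema<T> {
    // Existing header-less method that current implementations already provide.
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException;

    // Proposed overload: the default ignores the headers and delegates to the old method.
    default T deserialize(byte[] messageKey, byte[] message, List<Map.Entry<String, byte[]>> headers,
                          String topic, int partition, long offset) throws IOException {
        return deserialize(messageKey, message, topic, partition, offset);
    }
}

// A schema written before headers existed; it needs no changes.
class LegacyStringSchema
        implements HeaderAwareSerializationSchema<String>, HeaderAwareDeserializationSchema<String> {
    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        return new String(message, StandardCharsets.UTF_8);
    }
}

// A schema that opts in: it writes one header and reads it back on the consumer side.
class TaggedStringSchema extends LegacyStringSchema {
    static final String CONTENT_TYPE = "content-type"; // hypothetical header name

    @Override
    public List<Map.Entry<String, byte[]>> getHeaders(String element) {
        return Collections.singletonList(
                new SimpleImmutableEntry<>(CONTENT_TYPE, "text/plain".getBytes(StandardCharsets.UTF_8)));
    }

    @Override
    public String deserialize(byte[] messageKey, byte[] message, List<Map.Entry<String, byte[]>> headers,
                              String topic, int partition, long offset) {
        String body = deserialize(messageKey, message, topic, partition, offset);
        for (Map.Entry<String, byte[]> header : headers) {
            if (CONTENT_TYPE.equals(header.getKey())) {
                // Prefix the payload with the header value to show the header was seen.
                return new String(header.getValue(), StandardCharsets.UTF_8) + ":" + body;
            }
        }
        return body;
    }
}

final class HeaderSchemaDemo {
    public static void main(String[] args) throws IOException {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        LegacyStringSchema legacy = new LegacyStringSchema();
        TaggedStringSchema tagged = new TaggedStringSchema();
        List<Map.Entry<String, byte[]>> headers = tagged.getHeaders("hello");

        System.out.println(legacy.getHeaders("hello").size());                      // 0
        System.out.println(legacy.deserialize(null, payload, headers, "t", 0, 0L)); // hello
        System.out.println(tagged.deserialize(null, payload, headers, "t", 0, 0L)); // text/plain:hello
    }
}
```

In this sketch the legacy schema compiles and behaves exactly as before, while the connector could always call the header-aware overloads. Whether the real interfaces should take this shape is a design decision for the connector maintainers.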

 



