[jira] [Created] (FLINK-16580) flink-connector-kafka desrializer

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-16580) flink-connector-kafka desrializer

Shang Yuanchun (Jira)
李开青 created FLINK-16580:
---------------------------

             Summary: flink-connector-kafka desrializer
                 Key: FLINK-16580
                 URL: https://issues.apache.org/jira/browse/FLINK-16580
             Project: Flink
          Issue Type: Wish
          Components: Connectors / Kafka
    Affects Versions: 1.10.0
            Reporter: 李开青


FlinkKafkaConsumer.setDeserializer(Properties props)

Why is ByteArrayDeserializer.class mandatory?

I found the flink sql conf "connector.properties.key.deserializer" will lose efficacy

 

private static void setDeserializer(Properties props) {
 final String deSerName = ByteArrayDeserializer.class.getName();

 Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
 Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);

 if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
 LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
 }
 if (valDeSer != null && !valDeSer.equals(deSerName)) {
 LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
 }

 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)