李开青 created FLINK-16580:
---------------------------
Summary: flink-connector-kafka desrializer
Key: FLINK-16580
URL:
https://issues.apache.org/jira/browse/FLINK-16580 Project: Flink
Issue Type: Wish
Components: Connectors / Kafka
Affects Versions: 1.10.0
Reporter: 李开青
FlinkKafkaConsumer.setDeserializer(Properties props)
Why is ByteArrayDeserializer.class mandatory?
I found the flink sql conf "connector.properties.key.deserializer" will lose efficacy
private static void setDeserializer(Properties props) {
final String deSerName = ByteArrayDeserializer.class.getName();
Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
}
if (valDeSer != null && !valDeSer.equals(deSerName)) {
LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
}
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)