JSONKeyValueDeserializationSchema does not handle null Kafka keys


Jia Teoh
Hi devs,

I was testing writing data to a Kafka topic and reading from it using the JSONKeyValueDeserializationSchema and encountered NPEs. After tracing them, it seems that null messageKeys are not handled: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java#L52

I've attached what should be a minimal working Scala example to this email. You will need to provide "--topic" and the usual Kafka parameters (e.g. --bootstrap.servers). I tried replacing line 52 with:

    JsonNode key = null;
    if (messageKey != null) {
        key = mapper.readValue(messageKey, JsonNode.class);
    }
    node.set("key", key);

After that change, my test application worked as expected. Could somebody confirm this before I file a ticket or bug report?
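
To show the guard in isolation, here is a self-contained sketch that runs without Flink, Kafka, or Jackson. It is only an illustration under assumptions: the class NullSafeKeyDemo and its methods are hypothetical stand-ins, and raw bytes are decoded as UTF-8 strings instead of being parsed into JsonNode. The point is just that a missing key ends up as a null "key" entry rather than an exception.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the schema; this is NOT Flink's class.
class NullSafeKeyDemo {

    // Decodes the bytes as UTF-8 text, or returns null when the bytes are absent.
    static String decodeOrNull(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    // Mirrors the shape of deserialize(messageKey, message, ...):
    // both fields are always present in the result, and "key" is null
    // when the Kafka record was written without a key.
    static Map<String, String> deserialize(byte[] messageKey, byte[] message) {
        Map<String, String> node = new HashMap<>(); // HashMap permits null values
        node.put("key", decodeOrNull(messageKey));
        node.put("value", decodeOrNull(message));
        return node;
    }
}
```

Calling NullSafeKeyDemo.deserialize(null, payload) returns a map whose "key" entry is present but null, which is the behaviour I would expect from the patched schema as well.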


Apologies if this is the wrong place to be emailing. Please don't hesitate to redirect me if that's the case. If there are any additional details I should provide, just let me know.

Thanks,
Jia Teoh