Till Rohrmann created FLINK-21691:
-------------------------------------
Summary: KafkaSource fails with NPE when setting it up
Key: FLINK-21691
URL:
https://issues.apache.org/jira/browse/FLINK-21691 Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.12.2, 1.13.0
Reporter: Till Rohrmann
Fix For: 1.13.0, 1.12.3
A user reported that the new {{KafkaSource}} fails with a {{NullPointerException}}:
{code}
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
{code}
when setting it up like this:
{code}
val kafkaSource = buildKafkaSource(params)
val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")
private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
val builder = KafkaSource.builder<String>()
.setBootstrapServers(params.get("bootstrapServers"))
.setGroupId(params.get("groupId"))
.setStartingOffsets(OffsetsInitializer.earliest())
.setTopics("topic")
.setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))
if (params.getBoolean("boundedSource", false)) {
builder.setBounded(OffsetsInitializer.latest())
}
return builder.build()
}
{code}
The problem seems to be that the {{ValueDeserializerWrapper}} does not set the deserializer the deserialize method is called, but {{getProducedType}} is actually called first resulting in the {{NullPointerException}}.
https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E--
This message was sent by Atlassian Jira
(v8.3.4#803005)