Yu Yang created FLINK-18017:
-------------------------------
Summary: improve Kafka connector to handle record deserialization exception and report related metrics
Key: FLINK-18017
URL: https://issues.apache.org/jira/browse/FLINK-18017
Project: Flink
Issue Type: Improvement
Components: Connectors / Kafka
Affects Versions: 1.9.1
Reporter: Yu Yang
Corrupted messages can get into the message pipeline for various reasons. When a Flink deserializer fails to deserialize a corrupted message and throws an exception, the Flink application is blocked until we update the deserializer to handle the exception.
Currently messages are deserialized as below in
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java:
{code:java}
for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
	final T value = deserializer.deserialize(record);

	if (deserializer.isEndOfStream(value)) {
		// end of stream signaled
		running = false;
		break;
	}

	// emit the actual record. this also updates offset state atomically
	// and deals with timestamps and watermark generation
	emitRecord(value, partition, record.offset(), record);
}
{code}
The Flink Kafka connector needs to catch exceptions from deserialization and expose related metrics.