Leonid Ilyevsky created FLINK-21385:
---------------------------------------
Summary: AvroDeserializationSchema corrupted after getting invalid data
Key: FLINK-21385
URL: https://issues.apache.org/jira/browse/FLINK-21385
Project: Flink
Issue Type: Bug
Components: API / Type Serialization System
Affects Versions: 1.12.1
Reporter: Leonid Ilyevsky
Attachments: Test.out, Test.scala, TestRec.avsc
After receiving data that cannot be deserialized, AvroDeserializationSchema goes into a corrupted state, which prevents it from properly deserializing good data that arrives later.
It looks like some internal buffer gets messed up: the "bad" data is not properly skipped.
Please see the attached files that help to reproduce the issue:
TestRec.avsc with Avro schema, Test.scala with test code, and Test.out with the output of my test run.
In my test, I generate 10 testing records, serialize them, and then deserialize using two methods: "decode2" using AvroDeserializationSchema and, for comparison, "decode" using SpecificDatumReader.
Every other record is intentionally broken by prepending 5 extra bytes (I simulated the Confluent schema ID, because that is how I discovered this problem).
Ideally, each "bad" record should just result in an exception, and the "good" records should be properly decoded. This is the case for the simple "decode" method.
However, "decode2" using AvroDeserializationSchema, after the first "bad" record, fails to decode the good ones that follow. Further on, it does decode something, but not the data just passed in; it returns a previous record instead.
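The behaviour above can be illustrated with a minimal model in plain Java, no Flink or Avro involved. This is only a sketch of the suspected cause, not the actual Flink code: the StatefulDecoder class and its one-length-byte framing are hypothetical, chosen to mimic a deserializer that keeps an internal buffer across calls and does not discard leftover bytes when a decode fails.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;

// Hypothetical model of a deserializer that reuses an internal buffer across
// decode() calls and refills it only when it is empty. Framing is
// illustrative: one length byte followed by a UTF-8 payload.
class StatefulDecoder {
    private final ArrayDeque<Byte> buffer = new ArrayDeque<>();

    String decode(byte[] message) {
        if (buffer.isEmpty()) {                  // refill only when exhausted
            for (byte b : message) buffer.addLast(b);
        }
        int len = buffer.removeFirst() & 0xff;
        if (len > buffer.size()) {
            // The leftover bytes are NOT cleared on failure, so the next
            // call decodes stale data instead of the message passed in.
            throw new RuntimeException("malformed record");
        }
        byte[] payload = new byte[len];
        for (int i = 0; i < len; i++) payload[i] = buffer.removeFirst();
        return new String(payload, StandardCharsets.UTF_8);
    }
}

public class Repro {
    // Build a valid message: one length byte + UTF-8 payload.
    static byte[] frame(String s) {
        byte[] p = s.getBytes(StandardCharsets.UTF_8);
        byte[] m = new byte[p.length + 1];
        m[0] = (byte) p.length;
        System.arraycopy(p, 0, m, 1, p.length);
        return m;
    }

    public static void main(String[] args) {
        StatefulDecoder shared = new StatefulDecoder();

        // A good record decodes fine while the buffer is clean.
        System.out.println(shared.decode(frame("rec-0")));

        // A record with a bogus first byte (think: 5 extra header bytes
        // prepended) throws, leaving its tail in the shared buffer...
        try {
            shared.decode(new byte[] {(byte) 0x63, 'j', 'u', 'n', 'k'});
        } catch (RuntimeException e) {
            System.out.println("bad record: " + e.getMessage());
        }

        // ...and now a perfectly good record fails too, because the stale
        // bytes left over from the bad one are consumed first.
        try {
            System.out.println(shared.decode(frame("rec-1")));
        } catch (RuntimeException e) {
            System.out.println("good record also failed: " + e.getMessage());
        }

        // A decoder created fresh per call (analogous to the working
        // SpecificDatumReader-based "decode" path) has no state to corrupt.
        System.out.println(new StatefulDecoder().decode(frame("rec-1")));
    }
}
```

The contrast at the end mirrors the two methods in the attached Test.scala: a per-call-fresh reader recovers after a bad record, while the shared stateful one does not.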
--
This message was sent by Atlassian Jira
(v8.3.4#803005)