Tzu-Li (Gordon) Tai created FLINK-20189:
-------------------------------------------
Summary: Restored feedback events may be silently dropped if per key-group header bytes were not fully read
Key: FLINK-20189
URL: https://issues.apache.org/jira/browse/FLINK-20189
Project: Flink
Issue Type: Task
Components: Stateful Functions
Affects Versions: statefun-2.2.1
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
Fix For: statefun-2.3.0, statefun-2.2.2
The attempt to read the per key-group header bytes here does not guarantee that the header bytes are fully read:
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java#L163

What could happen is the following:
* Say the input stream actually does contain the header bytes.
* Fewer than {{HEADER_BYTES.length}} bytes were read into the read buffer at the code line referenced above.
* The {{if (bytesRead > 0 && !Arrays.equals(header, HEADER_BYTES))}} check would evaluate to true, because the partially filled byte array is not equal to the expected header bytes.
* We would mistakenly conclude that the header bytes are not in the input stream and push the read bytes back, i.e. the header bytes would not be skipped, and subsequent reads would see the header bytes first.
* Most importantly, since the header bytes are not skipped in this case, the {{STATEFUN_VERSION}} (which is {{0}}) would be incorrectly read by {{KeyGroupStream.readFrom(...)}} as the number of feedback elements to read:
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/KeyGroupStream.java#L57
* The end result in this scenario is that some checkpointed feedback events would be silently dropped.
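To make the failure mode in the steps above concrete, here is a minimal, self-contained Java sketch. It paraphrases the single-{{read(byte[])}} header check described in this issue rather than copying the StateFun code, and it assumes a made-up 4-byte {{HEADER_BYTES}} value and a hypothetical {{OneByteAtATimeInputStream}} test helper that returns at most one byte per bulk read (which the {{InputStream}} contract permits). With a {{ByteArrayInputStream}} the header is detected; with the short-reading stream, the very same bytes are pushed back as if no header were present.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.Arrays;

final class ShortReadDemo {
  // Hypothetical header value; the real HEADER_BYTES constant in StateFun may differ.
  static final byte[] HEADER_BYTES = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE};

  /** Hypothetical stream returning at most one byte per bulk read, as InputStream#read(byte[]) allows. */
  static final class OneByteAtATimeInputStream extends InputStream {
    private final InputStream delegate;

    OneByteAtATimeInputStream(InputStream delegate) {
      this.delegate = delegate;
    }

    @Override
    public int read() throws IOException {
      return delegate.read();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      return delegate.read(b, off, Math.min(len, 1));
    }
  }

  /** Paraphrase of the pattern in the issue: one read(byte[]) call, then the header comparison. */
  static boolean headerDetectedWithSingleRead(InputStream in) throws IOException {
    PushbackInputStream pushback = new PushbackInputStream(in, HEADER_BYTES.length);
    byte[] header = new byte[HEADER_BYTES.length];
    int bytesRead = pushback.read(header); // may legally return fewer than header.length bytes
    if (bytesRead > 0 && !Arrays.equals(header, HEADER_BYTES)) {
      pushback.unread(header, 0, bytesRead); // treated as "no header": bytes are pushed back
      return false;
    }
    return bytesRead == HEADER_BYTES.length;
  }

  public static void main(String[] args) throws IOException {
    byte[] payload = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE, 0, 0, 0, 0};
    System.out.println(headerDetectedWithSingleRead(new ByteArrayInputStream(payload))); // true
    System.out.println(
        headerDetectedWithSingleRead(
            new OneByteAtATimeInputStream(new ByteArrayInputStream(payload)))); // false
  }
}
```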
Although it is hard to say how likely this is to happen in practice, and it would also depend on the actual {{InputStream}} implementation, the general contract of {{InputStream#read(byte[])}} allows it to return fewer bytes than the buffer length, so this is definitely something that should be addressed.
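One possible way to address this (a sketch only, not necessarily what the eventual fix does) is to keep reading until either {{HEADER_BYTES.length}} bytes have been collected or EOF is reached, and only then compare. The helpers {{readUpTo}} and {{skipHeaderIfPresent}}, the header value, the trailing int field, and {{OneByteAtATimeInputStream}} are all hypothetical. On Java 9+, {{InputStream#readNBytes(byte[], int, int)}} implements the same read-until-full-or-EOF loop.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.Arrays;

final class HeaderSkipper {
  // Hypothetical header value; the real HEADER_BYTES constant in StateFun may differ.
  static final byte[] HEADER_BYTES = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE};

  /** Loops until buf is full or EOF is reached; returns the number of bytes actually read. */
  static int readUpTo(InputStream in, byte[] buf) throws IOException {
    int total = 0;
    while (total < buf.length) {
      int n = in.read(buf, total, buf.length - total);
      if (n == -1) {
        break; // EOF before buf was filled
      }
      total += n;
    }
    return total;
  }

  /** Skips HEADER_BYTES if present; otherwise pushes back every byte consumed while probing. */
  static PushbackInputStream skipHeaderIfPresent(InputStream in) throws IOException {
    PushbackInputStream pushback = new PushbackInputStream(in, HEADER_BYTES.length);
    byte[] header = new byte[HEADER_BYTES.length];
    int bytesRead = readUpTo(pushback, header);
    boolean headerPresent =
        bytesRead == HEADER_BYTES.length && Arrays.equals(header, HEADER_BYTES);
    if (!headerPresent && bytesRead > 0) {
      pushback.unread(header, 0, bytesRead);
    }
    return pushback;
  }

  /** Hypothetical stream returning at most one byte per bulk read. */
  static final class OneByteAtATimeInputStream extends InputStream {
    private final InputStream delegate;

    OneByteAtATimeInputStream(InputStream delegate) {
      this.delegate = delegate;
    }

    @Override
    public int read() throws IOException {
      return delegate.read();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      return delegate.read(b, off, Math.min(len, 1));
    }
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical layout: header bytes followed by an int field with value 0.
    byte[] withHeader = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE, 0, 0, 0, 0};
    InputStream trickling = new OneByteAtATimeInputStream(new ByteArrayInputStream(withHeader));
    DataInputStream data = new DataInputStream(skipHeaderIfPresent(trickling));
    System.out.println("int after header: " + data.readInt()); // 0
  }
}
```

With this loop, the outcome of the header check no longer depends on how many bytes a single {{read}} call happens to return; and if the header is absent, every probed byte is pushed back, so no data is lost.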
--
This message was sent by Atlassian Jira
(v8.3.4#803005)