[jira] [Created] (FLINK-20189) Restored feedback events may be silently dropped if per key-group header bytes were not fully read


Shang Yuanchun (Jira)
Tzu-Li (Gordon) Tai created FLINK-20189:
-------------------------------------------

             Summary: Restored feedback events may be silently dropped if per key-group header bytes were not fully read
                 Key: FLINK-20189
                 URL: https://issues.apache.org/jira/browse/FLINK-20189
             Project: Flink
          Issue Type: Task
          Components: Stateful Functions
    Affects Versions: statefun-2.2.1
            Reporter: Tzu-Li (Gordon) Tai
            Assignee: Tzu-Li (Gordon) Tai
             Fix For: statefun-2.3.0, statefun-2.2.2


The attempt to read the per key-group header bytes here does not guarantee that the header bytes are fully read:
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java#L163

What could happen is the following:
* Say the input stream actually has the header bytes written to it.
* Fewer than {{HEADER_BYTES.length}} bytes are read into the read buffer at the code line referenced above.
* The {{if (bytesRead > 0 && !Arrays.equals(header, HEADER_BYTES))}} check evaluates to true, because the partially filled byte array does not equal the expected header bytes.
* We mistakenly conclude that the header bytes are not in the input stream and push the read bytes back, i.e. the header bytes are not skipped, and subsequent reads see the header bytes first.
* Most importantly, since the header bytes are not skipped in this case, the {{STATEFUN_VERSION}} (which is {{0}}) is incorrectly read by {{KeyGroupStream.readFrom(...)}} as the number of feedback elements to read: https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/KeyGroupStream.java#L57
* The end result in this scenario is that some checkpointed feedback events are silently dropped.

Although it is hard to say how likely this is to happen in practice, and it also depends on the actual implementation of the {{InputStream}}, the general contract of {{InputStream#read(byte[])}} permits a read to return fewer bytes than requested, so this is definitely something that should be addressed.
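
To make the scenario concrete, here is a minimal, self-contained sketch. It is not the actual {{UnboundedFeedbackLogger}} code: the class name, the method names, the 3-byte {{HEADER_BYTES}} value and the {{oneByteAtATime}} helper are all assumptions made for illustration. It contrasts a single {{read(byte[])}} call with a loop that keeps reading until the header buffer is full or the stream ends, which is one possible way to address the problem.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.Arrays;

class HeaderSkipSketch {
  // Hypothetical header value, for illustration only (not the real StateFun header).
  static final byte[] HEADER_BYTES = {(byte) 0xCA, (byte) 0xFE, 0x00};

  /** Reads until buf is full or EOF is reached; returns the number of bytes read. */
  static int tryReadFully(InputStream in, byte[] buf) throws IOException {
    int total = 0;
    while (total < buf.length) {
      int n = in.read(buf, total, buf.length - total);
      if (n < 0) {
        break; // end of stream
      }
      total += n;
    }
    return total;
  }

  /** Problematic variant: a single read(byte[]) call may return a partial header. */
  static InputStream skipHeaderSingleRead(InputStream raw) throws IOException {
    byte[] header = new byte[HEADER_BYTES.length];
    PushbackInputStream in = new PushbackInputStream(raw, header.length);
    int bytesRead = in.read(header);
    if (bytesRead > 0 && !Arrays.equals(header, HEADER_BYTES)) {
      in.unread(header, 0, bytesRead); // a partial header is pushed back by mistake
    }
    return in;
  }

  /** Defensive variant: only decide after the header buffer was filled (or EOF). */
  static InputStream skipHeaderFullRead(InputStream raw) throws IOException {
    byte[] header = new byte[HEADER_BYTES.length];
    PushbackInputStream in = new PushbackInputStream(raw, header.length);
    int bytesRead = tryReadFully(in, header);
    boolean headerPresent =
        bytesRead == header.length && Arrays.equals(header, HEADER_BYTES);
    if (!headerPresent && bytesRead > 0) {
      in.unread(header, 0, bytesRead); // no header: restore whatever was consumed
    }
    return in;
  }

  /** Test helper: a stream that hands out at most one byte per bulk read call. */
  static InputStream oneByteAtATime(byte[] data) {
    return new ByteArrayInputStream(data) {
      @Override
      public synchronized int read(byte[] b, int off, int len) {
        return super.read(b, off, Math.min(len, 1));
      }
    };
  }
}
```

With a stream that returns one byte per call, {{skipHeaderSingleRead}} leaves the header in front of the payload, while {{skipHeaderFullRead}} skips it. When no header is present, the full-read variant pushes back every byte it consumed, so old data without a header is still handled.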




--
This message was sent by Atlassian Jira
(v8.3.4#803005)