Tzu-Li (Gordon) Tai created FLINK-19748:
-------------------------------------------

             Summary: StateFun's UnboundedFeedbackLogger should call startNewKeyGroup for all assigned key groups
                 Key: FLINK-19748
                 URL: https://issues.apache.org/jira/browse/FLINK-19748
             Project: Flink
          Issue Type: Bug
          Components: Stateful Functions
    Affects Versions: statefun-2.2.0, statefun-2.1.0, statefun-2.0.0
            Reporter: Tzu-Li (Gordon) Tai


Currently, on commit the {{UnboundedFeedbackLogger}} only calls {{startNewKeyGroup}} on the raw keyed stream for key groups that actually have logged messages:
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java#L102

This means that it skips any key group that doesn't have logged messages.

This doesn't conform to the expected usage of Flink's {{KeyedStateCheckpointOutputStream}}, which expects {{startNewKeyGroup}} to be invoked for ALL key groups within the range. The reason is that, underneath, calling {{startNewKeyGroup}} also records the starting stream offset position for the key group.

However, when iterating through a raw keyed stream, the key group offsets iterator {{KeyGroupRangeOffsets#KeyGroupOffsetsIterator}} doesn't take into account that some key groups weren't written and therefore do not have offsets defined, so the stream is positioned at incorrect offsets.

Ultimately, if some key groups were skipped while writing to the raw keyed stream, the following error will be thrown on restore:

{code}
java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: java.io.IOException: position out of bounds
	at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
	... 9 more
Caused by: java.io.IOException: position out of bounds
	at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
	... 10 more
{code}

**Possible solutions**

There are two possible solutions: fix this in StateFun, or fix it in Flink.

- This can be fixed in StateFun by ensuring that the feedback logger starts a new key group for all key groups in range, by doing:
{code}
for (int keyGroupId : rawKeyedStream.getKeyGroupList()) {
    rawKeyedStream.startNewKeyGroup(keyGroupId);
    // write to stream if there are logged messages for this key group
}
{code}

- Or, alternatively, we change the {{KeyGroupRangeOffsets#KeyGroupOffsetsIterator}} in Flink to skip key groups that don't have a defined offset (i.e. {{startNewKeyGroup}} wasn't called for these key groups).
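
To make the offset bookkeeping described above concrete, here is a toy model in plain Java. It is only a sketch and does not use any Flink or StateFun classes: {{KeyGroupOffsetsSketch}}, {{write}}, {{canRestore}} and the {{NO_OFFSET}} sentinel are hypothetical names invented for illustration, and the restore check is a simplified stand-in for the seek that fails with "position out of bounds". The assumption is that a key group whose start was never recorded keeps an "unset" offset, which a reader that visits every key group in range cannot seek to.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: none of these names are Flink or StateFun APIs.
final class KeyGroupOffsetsSketch {

    // Assumed sentinel meaning "no start offset was recorded for this key group".
    static final long NO_OFFSET = -1L;

    /**
     * Writes the logged messages to the stream and returns the recorded start
     * offset of every key group in [0, numKeyGroups).
     * If startAllKeyGroups is false, key groups without messages are skipped,
     * which mirrors the behaviour reported in this issue.
     */
    static long[] write(int numKeyGroups, Map<Integer, String> logged,
                        boolean startAllKeyGroups, ByteArrayOutputStream out) {
        long[] offsets = new long[numKeyGroups];
        Arrays.fill(offsets, NO_OFFSET);
        for (int keyGroupId = 0; keyGroupId < numKeyGroups; keyGroupId++) {
            String message = logged.get(keyGroupId);
            if (message == null && !startAllKeyGroups) {
                continue; // skipped key group: its offset stays unset
            }
            // Stand-in for startNewKeyGroup: record the current stream position.
            offsets[keyGroupId] = out.size();
            if (message != null) {
                byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
                out.write(bytes, 0, bytes.length);
            }
        }
        return offsets;
    }

    /** A reader that seeks to every key group's offset succeeds only if all offsets are valid. */
    static boolean canRestore(long[] offsets, int streamLength) {
        for (long offset : offsets) {
            if (offset < 0 || offset > streamLength) {
                return false; // a real reader would fail to seek here
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, String> logged = new TreeMap<>();
        logged.put(1, "a");
        logged.put(3, "bc");

        ByteArrayOutputStream skipping = new ByteArrayOutputStream();
        long[] withGaps = write(4, logged, false, skipping);
        System.out.println(Arrays.toString(withGaps)
                + " restorable=" + canRestore(withGaps, skipping.size()));

        ByteArrayOutputStream complete = new ByteArrayOutputStream();
        long[] allStarted = write(4, logged, true, complete);
        System.out.println(Arrays.toString(allStarted)
                + " restorable=" + canRestore(allStarted, complete.size()));
    }
}
```

In this model, an empty key group that is still started simply shares its offset with the next key group and contributes zero bytes, which is why the StateFun-side fix costs nothing in stream size.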