Antti Kaikkonen created FLINK-19692:
--------------------------------------- Summary: Can't restore feedback channel from savepoint Key: FLINK-19692 URL: https://issues.apache.org/jira/browse/FLINK-19692 Project: Flink Issue Type: Bug Components: API / DataStream, API / State Processor, Stateful Functions Affects Versions: 1.11.2 Reporter: Antti Kaikkonen When using the new statefun-flink-datastream integration the following error is thrown by the *feedback -> union* task when trying to restore from a savepoint: {code:java} java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: java.io.IOException: position out of bounds at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167) ... 9 more Caused by: java.io.IOException: position out of bounds at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228) ... 10 more {code} The error is only thrown when the feedback channel has been used. I have tested with the [example application|https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java] and the error is thrown only if it is modified to actually use the feedback channel. I simply modified the invoke method to sometimes forward the greeting to a random name: {code:java} @Override public void invoke(Context context, Object input) { int seen = seenCount.updateAndGet(MyFunction::increment); context.send(GREETINGS, String.format("Hello %s at the %d-th time", input, seen)); String[] names = {"Stephan", "Igal", "Gordon", "Seth", "Marta"}; ThreadLocalRandom random = ThreadLocalRandom.current(); int index = random.nextInt(names.length); final String name2 = names[index]; if (random.nextDouble() < 0.5) context.send(new Address(GREET, name2), input); } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |