binguo created FLINK-21121:
------------------------------
                 Summary: TaggedOperatorSubtaskState is missing when creating a new savepoint using state processor api
                     Key: FLINK-21121
                     URL: https://issues.apache.org/jira/browse/FLINK-21121
                 Project: Flink
              Issue Type: Bug
              Components: API / State Processor
        Affects Versions: 1.11.0
                Reporter: binguo

I am getting the following exception when using the Flink State Processor API to write a new savepoint:

```
java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_e8ea6e352a1a627513ffbd4573fa1628_(1/1) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:265)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:152)
	... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
	at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:552)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:256)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
	... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.flink.state.api.output.TaggedOperatorSubtaskState'
	at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
	at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
	at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:204)
	at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
	at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
	at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
	at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
	... 15 more
```

My Java code:

```java
@Override
public void createNewSavepoint(ExecutionEnvironment env, String savepointPath,
        StateBackend stateBackend, ParameterTool config) {
    String savepointOutputPath = config.get(EapSavepointConstants.EAP_SAVEPOINT_OUTPUT_PATH);
    int maxParallelism = config.getInt(EapSavepointConstants.EAP_SAVEPOINT_MAX_PARALLELISM);
    Long windowTimeSize = config.getLong(EapSavepointConstants.WINDOW_TIME_SIZE);
    TumblingProcessingTimeWindows processTimeWindows =
            TumblingProcessingTimeWindows.of(Time.seconds(windowTimeSize));
    try {
        ExistingSavepoint existingSavepoint = Savepoint.load(env, savepointPath, stateBackend);
        DataSet<Tuple2<KafkaTopicPartition, Long>> kafkaListState = existingSavepoint.readUnionState(
                OperatorUidAndNameConstants.KAFKA_SOURCE_UID,
                StateNameConstants.KAFKA_OFFSET_STATE_NAME,
                KafkaStateUtils.createTypeInformation(),
                KafkaStateUtils.createStateDescriptorSerializer(env.getConfig()));
        logger.info("Print kafka offset");
        kafkaListState.print();
        Savepoint.create(stateBackend, maxParallelism)
                .withOperator(OperatorUidAndNameConstants.KAFKA_SOURCE_UID, kafkaTransformation)
                .write(savepointOutputPath);
    } catch (IOException e) {
        logger.error("Savepoint load: " + e.getMessage());
        e.printStackTrace();
    } catch (Exception e) {
        logger.error("print state: " + e.getMessage());
        e.printStackTrace();
    }
}

// KafkaStateUtils.java
public class KafkaStateUtils {

    /**
     * Creates the state serializer for the Kafka topic-partition-to-offset tuple.
     * An explicit state serializer built on KryoSerializer is needed because otherwise
     * users cannot combine 'disableGenericTypes' with the KafkaConsumer.
     *
     * @param executionConfig the job's execution config
     * @return the tuple serializer
     */
    public static TupleSerializer<Tuple2<KafkaTopicPartition, Long>> createStateDescriptorSerializer(
            ExecutionConfig executionConfig) {
        // The explicit serializer keeps compatibility with GenericTypeInformation
        // and allows users to disableGenericTypes.
        TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[]{
                new KryoSerializer<>(KafkaTopicPartition.class, executionConfig),
                LongSerializer.INSTANCE
        };
        @SuppressWarnings("unchecked")
        Class<Tuple2<KafkaTopicPartition, Long>> tupleClass =
                (Class<Tuple2<KafkaTopicPartition, Long>>) (Class<?>) Tuple2.class;
        return new TupleSerializer<>(tupleClass, fieldSerializers);
    }

    public static TypeInformation<Tuple2<KafkaTopicPartition, Long>> createTypeInformation() {
        return TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {});
    }
}
```

After remote debugging, I found that the value for the key `org.apache.flink.state.api.output.TaggedOperatorSubtaskState` could not be resolved in `org.apache.flink.util.LinkedOptionalMapSerializer#readOptionalMap`. I personally think that `TaggedOperatorSubtaskState` should implement `CompositeStateHandle`. Please give some suggestions, thank you.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
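To make the failure mode concrete, here is a minimal plain-Java sketch (not Flink's actual implementation; `OptionalMapSketch`, `unwrapOptionals`, and `resolve` are hypothetical names) of the pattern behind `LinkedOptionalMap.unwrapOptionals`: class names recorded in a serializer snapshot are resolved to optional values on restore, and if a recorded class such as `TaggedOperatorSubtaskState` cannot be found on the restoring job's classpath, unwrapping fails with exactly the "Missing value for the key" error seen in the stack trace above:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class OptionalMapSketch {

    // Unwraps a name-keyed map of optional classes; any unresolved entry
    // must fail loudly, mirroring the IllegalStateException in the report.
    static Map<String, Class<?>> unwrapOptionals(Map<String, Optional<Class<?>>> map) {
        Map<String, Class<?>> out = new LinkedHashMap<>();
        for (Map.Entry<String, Optional<Class<?>>> e : map.entrySet()) {
            out.put(e.getKey(), e.getValue().orElseThrow(() ->
                    new IllegalStateException(
                            "Missing value for the key '" + e.getKey() + "'")));
        }
        return out;
    }

    // Resolves a recorded class name; an empty Optional models a class
    // that is not on the restoring job's classpath.
    static Optional<Class<?>> resolve(String className) {
        try {
            return Optional.of(Class.forName(className));
        } catch (ClassNotFoundException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Map<String, Optional<Class<?>>> registered = new LinkedHashMap<>();
        registered.put("java.lang.String", resolve("java.lang.String"));
        // Not on the classpath of a plain JVM, so this entry stays empty.
        registered.put("org.apache.flink.state.api.output.TaggedOperatorSubtaskState",
                resolve("org.apache.flink.state.api.output.TaggedOperatorSubtaskState"));
        try {
            unwrapOptionals(registered);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under this reading, the savepoint written by the state processor API carries `TaggedOperatorSubtaskState` in the Kryo serializer snapshot, so any job restoring it without that class visible hits the unwrap failure.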