Hi Flink community:
Our Flink application sends different types of protobuf messages on the wire. Since protobuf cannot be handled by the Flink type serializer, we had to register a custom Kryo serializer:

env.getConfig().registerTypeWithKryoSerializer(MyProtoClass1.class, ProtobufSerializer.class);

We found that registering each protobuf class is not a viable solution for schema evolution. In particular, when adding/removing messages we encounter errors when restoring the state backend:

Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_d63019dde9166d2672f543f01f344085_(13/32) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 5 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createOperatorStateBackend(FsStateBackend.java:504)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:246)
    ... 7 common frames omitted
Caused by: java.lang.IllegalStateException: Missing value for the key 'proto$MyProtoClass2'
    at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)

We then switched to registering a default Kryo serializer for Message.class, the common supertype of all protobuf messages, and the issue appears to go away:

see.getConfig().addDefaultKryoSerializer(Message.class, ProtobufSerializer.class);

Questions:

1) It seems custom Kryo serializers are registered with the Flink state backend. Can we confirm that, when using the default Kryo serializer, only the supertype (e.g. Message.class) is registered and no specific protobuf message class is associated with the state?

2) Will protobuf ser/de be supported by the Flink type serializer in the future, and is there a longer-term roadmap for supporting state evolution for protobuf-type messages?

Thanks a lot.
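To make the difference between the two registration calls concrete, here is a toy model in plain Java. It is not Flink or Kryo code: Message, MyProtoClass1 and MyProtoClass2 are hypothetical stand-ins, and SerializerRegistry is an invented class. It assumes, as Kryo does for default serializers, that a default registration on a supertype also matches its subclasses, while a per-class registration matches only that exact class. The registeredNames() list is the set of class names this model's configuration has to remember, which echoes the class-name key ('proto$MyProtoClass2') in the restore error above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins: Message plays com.google.protobuf.Message,
// MyProtoClass1/2 play generated protobuf message classes.
interface Message {}
final class MyProtoClass1 implements Message {}
final class MyProtoClass2 implements Message {}

// Toy model (not Flink or Kryo code) of the two registration styles.
final class SerializerRegistry {
    // Exact-class entries, modelling registerTypeWithKryoSerializer(type, serializer).
    private final Map<Class<?>, String> exact = new LinkedHashMap<>();
    // Supertype entries, modelling addDefaultKryoSerializer(base, serializer).
    private final Map<Class<?>, String> defaults = new LinkedHashMap<>();

    void registerType(Class<?> type, String serializer) { exact.put(type, serializer); }

    void addDefault(Class<?> base, String serializer) { defaults.put(base, serializer); }

    // An exact entry wins; otherwise use the first default whose base type is assignable from 'type'.
    String serializerFor(Class<?> type) {
        String s = exact.get(type);
        if (s != null) {
            return s;
        }
        for (Map.Entry<Class<?>, String> e : defaults.entrySet()) {
            if (e.getKey().isAssignableFrom(type)) {
                return e.getValue();
            }
        }
        return "generic Kryo fallback";
    }

    // Class names this configuration has to remember, in registration order.
    List<String> registeredNames() {
        List<String> names = new ArrayList<>();
        exact.keySet().forEach(c -> names.add(c.getSimpleName()));
        defaults.keySet().forEach(c -> names.add(c.getSimpleName()));
        return names;
    }
}

public class RegistrationDemo {
    public static void main(String[] args) {
        SerializerRegistry perClass = new SerializerRegistry();
        perClass.registerType(MyProtoClass1.class, "ProtobufSerializer");
        perClass.registerType(MyProtoClass2.class, "ProtobufSerializer");

        SerializerRegistry byBase = new SerializerRegistry();
        byBase.addDefault(Message.class, "ProtobufSerializer");

        System.out.println(perClass.registeredNames());                // [MyProtoClass1, MyProtoClass2]
        System.out.println(byBase.registeredNames());                  // [Message]
        System.out.println(byBase.serializerFor(MyProtoClass2.class)); // ProtobufSerializer
    }
}
```

In this model, every new message type adds a name to the per-class list, while the default registration keeps a single supertype entry. Whether Flink's snapshot behaves the same way is exactly what question 1 asks, so treat this only as an illustration.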
Hi Ying
Which version of Flink are you using? Please also share more of the exception stack trace. Moreover, what is the relationship between `MyProtoClass2` and `MyProtoClass1`? As far as I know, registering the Message class should not be the proper solution.

For the 2nd question, you could refer to FLINK-11333 [1] for more information.

CC @Tzu-Li (Gordon) Tai<mailto:[hidden email]> as he might provide more information about this.

[1] https://issues.apache.org/jira/browse/FLINK-11333

Best
Yun Tang

________________________________
From: Ying Xu <[hidden email]>
Sent: Thursday, August 8, 2019 1:51
To: [hidden email] <[hidden email]>
Subject: Custom type serializer inside Flink state
Hi Yun:
Thanks for the quick reply, and for pointing to FLINK-11333 <https://issues.apache.org/jira/browse/FLINK-11333>; we will take a look.

We are currently using Flink 1.8.0. To summarize the behavior:

1) In the first version of the Flink app, one protobuf class is registered with Kryo:
env.getConfig().registerTypeWithKryoSerializer(MyProtoClass1.class, ProtobufSerializer.class);

2) The Flink app is then updated with a second protobuf class, say MyProtoClass2.class:
env.getConfig().registerTypeWithKryoSerializer(MyProtoClass1.class, ProtobufSerializer.class);
env.getConfig().registerTypeWithKryoSerializer(MyProtoClass2.class, ProtobufSerializer.class);

3) Cancel the first version of the Flink app, take a savepoint, and deploy the second version of the app from that savepoint.

Occasionally we encounter the exception described above.

So far we have been able to work around this issue by simply registering the default Kryo serializer for Message.class <https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/Message>, the common supertype of all generated protobuf messages.

NOTE: The above is a high-level example. In practice, our app is more complex, with a much larger number of protobuf classes. Messages may be added/deleted across different versions of the Flink app.

On Wed, Aug 7, 2019 at 11:23 AM Yun Tang <[hidden email]> wrote:
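The upgrade steps and the NOTE above can be mirrored with a small plain-Java comparison of which class names each job version registers with per-class registration. This is a hypothetical sketch, not Flink code: the version contents follow steps 1 and 2, and "v3" is an invented later version that deletes MyProtoClass1, as the NOTE says can happen. With addDefaultKryoSerializer(Message.class, ...) every version would register just "Message", so this set would not change. The sketch only shows that the per-class set drifts between versions; it does not establish the cause of the exception or whether the Message.class workaround is the right fix.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch (not Flink code): compares the class names that
// successive job versions register with per-class Kryo registration.
public class RegistrationDiff {
    // Names registered in 'a' that are absent from 'b', in insertion order.
    static Set<String> onlyIn(Set<String> a, Set<String> b) {
        Set<String> result = new LinkedHashSet<>(a);
        result.removeAll(b);
        return result;
    }

    static Set<String> setOf(String... names) {
        return new LinkedHashSet<>(Arrays.asList(names));
    }

    public static void main(String[] args) {
        Set<String> v1 = setOf("MyProtoClass1");                  // step 1
        Set<String> v2 = setOf("MyProtoClass1", "MyProtoClass2"); // step 2
        Set<String> v3 = setOf("MyProtoClass2");                  // hypothetical deletion

        System.out.println("added in v2: " + onlyIn(v2, v1));   // added in v2: [MyProtoClass2]
        System.out.println("dropped in v3: " + onlyIn(v2, v3)); // dropped in v3: [MyProtoClass1]
    }
}
```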