Custom type serializer inside Flink state

Custom type serializer inside Flink state

Ying Xu
Hi Flink community:

Our Flink application sends different types of protobuf messages over
the wire.  Since Flink's built-in type serializers cannot handle protobuf,
we had to register a custom Kryo serializer:

env.getConfig().registerTypeWithKryoSerializer(MyProtoClass1.class, ProtobufSerializer.class);

We found that registering each protobuf class individually is not a viable
solution for schema evolution.  In particular, when adding or removing
messages we would encounter errors when restoring the state backend:


Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_d63019dde9166d2672f543f01f344085_(13/32) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
        ... 5 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
        at org.apache.flink.runtime.state.filesystem.FsStateBackend.createOperatorStateBackend(FsStateBackend.java:504)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:246)
        ... 7 common frames omitted
Caused by: java.lang.IllegalStateException: Missing value for the key 'proto$MyProtoClass2'
        at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
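
For readers unfamiliar with the innermost frame, here is a rough, JDK-only sketch of how a "missing value" error of this shape can arise. It is a toy model, not Flink's LinkedOptionalMap code: the class name MissingValueSketch, the helper unwrapOptionals, and the map contents are all assumptions made for illustration. The idea is that a map of registrations may hold keys whose values could not be resolved, and unwrapping fails on the first such key.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Toy model (hypothetical, not Flink code): a map whose values may be absent.
class MissingValueSketch {

    // Strips the Optional wrappers; fails on the first key whose value is absent.
    static <K, V> Map<K, V> unwrapOptionals(Map<K, Optional<V>> entries) {
        Map<K, V> result = new LinkedHashMap<>();
        for (Map.Entry<K, Optional<V>> e : entries.entrySet()) {
            V value = e.getValue().orElseThrow(() ->
                new IllegalStateException("Missing value for the key '" + e.getKey() + "'"));
            result.put(e.getKey(), value);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed contents: one registration resolved, one without a value.
        Map<String, Optional<String>> registrations = new LinkedHashMap<>();
        registrations.put("proto$MyProtoClass1", Optional.of("ProtobufSerializer"));
        registrations.put("proto$MyProtoClass2", Optional.empty());

        try {
            unwrapOptionals(registrations);
        } catch (IllegalStateException ex) {
            // prints: Missing value for the key 'proto$MyProtoClass2'
            System.out.println(ex.getMessage());
        }
    }
}
```

Why the value is absent in the actual job is exactly the open question in this thread; the sketch only shows the failure mode, not its cause.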

We then switched to registering a default Kryo serializer for the superclass
of all protobuf messages, Message.class, and the issue appears to go away.

see.getConfig().addDefaultKryoSerializer(Message.class, ProtobufSerializer.class);
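
To make the difference between the two registration styles concrete, here is a small JDK-only sketch of the lookup idea: an exact per-class registration matches only that class, while a default serializer added for a supertype is picked for any assignable class without its own registration. This is a simplified model of how Kryo resolves serializers, not Kryo or Flink code; SerializerLookupSketch, its methods, the stand-in Message interface, and the "fallback" label are all hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model (hypothetical): exact registrations vs. supertype defaults.
class SerializerLookupSketch {

    // Stand-ins for com.google.protobuf.Message and two generated classes.
    interface Message {}
    static class MyProtoClass1 implements Message {}
    static class MyProtoClass2 implements Message {}

    private final Map<Class<?>, String> exactRegistrations = new LinkedHashMap<>();
    private final Map<Class<?>, String> defaultSerializers = new LinkedHashMap<>();

    // Models a per-class registration: applies to this exact class only.
    void registerType(Class<?> type, String serializerName) {
        exactRegistrations.put(type, serializerName);
    }

    // Models a default serializer: applies to the type and all its subtypes.
    void addDefault(Class<?> type, String serializerName) {
        defaultSerializers.put(type, serializerName);
    }

    // Exact registration wins; otherwise the first assignable default is used.
    String resolve(Class<?> type) {
        String exact = exactRegistrations.get(type);
        if (exact != null) {
            return exact;
        }
        for (Map.Entry<Class<?>, String> e : defaultSerializers.entrySet()) {
            if (e.getKey().isAssignableFrom(type)) {
                return e.getValue();
            }
        }
        return "fallback"; // placeholder for whatever generic serializer would apply
    }

    public static void main(String[] args) {
        SerializerLookupSketch perClass = new SerializerLookupSketch();
        perClass.registerType(MyProtoClass1.class, "ProtobufSerializer");
        System.out.println(perClass.resolve(MyProtoClass1.class)); // ProtobufSerializer
        System.out.println(perClass.resolve(MyProtoClass2.class)); // fallback

        SerializerLookupSketch bySuperclass = new SerializerLookupSketch();
        bySuperclass.addDefault(Message.class, "ProtobufSerializer");
        System.out.println(bySuperclass.resolve(MyProtoClass1.class)); // ProtobufSerializer
        System.out.println(bySuperclass.resolve(MyProtoClass2.class)); // ProtobufSerializer
    }
}
```

With the supertype default, adding a new message class needs no new registration call. What Flink records in state for either style is not modelled here.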


Questions:

1)  It seems custom Kryo serializers are registered with the Flink state
backend. Can we confirm that when using the default Kryo serializer, only
the superclass (e.g. Message.class) is registered and no specific protobuf
message class is associated with the state?

2)  Will protobuf ser/de be supported by Flink's type serializers in the
future, and is there a longer-term roadmap for supporting state evolution
for protobuf-type messages?

Thanks a lot.

Re: Custom type serializer inside Flink state

Yun Tang
Hi Ying

What version of Flink are you using, and could you please share more of the exception stack trace? Moreover, what is the relationship between `MyProtoClass2` and `MyProtoClass1`? As far as I know, registering the Message class should not be the proper solution.

For the 2nd question, you could refer to FLINK-11333 [1] for more information.

CC @Tzu-Li (Gordon) Tai, as he might be able to provide more information about this.

[1] https://issues.apache.org/jira/browse/FLINK-11333

Best
Yun Tang

________________________________
From: Ying Xu <[hidden email]>
Sent: Thursday, August 8, 2019 1:51
To: [hidden email] <[hidden email]>
Subject: Custom type serializer inside Flink state


Re: Custom type serializer inside Flink state

Ying Xu
Hi Yun:

Thanks for the quick reply, and for pointing to FLINK-11333
<https://issues.apache.org/jira/browse/FLINK-11333>. I will take a look.

We are currently using Flink 1.8.0.  To summarize the behavior:

1) In the first version of the Flink app, a single protobuf class is
registered with Kryo:
    env.getConfig().registerTypeWithKryoSerializer(MyProtoClass1.class, ProtobufSerializer.class);

2) The Flink app is then updated to register a second protobuf class, say
MyProtoClass2.class:
    env.getConfig().registerTypeWithKryoSerializer(MyProtoClass1.class, ProtobufSerializer.class);
    env.getConfig().registerTypeWithKryoSerializer(MyProtoClass2.class, ProtobufSerializer.class);

3) Cancel the first version of the Flink app with a savepoint, then deploy
the second version of the app from that savepoint. Occasionally one would
encounter the exception described above.
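
The three steps above come down to the set of per-class registrations differing between the job that wrote the savepoint and the job that restores from it. A minimal JDK-only sketch of that difference follows; RegistrationDiffSketch and addedRegistrations are hypothetical names, and the sketch does not model how Flink actually compares registrations on restore.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model (hypothetical): which registrations are new in the second app version?
class RegistrationDiffSketch {

    // Returns the class names registered in newVersion but absent from oldVersion.
    static Set<String> addedRegistrations(List<String> oldVersion, List<String> newVersion) {
        Set<String> added = new LinkedHashSet<>(newVersion);
        added.removeAll(oldVersion);
        return added;
    }

    public static void main(String[] args) {
        // Per-class registration: the registered set changes between versions.
        List<String> v1 = Arrays.asList("MyProtoClass1");
        List<String> v2 = Arrays.asList("MyProtoClass1", "MyProtoClass2");
        System.out.println(addedRegistrations(v1, v2)); // [MyProtoClass2]

        // Superclass default: the same single entry in both versions.
        List<String> v1Default = Arrays.asList("Message");
        List<String> v2Default = Arrays.asList("Message");
        System.out.println(addedRegistrations(v1Default, v2Default)); // []
    }
}
```

With per-class registration every added or removed message changes this set; with a single entry for the superclass it stays the same across versions.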

So far we have been able to work around this issue by simply registering a
default Kryo serializer for Message.class
<https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/Message>,
the superclass of all generated protobuf messages.

*NOTE*: The above is a high-level example. In practice, our app is more
complex, with a much larger number of protobuf classes. Messages may be
added or deleted across different versions of the Flink app.


On Wed, Aug 7, 2019 at 11:23 AM Yun Tang <[hidden email]> wrote:

> Hi Ying
>
> What version of Flink are you using, and could you please share more of
> the exception stack trace? Moreover, what is the relationship between
> `MyProtoClass2` and `MyProtoClass1`? As far as I know, registering the
> Message class should not be the proper solution.
>
> For the 2nd question, you could refer to FLINK-11333 [1] for more
> information.
>
> CC @Tzu-Li (Gordon) Tai, as he might be able to provide more information
> about this.
>
> [1] https://issues.apache.org/jira/browse/FLINK-11333
>
> Best
> Yun Tang