Key state does not support migration


Key state does not support migration

Richard Deurwaarder
Hello,

I am running into a problem where (Avro) schema evolution works perfectly
for operator/keyed state, but does not work when the Avro type is used as the key in keyBy().

For example:

I have a job like so:

    env.addSource(someSource())
          .keyBy(object -> getMyAvroObject())
          .process(doSomething())
          .addSink(someSink());

Where MyAvroObject has the following AVDL:

enum SomeEnum{
  A,B
}

record SomeKey {
  SomeEnum someEnum;
}

This works fine, but then I change my Avro object to:


enum SomeEnum{
  A,B,C
}

record SomeKey {
  SomeEnum someEnum;
}


That is, with "C" added to SomeEnum. If I restart my job (from a savepoint)
with this new schema, I get the following exception:

Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.


Coming from this piece of code
(https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L141):


if (keySerializerSchemaCompat.isCompatibleAfterMigration()
        || keySerializerSchemaCompat.isIncompatible()) {
    throw new StateMigrationException("The new key serializer must be compatible.");
}
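
If I read this correctly, only "compatible as is" passes for the key serializer; even
"compatible after migration", which regular keyed state happily accepts, is rejected.
A rough sketch of that check against the 1.8-era TypeSerializerSnapshot API, just to
illustrate the shape of it (the helper class below is my own, not Flink code):

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.util.StateMigrationException;

public final class KeySerializerCheck {

    // Mirrors the restore-time check: the snapshot written with the previous key serializer
    // is resolved against the key serializer of the restarted job.
    public static <K> void assertKeySerializerCompatible(
            TypeSerializerSnapshot<K> previousKeySerializerSnapshot,
            TypeSerializer<K> newKeySerializer) throws StateMigrationException {

        TypeSerializerSchemaCompatibility<K> compat =
                previousKeySerializerSnapshot.resolveSchemaCompatibility(newKeySerializer);

        // Regular keyed *state* would go through migration at this point; the *key* serializer
        // is not allowed to, so anything short of "compatible as is" fails the restore.
        if (compat.isCompatibleAfterMigration() || compat.isIncompatible()) {
            throw new StateMigrationException("The new key serializer must be compatible.");
        }
    }
}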



My question is:


Why does the key serializer / key state explicitly not support state
migration? And is there any way to work around this?
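
For reference, Avro itself seems to consider adding the enum symbol a compatible change
when the new schema is the reader. A quick sketch using Avro's SchemaCompatibility
utility (the inline schema strings are my hand-written equivalents of the AVDL above):

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class EnumEvolutionCheck {
    public static void main(String[] args) {
        Schema oldSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"SomeKey\",\"fields\":[{\"name\":\"someEnum\","
                + "\"type\":{\"type\":\"enum\",\"name\":\"SomeEnum\",\"symbols\":[\"A\",\"B\"]}}]}");
        Schema newSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"SomeKey\",\"fields\":[{\"name\":\"someEnum\","
                + "\"type\":{\"type\":\"enum\",\"name\":\"SomeEnum\",\"symbols\":[\"A\",\"B\",\"C\"]}}]}");

        // New schema as reader, old schema as writer: every writer symbol is known to the
        // reader, so Avro reports the pair as COMPATIBLE.
        System.out.println(
            SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema).getType());
    }
}

So the schema change itself looks fine from Avro's point of view; the restriction seems
specific to the key serializer check.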


Regards,


Richard
Re: Key state does not support migration

Till Rohrmann
Hi Richard,

I've pulled in Gordon who worked on this feature. He should be able to tell
you about the current limitations of Flink's schema evolution.

Cheers,
Till

Re: Key state does not support migration

Tzu-Li (Gordon) Tai
Hi Richard,

Schema evolution for data types used as keys is not allowed because, if the
schema of the key changes, the hash codes of keys may also change, which would
break the partitioning of the internal state managed by Flink.
There are of course some evolution scenarios that would not result in the
hashes changing (e.g. only the serialization format evolves), but at the moment
there is no way for Flink to determine that through the current serializer
compatibility interface.
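
To illustrate the partitioning point (a sketch; the max parallelism value and the
generated SomeKey builder are just examples):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupIllustration {
    public static void main(String[] args) {
        int maxParallelism = 128; // example value

        // Keyed state is partitioned into key groups derived from the key's hash code.
        SomeKey key = SomeKey.newBuilder().setSomeEnum(SomeEnum.A).build();
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
        System.out.println(keyGroup);

        // If evolving SomeKey changes its hash code, the same logical key would be assigned
        // to a different key group than the one its state was written to, and the restored
        // state could no longer be looked up under that key.
    }
}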

For a workaround, I would need a bit more information: could you let me know
whether this job is already in production and you are trying to restore from a
savepoint? And would it be possible to give me a snippet of the generated
SpecificRecord for your Avro schema?

Cheers,
Gordon
