Hello,
I am running into a problem where (Avro) schema evolution works perfectly for operator/keyed state but does not work when the Avro type is used as the key in keyBy().

For example, I have a job like this:

env.addSource(someSource())
   .keyBy(object -> getMyAvroObject())
   .process(doSomething())
   .addSink(someSink());

where MyAvroObject has the following avdl:

enum SomeEnum {
  A, B
}

record SomeKey {
  SomeEnum someEnum;
}

This works fine, but when I change my Avro object to:

enum SomeEnum {
  A, B, C
}

record SomeKey {
  SomeEnum someEnum;
}

i.e. with "C" added to SomeEnum, and then restart my job from a savepoint with this new schema, I get the following exception:

Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.

It comes from this piece of code (https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L141):

if (keySerializerSchemaCompat.isCompatibleAfterMigration() || keySerializerSchemaCompat.isIncompatible()) {
    throw new StateMigrationException("The new key serializer must be compatible.");
}

My question is: why does the key serializer / key state explicitly not support state migration, and is there any way to work around this?

Regards,

Richard
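For reference, a minimal sketch of what such a job looks like when spelled out in Java. The source, process function, sink, and the MyEvent input type are hypothetical placeholders standing in for the ones mentioned above; the relevant point is that the key type handed to keyBy() is the Avro-generated SomeKey, so the key serializer written into the savepoint is derived from the Avro schema.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AvroKeyedJobSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(someSource())            // hypothetical source of MyEvent
           .keyBy(new KeySelector<MyEvent, SomeKey>() {
               @Override
               public SomeKey getKey(MyEvent event) {
                   // The key is the Avro SpecificRecord, so Flink's key serializer
                   // (and its compatibility check on restore) is bound to its schema.
                   return SomeKey.newBuilder().setSomeEnum(event.getSomeEnum()).build();
               }
           })
           .process(doSomething())             // hypothetical KeyedProcessFunction
           .addSink(someSink());               // hypothetical sink

        env.execute("avro-keyed-job");
    }
}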
Hi Richard,
I've pulled in Gordon, who worked on this feature. He should be able to tell you about the current limitations of Flink's schema evolution.

Cheers,
Till
Hi Richard,
Schema evolution is not allowed for data types that are used as keys because, if the schema of the key changes, the hash codes of the keys may also change, which can break the partitioning of the internal state managed by Flink. There are of course some evolution scenarios that would not change the hashes (e.g. only evolving the serialization format), but at the moment there is no way for Flink to determine that given the current serializer compatibility interface.

For a workaround, I would need a bit more information: could you let me know whether this job is already in production and you are trying to restore from a savepoint? And would it be possible for you to give me a snippet of the generated SpecificRecord for your Avro schema?

Cheers,
Gordon
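To make the partitioning concern concrete, here is a small sketch (not from this thread) using Flink's internal KeyGroupRangeAssignment utility, which maps a key to a key group and then to the operator subtask that owns its state. The mapping starts from key.hashCode(), so if the generated Avro key class hashes its instances differently after a schema change, records and restored state entries can end up assigned to a different subtask than the one holding their old state.

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupAssignmentSketch {

    public static void main(String[] args) {
        int maxParallelism = 128; // key-group count, fixed once keyed state exists
        int parallelism = 4;

        // Stand-in for an Avro SomeKey instance; only its hashCode() matters here.
        Object key = "someEnum=A";

        // Key -> key group: murmur-hashes key.hashCode() modulo maxParallelism.
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);

        // Key group -> index of the subtask that owns the keyed state for this key.
        int subtask = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                maxParallelism, parallelism, keyGroup);

        System.out.println("key group = " + keyGroup + ", owning subtask = " + subtask);
    }
}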