Hello,
We're currently running into an issue upgrading the state of an application to Flink 1.11, and I think it could be caused by a backwards incompatibility introduced with Flink 1.11. A colleague of mine recently posted about it on the users list (without a response), but I'd like to bring it up here on the dev list to figure out whether the incompatibility is intended behavior and/or a known issue.

We're seeing the issue when trying to load RocksDB state from Flink 1.9 into Flink 1.11 with an application that uses the Flink table environment. Immediately after startup, RocksDBFullRestoreOperation#restoreKVStateMetaData raises a "The new key serializer must be compatible" exception.

The reason seems to be that FLINK-16998 changed the row serialization format by adding a new Row#getKind field. There is a legacy mode of the row serializer, but it's only used for reading the existing snapshot. As a consequence, RowSerializerSnapshot#resolveOuterSchemaCompatibility will always return COMPATIBLE_AFTER_MIGRATION when reading a Flink 1.9 savepoint with Flink 1.11.

The problem is that AbstractRocksDBRestoreOperation#readMetaData treats "compatible after migration" the same as "incompatible" and throws a "The new key serializer must be compatible." exception if it encounters that result.

Is it expected that the introduction of Row#getKind breaks existing older state, or is that a bug? So far I've only reproduced the issue in a somewhat more complex codebase, but if this is an unknown issue or not the intended behavior, I can try to provide a small test case (to rule out that anything in our own code triggers it).

Example of a query that triggers the issue:
https://gist.github.com/gstarnberger/f19a30df179c72a622490cbb041adb21

Full stacktrace:
https://gist.github.com/gstarnberger/336d94e723b3ec599d09e17dd7d0ee00

- Guenther
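P.S.: For context, the change referenced above (FLINK-16998) adds a change flag (RowKind) to every Row in 1.11, which the 1.9 serialization format does not carry. A minimal illustration of the new field, assuming Flink 1.11 on the classpath:

import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class RowKindExample {
    public static void main(String[] args) {
        // In Flink 1.11 every Row carries a RowKind (the change flag added by
        // FLINK-16998); the 1.9 serialization format has no such field, which
        // is why the serializer snapshot reports COMPATIBLE_AFTER_MIGRATION.
        Row row = Row.of("key", 42L);
        row.setKind(RowKind.UPDATE_AFTER);
        System.out.println(row.getKind() + ": " + row);
    }
}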
Hi Guenther,
sorry for overlooking your colleague's email.

I think the answer to your problem is twofold. The underlying problem is that your query seems to use `RowData` as a key for some keyed operation. Since changing the key format might entail that keys need to be partitioned differently, Flink does not support changing the key format. That is why Flink also fails if the key format is compatible after migration. There is a small warning about this on the state evolution page [1].

The other part of the answer is that Flink does not support strict backwards compatibility for SQL queries, if I am not mistaken (please chime in if this is no longer correct @Timo Walther <[hidden email]> and @[hidden email] <[hidden email]>). The problem is that queries might result in different execution plans after a version upgrade, which then cannot be mapped to the old state. Admittedly, in this case it should have been possible, but changing the `RowData` type which is used as a key breaks backwards compatibility. A bit confusing is that FLINK-16998 explicitly states that this change is not breaking backwards compatibility.

What you could try to use as a workaround is Flink's state processor API [2] which allows you to rewrite savepoints.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

Cheers,
Till
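As a rough sketch of what reading keyed state from an existing savepoint looks like with the 1.11 state processor API (DataSet based): the savepoint path, operator uid, state name and key/value types below are placeholders and would have to match what the Table/SQL job actually wrote, which is the hard part for generated operators.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class SavepointInspection {

    // Hypothetical reader: the uid, state name and types must match the
    // operator state inside the actual savepoint.
    static class ValueReader extends KeyedStateReaderFunction<Row, Row> {
        private transient ValueState<Row> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("my-state", Row.class));
        }

        @Override
        public void readKey(Row key, Context ctx, Collector<Row> out) throws Exception {
            out.collect(state.value());
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint =
                Savepoint.load(env, "file:///path/to/savepoint", new MemoryStateBackend());

        // Read the keyed state of one operator; the values could then be
        // transformed and written into a new savepoint.
        DataSet<Row> oldState = savepoint.readKeyedState("my-operator-uid", new ValueReader());
        oldState.print();
    }
}

Rewriting the savepoint would additionally go through OperatorTransformation and Savepoint.create to bootstrap a new savepoint from the transformed data set.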
Hi Guenther,
If you are using the old planner in 1.9 and the old planner in 1.11, then a state migration is needed because of the newly added RowKind field. This is documented in the 1.11 release notes [1].

If you are using the old planner in 1.9 and the blink planner in 1.11, the state is not compatible, because the blink planner uses different serializers for keys and fields, i.e. RowData vs Row.

Actually, the Flink Table/SQL API doesn't provide state compatibility across major versions (i.e. 1.9, 1.10, 1.11). This is because it's quite difficult to keep state compatible for SQL queries: the physical plan may change when we introduce even a minor optimization, and we may also change the state structure to get better performance for operators.

Best,
Jark

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/release-notes/flink-1.11.html#added-a-changeflag-to-row-type-flink-16998
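For reference, a minimal sketch of how the planner is picked in 1.11 (class and method names as in the 1.11 Table API); the choice decides whether the job keeps the Row-based serializers of the old planner or switches to the blink planner's RowData-based ones:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PlannerSelection {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Old (legacy) planner: keeps the Row-based serializers, so restoring
        // 1.9 state still requires migrating to the new RowKind-aware format.
        EnvironmentSettings oldPlanner = EnvironmentSettings.newInstance()
                .useOldPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment oldTableEnv = StreamTableEnvironment.create(env, oldPlanner);

        // Blink planner: uses RowData-based serializers, so state written by
        // the old planner is not compatible at all.
        EnvironmentSettings blinkPlanner = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment blinkTableEnv = StreamTableEnvironment.create(env, blinkPlanner);
    }
}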
Are these limitations documented somewhere, @Jark? I couldn't find them quickly. If not, then we should update the documentation accordingly. In particular, the problem with using RowData as a key makes FLINK-16998 not easy to work around.

Cheers,
Till
On Wed, Dec 30, 2020 at 10:47 AM Till Rohrmann <[hidden email]> wrote:
Till,

> sorry for overlooking your colleague's email.

Not a problem at all! Thanks for the response to my email.

> A bit confusing is that FLINK-16998 explicitly states that this change
> is not breaking backwards compatibility.

The comments regarding backwards compatibility are the reason why I assumed that this behavior might be unintended. If it's not a bug, we're probably going to try rebuilding the state of the affected jobs. I just wanted to reach out in case this is not the intended behavior.

> What you could try to use as a workaround is Flink's state processor API
> [2] which allows you to rewrite savepoints.

I briefly looked into the state processor API, but I wasn't sure if the version in 1.11 would work for us, as it seems that support for reading window state was only added in 1.12 (and upgrading to 1.12 would require larger changes, since we're still using some deprecated APIs that have been removed in 1.12).

- Guenther
On Wed, Dec 30, 2020 at 11:21 AM Jark Wu <[hidden email]> wrote:
Jark,

> If you are using the old planner in 1.9 and the old planner in 1.11, then a
> state migration is needed because of the newly added RowKind field. This is
> documented in the 1.11 release notes [1].

Yes - that's exactly the setup that we're using. Both versions currently use the old planner, and state migration fails with the "The new key serializer must be compatible" error.

For testing I tried to patch the Flink source to force usage of (only) the legacy mode in the RowSerializer (so that the state doesn't need to be migrated), and this seems to work. However, I'm not sure whether this has any unintended side effects, so rebuilding the state seems like a much safer and more maintainable approach.

- Guenther
Hi Guenther,
I think it's safe to use the legacy mode in your case, because the RowKind is never used in the old planner.

Hi Till,

It seems that the cross-major-version state incompatibility is not documented. I created FLINK-20823 to update the documentation.

Best,
Jark