Did Flink 1.11 break backwards compatibility for the table environment?


Did Flink 1.11 break backwards compatibility for the table environment?

Guenther Starnberger
Hello,

We're currently running into an issue upgrading the state of an
application to Flink 1.11 and I think this could be caused by a
potential backwards incompatibility that was introduced with Flink
1.11. A colleague of mine recently posted about it on the users list
(without a response), but I'd like to bring this up here on the dev
list in order to figure out if that incompatibility is intended
behavior and/or a known issue.

We're seeing that issue when trying to load the RocksDB state from
Flink 1.9 into Flink 1.11 with an application that uses the Flink
table environment. Immediately after startup,
RocksDBFullRestoreOperation#restoreKVStateMetaData raises a "The new
key serializer must be compatible" exception.

The reason for that seems to be that FLINK-16998 changed the row
serialization format by adding a new Row#getKind field. There's a
legacy mode of the row serializer, but that's only used for reading
the existing snapshot. As a consequence,
RowSerializerSnapshot#resolveOuterSchemaCompatibility will always
return COMPATIBLE_AFTER_MIGRATION when reading a Flink 1.9 savepoint
with Flink 1.11.
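
(For anyone not familiar with FLINK-16998: the new field is the change
flag on Row. A minimal illustration of the 1.11 public API, as I
understand it:)

    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;

    public class RowKindExample {
        public static void main(String[] args) {
            Row row = Row.of("alice", 42L);
            // New in 1.11: every Row carries a change flag; the default is INSERT.
            RowKind kind = row.getKind();
            System.out.println(kind);
            // The flag can mark retractions/updates in changelog streams.
            row.setKind(RowKind.UPDATE_AFTER);
            // This extra flag is what the 1.11 row serializer now writes, which is
            // why snapshots written with the 1.9 format require migration.
        }
    }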

The problem is that AbstractRocksDBRestoreOperation#readMetaData
treats "compatible after migration" the same as "incompatible" and
throws a "The new key serializer must be compatible." exception if it
encounters that result.
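
Roughly, the relevant check looks like this (paraphrased from my
reading of the 1.11 sources, not a verbatim copy):

    // Paraphrased from AbstractRocksDBRestoreOperation#readMetaData in 1.11;
    // not an exact copy of the source.
    TypeSerializerSchemaCompatibility<K> keySerializerSchemaCompat =
        serializationProxy.getKeySerializerSnapshot()
            .resolveSchemaCompatibility(keySerializer);

    // Anything other than "compatible as is" is rejected for the key
    // serializer, so COMPATIBLE_AFTER_MIGRATION ends up on the same path
    // as INCOMPATIBLE.
    if (keySerializerSchemaCompat.isCompatibleAfterMigration()
            || keySerializerSchemaCompat.isIncompatible()) {
        throw new StateMigrationException("The new key serializer must be compatible.");
    }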

Is it expected that the introduction of Row#getKind breaks existing
older state, or is that a bug? So far I have only reproduced this
issue in a somewhat more complex codebase, but if this is an unknown
issue or not the intended behavior, I can try to provide a small test
case (to rule out that anything in our own code triggers the issue).

Example of a query that triggers that issue:
https://gist.github.com/gstarnberger/f19a30df179c72a622490cbb041adb21

Full stacktrace:
https://gist.github.com/gstarnberger/336d94e723b3ec599d09e17dd7d0ee00

- Guenther

Re: Did Flink 1.11 break backwards compatibility for the table environment?

Till Rohrmann
Hi Guenther,

sorry for overlooking your colleague's email.

I think the answer to your problem is twofold. The underlying problem is
that your query seems to use `RowData` as a key for some keyed operation.
Since changing the key format might entail that keys need to be partitioned
differently, Flink does not support changing the key format. That is why
Flink also fails if the key format is only compatible after migration. There
is a small warning about this on the state evolution page [1].

The other part of the answer is that, if I am not mistaken, Flink does not
support strict backwards compatibility for SQL queries (please chime
in if this is no longer correct @Timo Walther <[hidden email]> and
@[hidden email] <[hidden email]>). The problem is that queries might
result in different execution plans after a version upgrade, which then
cannot be mapped to the old state. Admittedly, in this case it should have
been possible, but changing the `RowData` type, which is used as a key,
breaks backwards compatibility. What is a bit confusing is that FLINK-16998
explicitly states that this change does not break backwards compatibility.

What you could try as a workaround is Flink's state processor API [2],
which allows you to rewrite savepoints.
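
For illustration, a rough sketch of how such a rewrite could look with
the 1.11 (DataSet-based) state processor API. The operator uid, the
paths, MyState, and the reader/bootstrapper classes are placeholders,
not something taken from your job:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.state.api.BootstrapTransformation;
    import org.apache.flink.state.api.ExistingSavepoint;
    import org.apache.flink.state.api.OperatorTransformation;
    import org.apache.flink.state.api.Savepoint;

    public class RewriteSavepoint {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Read the keyed state of one operator from the old (1.9) savepoint.
            // The state backend should match the one the savepoint was taken with
            // (RocksDB in your case).
            ExistingSavepoint oldSavepoint = Savepoint.load(
                env, "hdfs:///savepoints/old", new RocksDBStateBackend("hdfs:///checkpoints"));
            DataSet<MyState> state =
                oldSavepoint.readKeyedState("my-operator-uid", new MyStateReaderFunction());

            // Re-bootstrap the state so it is written with the new serializers,
            // then write it out as a new savepoint the 1.11 job can restore from.
            BootstrapTransformation<MyState> transformation = OperatorTransformation
                .bootstrapWith(state)
                .keyBy(s -> s.key)
                .transform(new MyStateBootstrapFunction());

            Savepoint.create(new RocksDBStateBackend("hdfs:///checkpoints"), 128)
                .withOperator("my-operator-uid", transformation)
                .write("hdfs:///savepoints/new");

            env.execute("rewrite savepoint");
        }
    }

Whether this works in practice of course depends on whether the state
in question can be read and re-written with the 1.11 reader and
bootstrap functions.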

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

Cheers,
Till


Re: Did Flink 1.11 break backwards compatibility for the table environment?

Jark Wu-2
Hi Guenther,

If you are using the old planner in 1.9 and the old planner in 1.11,
then a state migration is needed because of the newly added RowKind
field. This is documented in the 1.11 release notes [1].

If you are using the old planner in 1.9 and the blink planner in 1.11,
the state is not compatible, because the blink planner uses a different
serializer for the keys and fields, i.e. RowData vs. Row.
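
(For reference, which planner you get is determined by the
EnvironmentSettings used when creating the table environment; a minimal
example with the 1.11 API:)

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class PlannerSelection {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

            // Old (legacy) planner, the planner that 1.9 used by default.
            EnvironmentSettings oldPlanner = EnvironmentSettings.newInstance()
                .useOldPlanner()
                .inStreamingMode()
                .build();

            // Blink planner, the default planner in 1.11.
            EnvironmentSettings blinkPlanner = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, oldPlanner);
        }
    }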

Actually, the Flink Table/SQL API doesn't provide state compatibility
across major versions (i.e. 1.9, 1.10, 1.11). This is because it's
quite difficult to keep state compatible for SQL queries: the physical
plan may change when we introduce even a minor optimization, and we may
also change the state structure to improve the performance of operators.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/release-notes/flink-1.11.html#added-a-changeflag-to-row-type-flink-16998


Re: Did Flink 1.11 break backwards compatibility for the table environment?

Till Rohrmann
Are these limitations documented somewhere, @Jark? I couldn't find them
at a quick glance. If not, then we should update the documentation
accordingly. In particular, the problem of RowData being used as a key
makes FLINK-16998 hard to work around.

Cheers,
Till


Re: Did Flink 1.11 break backwards compatibility for the table environment?

Guenther Starnberger
On Wed, Dec 30, 2020 at 10:47 AM Till Rohrmann <[hidden email]> wrote:

Till,

> sorry for overlooking your colleague's email.

Not a problem at all! Thanks for the response to my email.

> A bit confusing is that FLINK-16998 explicitly
> states that this change is not breaking backwards compatibility.

The comments regarding backwards compatibility are the reason why I
assumed that this behavior might be unintended. If it's not a bug,
we're probably going to try rebuilding the state of the affected jobs.
I just wanted to reach out in case this is not the intended behavior.

> What you could try to use as a workaround is Flink's state processor API
> [2] which allows you to rewrite savepoints.

I briefly looked into the state processor API, but I wasn't sure if
the version in 1.11 would work for us, as it seems that support for
reading window state was only added in 1.12 (upgrading to 1.12 would
require larger changes, as we're still using some deprecated APIs that
have been removed in 1.12).

- Guenther

Re: Did Flink 1.11 break backwards compatibility for the table environment?

Guenther Starnberger
On Wed, Dec 30, 2020 at 11:21 AM Jark Wu <[hidden email]> wrote:

Jark,

> If you are using the old planner in 1.9, and using the old planner in 1.11,
> then a state migration is
> needed because of the new added RowKind field. This is documented in the
> 1.11 release note [1].

Yes, that's exactly the setup we're using. Both versions currently
use the old planner, and state migration fails with the "The new key
serializer must be compatible" error.

For testing, I tried patching the Flink source to force usage of
(only) the legacy mode in the RowSerializer (so that the state doesn't
need to be migrated), and this seems to work. However, I'm not sure
whether this has any unintended side effects, so rebuilding the state
seems like a much safer and more maintainable approach.
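
For reference, the patch was roughly along these lines (I'm quoting
from memory; the two-argument RowSerializer constructor with a
legacy-mode flag is how I recall the 1.11 sources, so treat the exact
name and signature as an assumption):

    // Hypothetical sketch of the patch in RowTypeInfo#createSerializer (old
    // planner path). Upstream constructs the serializer in the new format;
    // the patch forces the pre-1.11 (legacy) wire format instead.
    // NOTE: the boolean legacy-mode constructor flag is an assumption about
    // the 1.11 RowSerializer internals, not verified here.
    @Override
    public TypeSerializer<Row> createSerializer(ExecutionConfig config) {
        int len = getArity();
        TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[len];
        for (int i = 0; i < len; i++) {
            fieldSerializers[i] = types[i].createSerializer(config);
        }
        // Upstream: new RowSerializer(fieldSerializers)  -- new format with RowKind
        // Patched:  force legacy mode so 1.9 state can be read without migration
        return new RowSerializer(fieldSerializers, true);
    }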

- Guenther

Re: Did Flink 1.11 break backwards compatibility for the table environment?

Jark Wu-2
Hi Guenther,

I think it's safe to use the legacy mode in your case,
because RowKind is never used in the old planner.

Hi Till,

It seems that the cross-major-version state incompatibility is
not documented.
I created FLINK-20823 to update the documentation.

Best,
Jark


