IllegalStateException from incompatible state type - better exception type?

Kim, Hwanju
Hi!

When the new state type is incompatible with the old one restored from a savepoint, we get an IllegalStateException via org.apache.flink.util.Preconditions.checkState, called from checkStateMetaInfo:
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java#L220

For example, if an updated app uses reducing state but is restored from a savepoint that holds aggregating state, we get the following exception:
java.lang.IllegalStateException: Incompatible key/value state types. Was [AGGREGATING], registered with [REDUCING].
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
        at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(RegisteredKeyValueStateBackendMetaInfo.java:216)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:520)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
        at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:568)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:240)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:438)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:298)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
        at java.lang.Thread.run(Thread.java:748)
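
For readers who do not have the Flink source at hand, the failure mode can be sketched without Flink on the classpath. The snippet is a simplified stand-in, not the actual Flink code: `StateKind`, `checkState` and `checkStateKind` are hypothetical names that only mimic the behaviour of a precondition-style check as described above.

```java
// Simplified stand-in for a precondition-based state type check; not the Flink source.
final class CurrentCheckSketch {

    // Hypothetical subset of the state kinds a descriptor can declare.
    enum StateKind { VALUE, LIST, REDUCING, AGGREGATING, MAP }

    // Mimics a checkState-style precondition: a failed check surfaces as IllegalStateException.
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    // Compares the kind recorded in the savepoint with the kind the updated job registers.
    static void checkStateKind(StateKind restored, StateKind registered) {
        checkState(
                restored == registered,
                "Incompatible key/value state types. Was [" + restored
                        + "], registered with [" + registered + "].");
    }

    public static void main(String[] args) {
        try {
            checkStateKind(StateKind.AGGREGATING, StateKind.REDUCING);
        } catch (IllegalStateException e) {
            // By type alone, this user mistake looks the same as an internal invariant violation.
            System.out.println(e.getClass().getName() + ": " + e.getMessage());
        }
    }
}
```

The point of the sketch is only that the exception type carries no hint about who caused the failure; the message is the sole place where the mismatch is described.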


This email is not about how to solve this issue, but about using a different exception type to consistently inform users of incompatible state. StateMigrationException seems reasonable to me, since the other kinds of state incompatibility (key, namespace, and value) are already reported with that exception. Although StateMigrationException is currently thrown mostly from the compatibility check on type serializer snapshots, its semantics seem to be to notify developers of state incompatibility in general. The type mismatch from the state descriptor (such as aggregating to reducing) is essentially an upfront, fast incompatibility check, so StateMigrationException looks like a better fit than the overly general IllegalStateException.

Some context behind this thought: internally, we treat an IllegalStateException from the Flink core code base as a kind of runtime state violation (e.g., internal state is inconsistent), so we generally do not expect it to come from a user's mistake. In practice, however, the incompatible state type above is a frequent developer mistake when restoring from a savepoint. We would like to narrow this specific kind of user error down to a dedicated exception type rather than having it thrown as a general Java exception.

I would like to hear what you think about changing the exception type to StateMigrationException in checkStateMetaInfo. If that is not reasonable, I would like to understand why, and what alternative could be considered (e.g., another existing type or a new one).
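
A minimal sketch of what the proposed change would mean for callers, assuming a checked exception type dedicated to state incompatibility. The `StateMigrationException` class here is a local stand-in declared only so the example compiles without Flink, and `checkStateKind` is a hypothetical method name.

```java
// Sketch of reporting a state type mismatch with a dedicated exception type.
final class ProposedCheckSketch {

    // Hypothetical state kinds, reduced to the two from the example.
    enum StateKind { REDUCING, AGGREGATING }

    // Local stand-in for illustration only; the real class is part of Flink.
    static final class StateMigrationException extends Exception {
        StateMigrationException(String message) {
            super(message);
        }
    }

    // A mismatch is reported with the dedicated type instead of IllegalStateException.
    static void checkStateKind(StateKind restored, StateKind registered)
            throws StateMigrationException {
        if (restored != registered) {
            throw new StateMigrationException(
                    "Incompatible key/value state types. Was [" + restored
                            + "], registered with [" + registered + "].");
        }
    }

    public static void main(String[] args) {
        try {
            checkStateKind(StateKind.AGGREGATING, StateKind.REDUCING);
        } catch (StateMigrationException e) {
            // Callers can react to the incompatibility by type instead of parsing messages.
            System.out.println("incompatible state: " + e.getMessage());
        }
    }
}
```

With a dedicated type, tooling that inspects failures no longer has to treat every IllegalStateException as a possible user error.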

Thanks,
Hwanju


Re: IllegalStateException from incompatible state type - better exception type?

Till Rohrmann
Hi Hwanju,

thanks for starting this discussion. I very much like the idea of being able
to distinguish between user code faults and framework faults. This also
helps in deciding on the recovery action to take. Hence, I would be +1 for
using certain exceptions consistently in order to indicate the origin of a
fault where possible. The big challenge I see is to do it consistently and
to share this contract with the community so that future changes take this
into account. I don't have a good idea other than being very diligent.

Concerning the concrete usage of StateMigrationException, I fear that
we have cases where we throw this exception when the serializers are
incompatible (a user fault) but also when serialization fails with an
IOException, which can be a user fault (corrupt checkpoint) as well as a
framework fault (filesystem hiccup); see
https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java#L213.
Consequently, we might first need to clean up the usage of
StateMigrationException before being able to use it consistently.
Moreover, in the case of reading a checkpoint I am not entirely sure how to
separate all possible errors into user and framework faults.
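
The ambiguity can be illustrated with a small sketch. It is written under assumptions and is not the code at the linked line: `StateMigrationException` is a local stand-in, and `migrate` and `ValueReader` are hypothetical names.

```java
import java.io.IOException;

// Sketch: one exception type ends up covering both a clear user fault and an unclear I/O failure.
final class AmbiguousWrapSketch {

    // Local stand-in for illustration only.
    static final class StateMigrationException extends Exception {
        StateMigrationException(String message) {
            super(message);
        }

        StateMigrationException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical source of serialized bytes; reading may fail with an IOException.
    interface ValueReader {
        byte[] read() throws IOException;
    }

    static byte[] migrate(boolean serializersCompatible, ValueReader reader)
            throws StateMigrationException {
        if (!serializersCompatible) {
            // Clearly a user fault: the new serializer cannot handle the old data.
            throw new StateMigrationException("The new serializer is incompatible.");
        }
        try {
            return reader.read();
        } catch (IOException e) {
            // Origin unknown: a corrupt checkpoint and a filesystem hiccup look the same here.
            throw new StateMigrationException("Error while migrating state.", e);
        }
    }

    public static void main(String[] args) {
        ValueReader failing = () -> {
            throw new IOException("read failed");
        };
        for (boolean compatible : new boolean[] {false, true}) {
            try {
                migrate(compatible, failing);
            } catch (StateMigrationException e) {
                // Both paths surface the same type; only the cause differs.
                System.out.println(e.getMessage() + " (cause: " + e.getCause() + ")");
            }
        }
    }
}
```

A caller that only looks at the exception type cannot tell the two paths apart, which is why the type alone is not a reliable indicator of the fault origin today.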

I am pulling in Gordon who worked on this part in the past and might be
able to give us some more details on the usage of the
StateMigrationException.

Cheers,
Till

On Wed, Oct 7, 2020 at 12:27 AM Kim, Hwanju <[hidden email]>
wrote:

Re: IllegalStateException from incompatible state type - better exception type?

Kim, Hwanju
Hi Till,

Thanks for the opinion and insight. I think your concern is valid, since a nontrivial number of issues are still too subtle to confidently attribute to either the framework or the user. Serialization-related issues are one example, as even a user-triggered issue leads to exceptions in many places in the framework without a clear boundary. Our approach so far has not been based on theory or completeness, but is data-driven and based on likelihood. When a case is ambiguous, we err on the side of calling it a framework issue. The StateMigrationException from migrateSerializedValue that you pointed out could be one such ambiguous example. Our general thinking about ambiguous cases is this: if it is a one-off hiccup, the effect of misclassification is low; if it is persistent (e.g., the filesystem is not working well), the impact is high, but then exceptions should be thrown from many places rather than from the single point of state migration, and we expect those other, dominant exceptions to be reported as framework issues.

If I understand correctly:
1) Semantically, a state type mismatch (e.g., aggregating vs. reducing) can be thrown as StateMigrationException, since it clearly comes from the user supplying incompatible state.
2) For the subtler StateMigrationException throws (those raised during migration rather than from a compatibility check), we can refine the handling by catching lower-level exceptions at a finer granularity (possibly using a different exception).

I think the two issues are independent, so each can be handled separately as long as the semantics of StateMigrationException are clear. Of the two, 1) seems straightforward, so if we agree, I can create an issue and work on it. Nonetheless, I would like to hear Gordon's thoughts.
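
The "err on the side of the framework" rule can be sketched as a small classifier. Everything in it is hypothetical: the `FaultOrigin` enum, the `classify` method, and the choice of which exception types count as user faults are assumptions made for illustration and are not part of Flink.

```java
import java.util.Set;

// Sketch of a type-based fault classifier that defaults to FRAMEWORK when nothing matches.
final class FaultClassifierSketch {

    enum FaultOrigin { USER, FRAMEWORK }

    // Local stand-in for illustration only.
    static final class StateMigrationException extends Exception {
        StateMigrationException(String message) {
            super(message);
        }
    }

    // Walks the cause chain and reports USER if any link is an instance of a known user-fault type.
    static FaultOrigin classify(
            Throwable failure, Set<Class<? extends Throwable>> userFaultTypes) {
        Throwable current = failure;
        // The depth bound guards against unusually long or cyclic cause chains.
        for (int depth = 0; current != null && depth < 32; depth++) {
            for (Class<? extends Throwable> type : userFaultTypes) {
                if (type.isInstance(current)) {
                    return FaultOrigin.USER;
                }
            }
            current = current.getCause();
        }
        // Ambiguous or unknown failures are attributed to the framework.
        return FaultOrigin.FRAMEWORK;
    }

    public static void main(String[] args) {
        Set<Class<? extends Throwable>> userFaultTypes = Set.of(StateMigrationException.class);
        Throwable wrapped = new RuntimeException(new StateMigrationException("type mismatch"));
        System.out.println(classify(wrapped, userFaultTypes));
        System.out.println(classify(new IllegalStateException("inconsistent"), userFaultTypes));
    }
}
```

A classifier like this is only as good as the contract behind the exception types, which is why an IllegalStateException thrown for a user mistake ends up in the wrong bucket.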

Thanks,
Hwanju

On 10/7/20, 1:39 AM, "Till Rohrmann" <[hidden email]> wrote:


Re: IllegalStateException from incompatible state type - better exception type?

Tzu-Li (Gordon) Tai
Hi,

Thanks for the discussion so far. I agree that the use of
StateMigrationException and its semantics are rather undefined and
inconsistent in the current state.

On Thu, Oct 8, 2020 at 7:38 AM Kim, Hwanju <[hidden email]> wrote:

> Hi Till,
>
> Thanks for the opinion and insight. I think your concern is valid, since a
> nontrivial number of issues are still too subtle to confidently attribute
> to either the framework or the user. Serialization-related issues are one
> example, as even a user-triggered issue leads to exceptions in many places
> in the framework without a clear boundary. Our approach so far has not been
> based on theory or completeness, but is data-driven and based on likelihood.
> When a case is ambiguous, we err on the side of calling it a framework
> issue. The StateMigrationException from migrateSerializedValue that you
> pointed out could be one such ambiguous example. Our general thinking about
> ambiguous cases is this: if it is a one-off hiccup, the effect of
> misclassification is low; if it is persistent (e.g., the filesystem is not
> working well), the impact is high, but then exceptions should be thrown
> from many places rather than from the single point of state migration, and
> we expect those other, dominant exceptions to be reported as framework
> issues.
>
> If I understand correctly:
> 1) Semantically, a state type mismatch (e.g., aggregating vs. reducing)
> can be thrown as StateMigrationException, since it clearly comes from the
> user supplying incompatible state.
> 2) For the subtler StateMigrationException throws (those raised during
> migration rather than from a compatibility check), we can refine the
> handling by catching lower-level exceptions at a finer granularity
> (possibly using a different exception).
>

This observation is accurate. To summarize the discussion so far, we can
categorize all failures on restore related to state compatibility into two
categories:
1) incompatible state serializers
(TypeSerializerSchemaCompatibility#isIncompatible()) or state types
(ListState vs. MapState), and
2) failures during the migration itself: even if all compatibility checks
pass and migration is required, subtle filesystem hiccups, corrupted
checkpoint files, or incorrect serializer implementations can still surface
during the process.

I'm wondering if we should instead introduce a different exception type for
1).

For 1), a new exception type named IncompatibleStateDescriptorException
(TBD) seems like a better fit, as in that case the state can never be
migrated and the failure is always a user fault.
The name immediately implies that the user-provided properties for state
access, e.g. serializers or state type, are incompatible with the
checkpointed data.

For 2), we can continue to use StateMigrationException to wrap lower-level
exceptions. The intent here isn't to classify whether the error was
user-caused or a framework issue, but to simply relate the exception to the
process of state migration.

I agree that these can be handled separately, starting with
differentiating the above scenarios by using different exception types and
cleaning up the use of StateMigrationException.
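
The split into two categories could look roughly like the sketch below. It assumes two local stand-in classes; `IncompatibleStateDescriptorException` is only the tentative name floated in this thread, and `restore` and `Migration` are hypothetical names used for illustration.

```java
import java.io.IOException;

// Sketch: category 1 gets its own type, category 2 keeps StateMigrationException as a wrapper.
final class RestoreExceptionSplitSketch {

    // Category 1: the descriptor can never match the checkpointed data; always a user-side fix.
    static final class IncompatibleStateDescriptorException extends Exception {
        IncompatibleStateDescriptorException(String message) {
            super(message);
        }
    }

    // Category 2: relates a lower-level failure to the migration process without assigning blame.
    static final class StateMigrationException extends Exception {
        StateMigrationException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical migration step that may fail with an I/O error.
    interface Migration {
        void run() throws IOException;
    }

    static void restore(
            String restoredType,
            String registeredType,
            boolean serializersIncompatible,
            Migration migration)
            throws IncompatibleStateDescriptorException, StateMigrationException {
        if (!restoredType.equals(registeredType)) {
            throw new IncompatibleStateDescriptorException(
                    "State type changed from " + restoredType + " to " + registeredType + ".");
        }
        if (serializersIncompatible) {
            throw new IncompatibleStateDescriptorException(
                    "The new serializer is incompatible with the checkpointed data.");
        }
        try {
            migration.run();
        } catch (IOException e) {
            throw new StateMigrationException("State migration failed.", e);
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            restore("ListState", "MapState", false, () -> { });
        } catch (IncompatibleStateDescriptorException e) {
            System.out.println("user fault: " + e.getMessage());
        }
        try {
            restore("ListState", "ListState", false, () -> {
                throw new IOException("disk unavailable");
            });
        } catch (StateMigrationException e) {
            System.out.println("migration failure: " + e.getCause().getMessage());
        }
    }
}
```

In this shape, only the first type makes a statement about the fault origin; the second merely ties the failure to the migration step, as described for 2).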

Cheers,
Gordon


Re: IllegalStateException from incompatible state type - better exception type?

Kim, Hwanju
Thanks Gordon.

If its semantics are not (yet) clearly defined, I think we can make them a bit clearer.
As an incremental step, I think any exception more specific than a general Java exception (e.g., IllegalStateException) would at least improve the information, whether or not there is a clearly settled guideline for specific exceptions like StateMigrationException.

Thanks,
Hwanju

On 10/7/20, 11:53 PM, "Tzu-Li (Gordon) Tai" <[hidden email]> wrote:

    CAUTION: This email originated from outside of the organization. Do not click links or open attachments unless you can confirm the sender and know the content is safe.



    Hi,

    Thanks for the discussions so far. I agree that the use of the
    StateMigrationException and it's semantics is rather undefined and
    inconsistent as of the current state.

    On Thu, Oct 8, 2020 at 7:38 AM Kim, Hwanju <[hidden email]> wrote:

    > Hi Till,
    >
    > Thanks for the opinion and insight. I think your concern is valid, as
    > still nontrivial number of issues would be somewhat subtle to confidently
    > say it's framework or user error. Serialization-related issue is one of
    > them, as even user-triggered issue leads to exceptions in many places from
    > the framework without clear boundary. The approach so far has been not
    > based on theory and completeness, but more on data-driven and likelihood.
    > When it's confusing, we are err on the side of saying it's framework issue.
    > But what you pointed out about StateMigrationException from
    > migrateSerializedValue could be one example that is ambiguous. General idea
    > about such ambiguous issue is if it's one-off hiccup, misclassification
    > effect is low, but if it's constant (e.g., filesystem not working well),
    > its impact is high but there should be more exceptions thrown from many
    > places not just from a single point of state migration and expecting other
    > dominant exceptions to be notified as framework issue.
    >
    > If I understand it correctly,
    > 1) From the perspective of semantic, state type mismatch (e.g.,
    > aggregating vs. reducing) can be thrown as StateMigrationException, as it
    > is clearly from user with incompatible state.
    > 2) Subtle StateMigrationException throws (like during migration not from
    > compatibility check), we can refine that catching lower-level exceptions in
    > a finer grain manner (possibly using a different exception).
    >

    This observation is accurate. To summarize the discussions so far, we can
    categorize all failures on restore related to state compatibility into two
    categories:
    1) incompatible state serializers
    (TypeSerializerSchemaCompatibility#isIncompatible()) or types (ListState vs
    MapState), and
    2) if all compatibility checks pass and migration is required, subtle
    filesystem hiccups / corrupted checkpoints files / incorrect serializer
    implementations can still surface during the process of a migration.

    I'm wondering if we should instead include a different exception type for
    1).

    For 1), a new exception type named IncompatibleStateDescriptorException
    (TBD) seems like a better name, as in that case, the state can never be
    migrated and is always a user-failure.
    The name immediately implies that user-provided properties for state
    access, e.g. serializers / state type, are incompatible with checkpointed
    data.

    For 2), we can continue to use StateMigrationException to wrap lower-level
    exceptions. The intent here isn't to classify whether the error was
    user-caused or a framework issue, but to simply relate the exception to the
    process of state migration.

    I agree that these can be handled separately, by starting off first with
    differentiating the above scenarios with different exception types /
    clearing the use of StateMigrationException.

    Cheers,
    Gordon


    >
    > I think the two issues would be independent, so either can be handled
    > separately as long as the semantic of StateMigrationException is clear. Out
    > of the two, 1) seems straightforward, so if agreed, I can create an issue
    > to work on. Nonetheless, I would like to hear more about Gordon's thought.
    >
    > Thanks,
    > Hwanju
    >
    > On 10/7/20, 1:39 AM, "Till Rohrmann" <[hidden email]> wrote:
    >
    >     CAUTION: This email originated from outside of the organization. Do
    > not click links or open attachments unless you can confirm the sender and
    > know the content is safe.
    >
    >
    >
    >     Hi Hwanju,
    >
    >     thanks for starting this discussion. I very much like the idea of being
    >     able to distinguish between user code faults and framework faults. This
    >     also helps in deciding on the recovery action to take. Hence, I would be
    >     +1 for using certain exceptions consistently in order to indicate the
    >     origin of a fault where possible. The big challenge I see is to do it
    >     consistently and to share this contract with the community so that
    >     future changes take this into account. I don't have a good idea other
    >     than being very diligent.
    >
    >     Concerning the concrete usage of the StateMigrationException, I fear
    >     that we have cases where we throw this exception if the serializers are
    >     incompatible (user fault) but also when serialization fails (an
    >     IOException, which can be a user fault (corrupt checkpoint) as well as
    >     a framework fault (filesystem hiccup)):
    >     https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java#L213
    >     Consequently, we might first need to clean up the usage of
    >     StateMigrationException before being able to use it consistently.
    >     Moreover, in the case of reading a checkpoint, I am not entirely sure
    >     how to classify all possible errors into user and framework faults.
    >
    >     I am pulling in Gordon who worked on this part in the past and might be
    >     able to give us some more details on the usage of the
    >     StateMigrationException.
    >
    >     Cheers,
    >     Till
    >
    >     On Wed, Oct 7, 2020 at 12:27 AM Kim, Hwanju <[hidden email]> wrote:
    >
    >     > Hi!
    >     >
    >     > In case where new state type is incompatible with old one from
    >     > savepoint, we get IllegalStateException via
    >     > org.apache.flink.util.Preconditions.checkState from checkStateMetaInfo:
    >     >
    >     > https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java#L220
    >     >
    >     > For example, if an updated app uses reducing state but restoring from
    >     > aggregating state, we can get the following exception:
    >     > java.lang.IllegalStateException: Incompatible key/value state types. Was [AGGREGATING], registered with [REDUCING].
    >     >         at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    >     >         at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(RegisteredKeyValueStateBackendMetaInfo.java:216)
    >     >         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:520)
    >     >         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
    >     >         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
    >     >         at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
    >     >         at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
    >     >         at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
    >     >         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:568)
    >     >         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:240)
    >     >         at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:438)
    >     >         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:298)
    >     >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
    >     >         at java.lang.Thread.run(Thread.java:748)
    >     >
    >     >
    >     > The topic of this email is not how to solve this issue, but rather
    >     > which type of exception should consistently inform users of
    >     > incompatible state. I think StateMigrationException seems reasonable
    >     > for this, as other types of state incompatibility
    >     > (key/namespace/value) are reported by that exception. Although the
    >     > existing use of StateMigrationException comes mostly from the
    >     > compatibility check on the type serializer snapshot, its semantics
    >     > seem to be to notify developers of state incompatibility in general.
    >     > The type mismatch from the state descriptor (like aggregating to
    >     > reducing) seems more like an upfront/fast incompatibility check, so
    >     > StateMigrationException looks better than the overly general
    >     > IllegalStateException.
    >     >
    >     > A bit of context behind this thought: we internally consider an
    >     > IllegalStateException from the Flink core code base to be a sort of
    >     > runtime state violation (e.g., internal state is inconsistent), so we
    >     > generally expect it not to come from a user’s mistake. Apparently, the
    >     > above incompatible type is a frequent mistake developers make when
    >     > restoring from a savepoint. We would like to narrow such a specific
    >     > type of user error down to a certain type of exception, rather than
    >     > having it thrown as a general Java exception.
    >     >
    >     > I would like to hear what you think about changing the exception type
    >     > to StateMigrationException in checkStateMetaInfo. If it’s not
    >     > reasonable, I wonder why, and what alternative would be considered
    >     > (e.g., another or a new type).
    >     >
    >     > Thanks,
    >     > Hwanju
    >     >
    >     >
    >     >
    >
    >