Hi!
In cases where a new state type is incompatible with the old one from a savepoint, we get an IllegalStateException via org.apache.flink.util.Preconditions.checkState from checkStateMetaInfo:
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java#L220

For example, if an updated app uses reducing state but restores from aggregating state, we can get the following exception:

java.lang.IllegalStateException: Incompatible key/value state types. Was [AGGREGATING], registered with [REDUCING].
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(RegisteredKeyValueStateBackendMetaInfo.java:216)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:520)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
    at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:568)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:240)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:438)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:298)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
    at java.lang.Thread.run(Thread.java:748)

The topic of this email is not how to solve this particular issue, but rather to discuss using a different exception type to consistently inform users of incompatible state. I think StateMigrationException seems reasonable for this, as other kinds of state incompatibility (key/namespace/value) are already reported through that exception. Although the existing use of StateMigrationException is mostly in compatibility checks on type serializer snapshots, its semantics seem to be notifying developers of state incompatibility in general. The type mismatch from a state descriptor (like aggregating vs. reducing) looks more like an upfront/fast incompatibility check, so using StateMigrationException seems better than the overly general IllegalStateException.

A bit of context behind this thought: internally, we consider an IllegalStateException from the Flink core code base to be a runtime state violation (e.g., internal state is inconsistent), so we generally expect it not to stem from a user's mistake. However, the incompatible-type case above is a frequent developer mistake when restoring from a savepoint. We would like to narrow such a specific class of user error down to a dedicated exception type rather than have it thrown as a general Java exception.

I would like to hear what you think about changing the exception type to StateMigrationException in checkStateMetaInfo. If it's not reasonable, I wonder why, and what alternative would be considered (e.g., another existing or a new type).

Thanks,
Hwanju
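For illustration, the failing check boils down to a precondition on the state type recorded in the restored meta info. The following is a minimal, self-contained sketch (plain Java, not Flink's actual classes; `StateTypeCheck` and its methods are simplified stand-ins) contrasting the current behavior with the proposed one:

```java
// Simplified stand-in for Flink's RegisteredKeyValueStateBackendMetaInfo
// check; only the shape of the problem, not the real implementation.
public class StateTypeCheck {
    enum StateType { VALUE, LIST, REDUCING, AGGREGATING, MAP }

    /** Checked exception signaling state incompatibility, analogous in spirit
     *  to org.apache.flink.util.StateMigrationException. */
    static class StateMigrationException extends Exception {
        StateMigrationException(String message) { super(message); }
    }

    /** Current behavior: Preconditions.checkState throws IllegalStateException. */
    static void checkStateMetaInfoCurrent(StateType restored, StateType registered) {
        if (restored != registered) {
            throw new IllegalStateException(String.format(
                "Incompatible key/value state types. Was [%s], registered with [%s].",
                restored, registered));
        }
    }

    /** Proposed behavior: throw a dedicated, checked StateMigrationException. */
    static void checkStateMetaInfoProposed(StateType restored, StateType registered)
            throws StateMigrationException {
        if (restored != registered) {
            throw new StateMigrationException(String.format(
                "Incompatible key/value state types. Was [%s], registered with [%s].",
                restored, registered));
        }
    }
}
```

The message text stays the same; only the exception type changes, so tooling that classifies failures can key off the type rather than parsing the message.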
Hi Hwanju,
thanks for starting this discussion. I very much like the idea of being able to distinguish between user code faults and framework faults. This also helps in deciding on the recovery action to take. Hence, I would be +1 for using certain exceptions consistently in order to indicate the origin of a fault where possible. The big challenge I see is doing it consistently and sharing this contract with the community so that future changes take it into account. I don't have a good idea other than being very diligent.

Concerning the concrete usage of StateMigrationException, I fear that we have cases where we throw this exception when the serializers are incompatible (a user fault), but also when serialization fails with an IOException, which can be a user fault (corrupt checkpoint) as well as a framework fault (filesystem hiccup):
https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java#L213
Consequently, we might first need to clean up the usage of StateMigrationException before being able to use it consistently. Moreover, in the case of reading a checkpoint, I am not entirely sure how to discern all possible errors into user and framework faults.

I am pulling in Gordon, who worked on this part in the past and might be able to give us some more details on the usage of StateMigrationException.

Cheers,
Till
Hi Till,
Thanks for the opinion and insight. I think your concern is valid, as a nontrivial number of issues would still be too subtle to confidently call either framework or user errors. Serialization-related issues are one example, as even a user-triggered issue leads to exceptions in many places in the framework without a clear boundary. The approach so far has been based not on theory and completeness, but on data and likelihood; when a case is ambiguous, we err on the side of calling it a framework issue. The StateMigrationException from migrateSerializedValue that you pointed out could be one such ambiguous example. The general idea for such ambiguous issues is: if it's a one-off hiccup, the misclassification effect is low; if it's persistent (e.g., a filesystem not working well), the impact is higher, but then more exceptions should be thrown from many places, not just from the single point of state migration, and we expect the other, dominant exceptions to be reported as a framework issue.

If I understand it correctly:
1) From the perspective of semantics, a state type mismatch (e.g., aggregating vs. reducing) can be thrown as StateMigrationException, as it is clearly from the user with incompatible state.
2) For subtle StateMigrationException throws (like during migration, not from a compatibility check), we can refine that by catching lower-level exceptions in a finer-grained manner (possibly using a different exception).

I think the two issues are independent, so either can be handled separately as long as the semantics of StateMigrationException are clear. Of the two, 1) seems straightforward, so if agreed, I can create an issue to work on it. Nonetheless, I would like to hear Gordon's thoughts as well.

Thanks,
Hwanju
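The classification policy described above can be sketched as a simple mapping from exception type to a suspected fault origin. This is a hypothetical helper, not Flink API; the names and the err-on-the-side-of-framework default are assumptions taken from the discussion:

```java
import java.io.IOException;

// Hypothetical fault-origin classifier (not Flink API): known user-caused
// incompatibilities are labeled USER, and anything ambiguous or unknown
// defaults to FRAMEWORK, per the policy described in the thread.
public class FaultClassifier {
    enum Origin { USER, FRAMEWORK }

    /** Stand-in for Flink's StateMigrationException (user-side incompatibility). */
    static class StateMigrationException extends Exception {
        StateMigrationException(String message) { super(message); }
    }

    static Origin classify(Throwable t) {
        if (t instanceof StateMigrationException) {
            return Origin.USER; // incompatible state: typically a user-side change
        }
        if (t instanceof IOException) {
            // Could be a corrupt checkpoint (user) or a filesystem hiccup
            // (framework); ambiguous, so err on the side of FRAMEWORK.
            return Origin.FRAMEWORK;
        }
        return Origin.FRAMEWORK; // unknown failures default to FRAMEWORK
    }
}
```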
Hi,
Thanks for the discussions so far. I agree that the use of StateMigrationException and its semantics are rather undefined and inconsistent in the current state.

On Thu, Oct 8, 2020 at 7:38 AM Kim, Hwanju <[hidden email]> wrote:
> 1) From the perspective of semantics, a state type mismatch (e.g., aggregating vs. reducing) can be thrown as StateMigrationException, as it is clearly from the user with incompatible state.
> 2) For subtle StateMigrationException throws (like during migration, not from a compatibility check), we can refine that by catching lower-level exceptions in a finer-grained manner (possibly using a different exception).

This observation is accurate.

To summarize the discussions so far, we can categorize all restore failures related to state compatibility into two categories: 1) incompatible state serializers (TypeSerializerSchemaCompatibility#isIncompatible()) or state types (ListState vs. MapState), and 2) if all compatibility checks pass and migration is required, subtle filesystem hiccups, corrupted checkpoint files, or incorrect serializer implementations can still surface during the migration process.

I'm wondering if we should instead introduce a different exception type for 1). A new exception type named IncompatibleStateDescriptorException (name TBD) seems better, as in that case the state can never be migrated and the failure is always user-caused. The name immediately implies that user-provided properties for state access, e.g. serializers or the state type, are incompatible with the checkpointed data.

For 2), we can continue to use StateMigrationException to wrap lower-level exceptions. The intent here isn't to classify whether the error was user-caused or a framework issue, but simply to relate the exception to the process of state migration.

I agree that these can be handled separately, starting with differentiating the above scenarios with different exception types and cleaning up the use of StateMigrationException.

Cheers,
Gordon
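Gordon's proposed split could look roughly like the following. The names and hierarchy are a sketch of the proposal, not existing Flink classes; IncompatibleStateDescriptorException is the TBD name from the thread, and the helper methods are illustrative only:

```java
// Sketch of the two-category split proposed in the thread (hypothetical
// classes, not Flink's actual exception hierarchy).
public class StateExceptions {
    /** Category 1: user-provided descriptor (state type / serializer) can never
     *  match the checkpointed data; always a user-side incompatibility. */
    static class IncompatibleStateDescriptorException extends Exception {
        IncompatibleStateDescriptorException(String message) { super(message); }
    }

    /** Category 2: a failure during the migration process itself; wraps the
     *  lower-level cause (I/O error, corrupt file, serializer bug) without
     *  claiming a user or framework origin. */
    static class StateMigrationException extends Exception {
        StateMigrationException(String message, Throwable cause) { super(message, cause); }
    }

    /** Upfront descriptor check: fails fast with the category-1 exception. */
    static void checkDescriptorType(String restored, String registered)
            throws IncompatibleStateDescriptorException {
        if (!restored.equals(registered)) {
            throw new IncompatibleStateDescriptorException(
                "Incompatible key/value state types. Was [" + restored
                + "], registered with [" + registered + "].");
        }
    }

    /** Migration step: any lower-level failure is wrapped in the category-2 exception. */
    static void migrate(Runnable migration) throws StateMigrationException {
        try {
            migration.run();
        } catch (RuntimeException e) {
            throw new StateMigrationException("Error while migrating state.", e);
        }
    }
}
```

With this split, callers can treat category 1 as non-recoverable user error and inspect the cause of category 2 before deciding on a recovery action.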
Thanks Gordon.
If its semantics are not (yet) clearly defined, I think we can make them a bit clearer. Incrementally, any exception that is more specific than a general Java exception (e.g., IllegalStateException) would at least improve the information available, whether or not there is a clearly settled guideline for specific exceptions like StateMigrationException.

Thanks,
Hwanju