Re: Is it possible to do state migration with checkpoints?

Sivaprasanna
Adding dev@ to get some traction. Any help would be greatly appreciated.

Thanks.

On Thu, Jul 23, 2020 at 11:48 AM Sivaprasanna <[hidden email]> wrote:

A follow-up question. I tried taking a savepoint, but the job failed immediately. This happens every time I take a savepoint. The job runs on a YARN cluster and fails with "container running out of memory". The state size averages around 1.2 GB but sometimes peaks at ~4.5 GB (please refer to the screenshot below). The job runs with a 2 GB task manager heap and 2 GB of task manager managed memory. I increased the managed memory to 6 GB, assuming the failure had something to do with RocksDB, but it failed even with 6 GB of managed memory. I guess I am missing some configuration. Can you folks please help me with this?
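
For anyone following the memory numbers above, here is a rough sketch of how the pieces add up. It assumes the Flink 1.10+ memory model, in which the process size is roughly heap + managed memory + direct memory + metaspace + JVM overhead. Only the 2 GB heap and the 2 GB / 6 GB managed memory come from this mail. The other sizes, the container limit, and the class and method names are hypothetical placeholders.

```java
// Simplified sketch of a TaskManager memory budget; this is not Flink code.
final class TaskManagerMemorySketch {

    /** Adds up the main memory components of one TaskManager process, in MB. */
    static long totalProcessMb(long heapMb, long managedMb, long directMb,
                               long metaspaceMb, long jvmOverheadMb) {
        return heapMb + managedMb + directMb + metaspaceMb + jvmOverheadMb;
    }

    /** Returns true when the estimated process size fits into the container limit. */
    static boolean fitsInContainer(long processMb, long containerMb) {
        return processMb <= containerMb;
    }

    public static void main(String[] args) {
        long heapMb = 2048;        // from the post: 2 GB heap
        long directMb = 512;       // assumption: network buffers and other direct memory
        long metaspaceMb = 256;    // assumption
        long jvmOverheadMb = 512;  // assumption
        long containerMb = 8192;   // assumption: hypothetical YARN container limit

        // 2 GB managed memory first, then the 6 GB that was tried afterwards
        for (long managedMb : new long[] {2048, 6144}) {
            long total = totalProcessMb(heapMb, managedMb, directMb, metaspaceMb, jvmOverheadMb);
            System.out.println("managed=" + managedMb + " MB, total=" + total
                    + " MB, fits=" + fitsInContainer(total, containerMb));
        }
    }
}
```

The sketch only shows that raising managed memory raises the total the container has to allow for. Whether that is what kills the container in this job cannot be concluded from the numbers in this mail.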



On Wed, Jul 22, 2020 at 7:32 PM Sivaprasanna <[hidden email]> wrote:
Hi,

We are trying out state schema migration for one of our stateful pipelines. We use a few Avro-typed states. Changes made to the job:
    1. Updated the schema for one of the states (added a new 'boolean' field with a default value).
    2. Modified the code by removing a couple of ValueStates.
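
To illustrate change 1, here is a toy model of why the new field carries a default. In Avro schema resolution, a field that exists only in the reader's (new) schema is filled from its default when old data is read, and reading fails if there is no default. The code below is plain Java, not Avro's or Flink's API. The field names `userId` and `active` and the class name are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of reading an old record through a newer schema; NOT Avro's API.
final class SchemaDefaultSketch {

    /**
     * Resolves an old record against the new schema.
     * newSchemaDefaults maps each field of the new schema to its default value,
     * where null means "this field has no default".
     */
    static Map<String, Object> readWithNewSchema(Map<String, Object> oldRecord,
                                                 Map<String, Object> newSchemaDefaults) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : newSchemaDefaults.entrySet()) {
            String name = field.getKey();
            if (oldRecord.containsKey(name)) {
                // Field was already written by the old schema: keep the stored value.
                result.put(name, oldRecord.get(name));
            } else if (field.getValue() != null) {
                // Field is new: fall back to the default declared in the new schema.
                result.put(name, field.getValue());
            } else {
                throw new IllegalStateException("No value or default for field: " + name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> oldRecord = new LinkedHashMap<>();
        oldRecord.put("userId", 42L);              // hypothetical existing field

        Map<String, Object> newSchema = new LinkedHashMap<>();
        newSchema.put("userId", null);             // existing field, no default needed
        newSchema.put("active", Boolean.FALSE);    // hypothetical new boolean field with default

        System.out.println(readWithNewSchema(oldRecord, newSchema));
    }
}
```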

To push these changes, I stopped the live job and resubmitted the new jar with the latest *checkpoint* path. However, the job failed with the following error:

java.lang.RuntimeException: Error while getting state
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
    ...
    ...
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)


I was going through the state schema evolution doc. It says that we need to take a *savepoint* and restart the job from the savepoint path. We are using the RocksDB backend with incremental checkpoints enabled. Can we not use the latest available checkpoint when dealing with state schema changes?

The complete stack trace is attached to this mail.

-
Sivaprasanna