[DISCUSS] Breaking Savepoint Compatibility from 1.1 to 1.2

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

[DISCUSS] Breaking Savepoint Compatibility from 1.1 to 1.2

Aljoscha Krettek-2
Hi,
Stefan and I are currently working on preparing our state infrastructure
for the introduction of key-grouped state. This is the parent issue for
key-grouped state https://issues.apache.org/jira/browse/FLINK-3755 while
this is the specific issue that we are currently working on
https://issues.apache.org/jira/browse/FLINK-4381.

We are at a point where we think that we have a reasonable implementation
that we think is good and quite future proof. Unfortunately, this would
break compatibility of Savepoints between versions before our code and
versions with our changes. We would like to discuss how to proceed with
this since it has the potential to affect a lot of people. I'll first try
and explain the current state of state (pun intended) and then give an
overview of the changes that we currently have.

In the current version (Flink 1.1, master ...) the state that an operator
sends to the CheckpointCoordinator is a black box (serialized using Java
Serialization). The checkpoint coordinator stores it and when a job is
restarted it sends these black boxes to the tasks which know how to read
them again. The serialized object that the tasks sends as state roughly
looks like this:

class StreamTaskStateList {
  StreamTaskState states[]
}

class StreamTaskState {
  StateHandle<?> operatorState;
  StateHandle<Serializable> functionState;
  HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates;
}

the kv states are a map from state name to a snapshot of a keyed state (all
of it). The other fields are more black boxes of state that was serialized
using Java Serialization.

Our current code sends this around between task and CheckpointCoordinator:

StreamStateHandle[] chainedStates
KeyGroupsStateHandle keyGroupsState

class KeyGroupsStateHandle {
  Map<Integer, Long> keyGroups // this is the index
  StreamStateHandle data // this is the actual state for each key group
}

the index is used for finding the contents for a specific key group in the
stream. The chained states are the state for each operator in the chain,
written to a stream. (This contains both operator state and the function
state). This representation allows us to break the chain of operators if we
want to in the future because we have the state of each operator separately
and the checkpoint coordinator is aware of it. The key-group state
representation allows the checkpoint coordinator to re-assign the key
groups to operators upon restore. We should also mention that all of the
state on the CheckpointCoordinator side will be serialized using our code,
not Java serialization. This should allow the possibility of schema
evolution in the future.

The problem now is that stuff breaks when we try and restore from a
savepoint with the old format on a system that uses the new format. The
only solution that I see right now is to keep all the old state classes as
they are. Create a new hierarchy for Flink 1.2 and when restoring from a
pre-1.2 savepoint we have to manually try and tweeze out the old state and
put it into the new representation. This will get nasty very quickly, for
example, think about the state backends where we basically have to have two
versions now and have code that can read from the old state and then funnel
that into the new-version state backend somehow. As I said, it's not
impossible but very involved.

By the way, this problem of versions of code is not restricted to
savepoints, for every code that could be affected by loading versions of
stuff from earlier code we essentially have to keep them as they are
forever. (Or teach operators how to load state from earlier versions as
well.)

So, what do you think about this?

Cheers,
Stefan & Aljoscha
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Breaking Savepoint Compatibility from 1.1 to 1.2

Gyula Fóra-2
Hi,

I think this is a very important change for the the future of the system
that provides a much cleaner internal representation of the states.

You are right that this can in theory break programs written in 1.1 when
upgraded to 1.2 but I wonder if this will be actually a practical problem
in the companies using Flink. If it is very important to keep the job
running they can delay upgrading to 1.2 and find a way to be able to
restart the job. This will probably come in handy on that rainy Sunday when
every system suddenly breaks :)

I know this might be a stupid way to look at it  but adding all the
plumbing code for compatibility might actually cause more issues in the
long run than it wins for us.

Just my thoughts :)

Regards,
Gyula


Aljoscha Krettek <[hidden email]> ezt írta (időpont: 2016. aug. 11.,
Cs, 16:52):

> Hi,
> Stefan and I are currently working on preparing our state infrastructure
> for the introduction of key-grouped state. This is the parent issue for
> key-grouped state https://issues.apache.org/jira/browse/FLINK-3755 while
> this is the specific issue that we are currently working on
> https://issues.apache.org/jira/browse/FLINK-4381.
>
> We are at a point where we think that we have a reasonable implementation
> that we think is good and quite future proof. Unfortunately, this would
> break compatibility of Savepoints between versions before our code and
> versions with our changes. We would like to discuss how to proceed with
> this since it has the potential to affect a lot of people. I'll first try
> and explain the current state of state (pun intended) and then give an
> overview of the changes that we currently have.
>
> In the current version (Flink 1.1, master ...) the state that an operator
> sends to the CheckpointCoordinator is a black box (serialized using Java
> Serialization). The checkpoint coordinator stores it and when a job is
> restarted it sends these black boxes to the tasks which know how to read
> them again. The serialized object that the tasks sends as state roughly
> looks like this:
>
> class StreamTaskStateList {
>   StreamTaskState states[]
> }
>
> class StreamTaskState {
>   StateHandle<?> operatorState;
>   StateHandle<Serializable> functionState;
>   HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates;
> }
>
> the kv states are a map from state name to a snapshot of a keyed state (all
> of it). The other fields are more black boxes of state that was serialized
> using Java Serialization.
>
> Our current code sends this around between task and CheckpointCoordinator:
>
> StreamStateHandle[] chainedStates
> KeyGroupsStateHandle keyGroupsState
>
> class KeyGroupsStateHandle {
>   Map<Integer, Long> keyGroups // this is the index
>   StreamStateHandle data // this is the actual state for each key group
> }
>
> the index is used for finding the contents for a specific key group in the
> stream. The chained states are the state for each operator in the chain,
> written to a stream. (This contains both operator state and the function
> state). This representation allows us to break the chain of operators if we
> want to in the future because we have the state of each operator separately
> and the checkpoint coordinator is aware of it. The key-group state
> representation allows the checkpoint coordinator to re-assign the key
> groups to operators upon restore. We should also mention that all of the
> state on the CheckpointCoordinator side will be serialized using our code,
> not Java serialization. This should allow the possibility of schema
> evolution in the future.
>
> The problem now is that stuff breaks when we try and restore from a
> savepoint with the old format on a system that uses the new format. The
> only solution that I see right now is to keep all the old state classes as
> they are. Create a new hierarchy for Flink 1.2 and when restoring from a
> pre-1.2 savepoint we have to manually try and tweeze out the old state and
> put it into the new representation. This will get nasty very quickly, for
> example, think about the state backends where we basically have to have two
> versions now and have code that can read from the old state and then funnel
> that into the new-version state backend somehow. As I said, it's not
> impossible but very involved.
>
> By the way, this problem of versions of code is not restricted to
> savepoints, for every code that could be affected by loading versions of
> stuff from earlier code we essentially have to keep them as they are
> forever. (Or teach operators how to load state from earlier versions as
> well.)
>
> So, what do you think about this?
>
> Cheers,
> Stefan & Aljoscha
>
mxm
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Breaking Savepoint Compatibility from 1.1 to 1.2

mxm
Hi Aljoscha,

I'm not very deep into the state backend implementation. However, I
think a breaking change is unavoidable with the new key groups. The
only way that we achieve backwards-compatibility is to include a
translator from the old state format to the new one. As you already
mentioned, this is quite involved and we would have to maintain this
translator at least until 2.0.

I'm with Gyula; I think we can afford to break this once for 1.2 and
maintain backwards-compatibility afterwards. We should ask some
production users and get their feedback on the problem.

Cheers,
Max

On Thu, Aug 11, 2016 at 5:23 PM, Gyula Fóra <[hidden email]> wrote:

> Hi,
>
> I think this is a very important change for the the future of the system
> that provides a much cleaner internal representation of the states.
>
> You are right that this can in theory break programs written in 1.1 when
> upgraded to 1.2 but I wonder if this will be actually a practical problem
> in the companies using Flink. If it is very important to keep the job
> running they can delay upgrading to 1.2 and find a way to be able to
> restart the job. This will probably come in handy on that rainy Sunday when
> every system suddenly breaks :)
>
> I know this might be a stupid way to look at it  but adding all the
> plumbing code for compatibility might actually cause more issues in the
> long run than it wins for us.
>
> Just my thoughts :)
>
> Regards,
> Gyula
>
>
> Aljoscha Krettek <[hidden email]> ezt írta (időpont: 2016. aug. 11.,
> Cs, 16:52):
>
>> Hi,
>> Stefan and I are currently working on preparing our state infrastructure
>> for the introduction of key-grouped state. This is the parent issue for
>> key-grouped state https://issues.apache.org/jira/browse/FLINK-3755 while
>> this is the specific issue that we are currently working on
>> https://issues.apache.org/jira/browse/FLINK-4381.
>>
>> We are at a point where we think that we have a reasonable implementation
>> that we think is good and quite future proof. Unfortunately, this would
>> break compatibility of Savepoints between versions before our code and
>> versions with our changes. We would like to discuss how to proceed with
>> this since it has the potential to affect a lot of people. I'll first try
>> and explain the current state of state (pun intended) and then give an
>> overview of the changes that we currently have.
>>
>> In the current version (Flink 1.1, master ...) the state that an operator
>> sends to the CheckpointCoordinator is a black box (serialized using Java
>> Serialization). The checkpoint coordinator stores it and when a job is
>> restarted it sends these black boxes to the tasks which know how to read
>> them again. The serialized object that the tasks sends as state roughly
>> looks like this:
>>
>> class StreamTaskStateList {
>>   StreamTaskState states[]
>> }
>>
>> class StreamTaskState {
>>   StateHandle<?> operatorState;
>>   StateHandle<Serializable> functionState;
>>   HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates;
>> }
>>
>> the kv states are a map from state name to a snapshot of a keyed state (all
>> of it). The other fields are more black boxes of state that was serialized
>> using Java Serialization.
>>
>> Our current code sends this around between task and CheckpointCoordinator:
>>
>> StreamStateHandle[] chainedStates
>> KeyGroupsStateHandle keyGroupsState
>>
>> class KeyGroupsStateHandle {
>>   Map<Integer, Long> keyGroups // this is the index
>>   StreamStateHandle data // this is the actual state for each key group
>> }
>>
>> the index is used for finding the contents for a specific key group in the
>> stream. The chained states are the state for each operator in the chain,
>> written to a stream. (This contains both operator state and the function
>> state). This representation allows us to break the chain of operators if we
>> want to in the future because we have the state of each operator separately
>> and the checkpoint coordinator is aware of it. The key-group state
>> representation allows the checkpoint coordinator to re-assign the key
>> groups to operators upon restore. We should also mention that all of the
>> state on the CheckpointCoordinator side will be serialized using our code,
>> not Java serialization. This should allow the possibility of schema
>> evolution in the future.
>>
>> The problem now is that stuff breaks when we try and restore from a
>> savepoint with the old format on a system that uses the new format. The
>> only solution that I see right now is to keep all the old state classes as
>> they are. Create a new hierarchy for Flink 1.2 and when restoring from a
>> pre-1.2 savepoint we have to manually try and tweeze out the old state and
>> put it into the new representation. This will get nasty very quickly, for
>> example, think about the state backends where we basically have to have two
>> versions now and have code that can read from the old state and then funnel
>> that into the new-version state backend somehow. As I said, it's not
>> impossible but very involved.
>>
>> By the way, this problem of versions of code is not restricted to
>> savepoints, for every code that could be affected by loading versions of
>> stuff from earlier code we essentially have to keep them as they are
>> forever. (Or teach operators how to load state from earlier versions as
>> well.)
>>
>> So, what do you think about this?
>>
>> Cheers,
>> Stefan & Aljoscha
>>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Breaking Savepoint Compatibility from 1.1 to 1.2

Ufuk Celebi-2
I agree that it is acceptable to break it (again). At the moment, we
already prohibit users from resuming 1.0 savepoints with 1.1 and so
far (~1 week since release) no one complained. I think that at this
point most users use it only with the same major version as they used
when triggering the savepoint, e.g. 1) pause and resume and maybe 2)
pause, update bugfix version, resume.



On Fri, Aug 12, 2016 at 11:11 AM, Maximilian Michels <[hidden email]> wrote:

> Hi Aljoscha,
>
> I'm not very deep into the state backend implementation. However, I
> think a breaking change is unavoidable with the new key groups. The
> only way that we achieve backwards-compatibility is to include a
> translator from the old state format to the new one. As you already
> mentioned, this is quite involved and we would have to maintain this
> translator at least until 2.0.
>
> I'm with Gyula; I think we can afford to break this once for 1.2 and
> maintain backwards-compatibility afterwards. We should ask some
> production users and get their feedback on the problem.
>
> Cheers,
> Max
>
> On Thu, Aug 11, 2016 at 5:23 PM, Gyula Fóra <[hidden email]> wrote:
>> Hi,
>>
>> I think this is a very important change for the the future of the system
>> that provides a much cleaner internal representation of the states.
>>
>> You are right that this can in theory break programs written in 1.1 when
>> upgraded to 1.2 but I wonder if this will be actually a practical problem
>> in the companies using Flink. If it is very important to keep the job
>> running they can delay upgrading to 1.2 and find a way to be able to
>> restart the job. This will probably come in handy on that rainy Sunday when
>> every system suddenly breaks :)
>>
>> I know this might be a stupid way to look at it  but adding all the
>> plumbing code for compatibility might actually cause more issues in the
>> long run than it wins for us.
>>
>> Just my thoughts :)
>>
>> Regards,
>> Gyula
>>
>>
>> Aljoscha Krettek <[hidden email]> ezt írta (időpont: 2016. aug. 11.,
>> Cs, 16:52):
>>
>>> Hi,
>>> Stefan and I are currently working on preparing our state infrastructure
>>> for the introduction of key-grouped state. This is the parent issue for
>>> key-grouped state https://issues.apache.org/jira/browse/FLINK-3755 while
>>> this is the specific issue that we are currently working on
>>> https://issues.apache.org/jira/browse/FLINK-4381.
>>>
>>> We are at a point where we think that we have a reasonable implementation
>>> that we think is good and quite future proof. Unfortunately, this would
>>> break compatibility of Savepoints between versions before our code and
>>> versions with our changes. We would like to discuss how to proceed with
>>> this since it has the potential to affect a lot of people. I'll first try
>>> and explain the current state of state (pun intended) and then give an
>>> overview of the changes that we currently have.
>>>
>>> In the current version (Flink 1.1, master ...) the state that an operator
>>> sends to the CheckpointCoordinator is a black box (serialized using Java
>>> Serialization). The checkpoint coordinator stores it and when a job is
>>> restarted it sends these black boxes to the tasks which know how to read
>>> them again. The serialized object that the tasks sends as state roughly
>>> looks like this:
>>>
>>> class StreamTaskStateList {
>>>   StreamTaskState states[]
>>> }
>>>
>>> class StreamTaskState {
>>>   StateHandle<?> operatorState;
>>>   StateHandle<Serializable> functionState;
>>>   HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates;
>>> }
>>>
>>> the kv states are a map from state name to a snapshot of a keyed state (all
>>> of it). The other fields are more black boxes of state that was serialized
>>> using Java Serialization.
>>>
>>> Our current code sends this around between task and CheckpointCoordinator:
>>>
>>> StreamStateHandle[] chainedStates
>>> KeyGroupsStateHandle keyGroupsState
>>>
>>> class KeyGroupsStateHandle {
>>>   Map<Integer, Long> keyGroups // this is the index
>>>   StreamStateHandle data // this is the actual state for each key group
>>> }
>>>
>>> the index is used for finding the contents for a specific key group in the
>>> stream. The chained states are the state for each operator in the chain,
>>> written to a stream. (This contains both operator state and the function
>>> state). This representation allows us to break the chain of operators if we
>>> want to in the future because we have the state of each operator separately
>>> and the checkpoint coordinator is aware of it. The key-group state
>>> representation allows the checkpoint coordinator to re-assign the key
>>> groups to operators upon restore. We should also mention that all of the
>>> state on the CheckpointCoordinator side will be serialized using our code,
>>> not Java serialization. This should allow the possibility of schema
>>> evolution in the future.
>>>
>>> The problem now is that stuff breaks when we try and restore from a
>>> savepoint with the old format on a system that uses the new format. The
>>> only solution that I see right now is to keep all the old state classes as
>>> they are. Create a new hierarchy for Flink 1.2 and when restoring from a
>>> pre-1.2 savepoint we have to manually try and tweeze out the old state and
>>> put it into the new representation. This will get nasty very quickly, for
>>> example, think about the state backends where we basically have to have two
>>> versions now and have code that can read from the old state and then funnel
>>> that into the new-version state backend somehow. As I said, it's not
>>> impossible but very involved.
>>>
>>> By the way, this problem of versions of code is not restricted to
>>> savepoints, for every code that could be affected by loading versions of
>>> stuff from earlier code we essentially have to keep them as they are
>>> forever. (Or teach operators how to load state from earlier versions as
>>> well.)
>>>
>>> So, what do you think about this?
>>>
>>> Cheers,
>>> Stefan & Aljoscha
>>>