Re: Task Local Recovery with mountable disks in the cloud

Re: Task Local Recovery with mountable disks in the cloud

Stephan Ewen
/cc dev@flink


On Tue, Apr 20, 2021 at 1:29 AM Sonam Mandal <[hidden email]> wrote:

> Hello,
>
> We've been experimenting with Task-local recovery using Kubernetes. We
> have a way to specify mounting the same disk across Task Manager
> restarts/deletions for when the pods get recreated. In this scenario, we
> noticed that task local recovery does not kick in (as expected based on the
> documentation).
>
> We did try to comment out the code on the shutdown path which cleaned up
> the task local directories before the pod went down / was restarted. We
> noticed that remote recovery kicked in even though the task local state was
> present. I noticed that the slot IDs changed, and was wondering if this is
> the main reason that the task local state didn't get used in this scenario?
>
> Since we're using this shared disk to store the local state across pod
> failures, would it make sense to allow keeping the task local state so that
> we can get faster recovery even for situations where the Task Manager
> itself dies? In some sense, the storage here is disaggregated from the pods
> and can potentially benefit from task local recovery. Any reason why this
> is a bad idea in general?
>
> Is there a way to preserve the slot IDs across restarts? We set up the Task
> Manager to pin the resource-id, but that didn't seem to help. My
> understanding is that the slot ID needs to be reused for task local
> recovery to kick in.
>
> Thanks,
> Sonam
>
>
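
For reference, the setup described above boils down to a handful of Flink
configuration options. A minimal sketch in Java follows; the option keys are
the standard ones for local recovery, the local state directory and the
pinned resource id, while the paths and id values are only illustrative:

import org.apache.flink.configuration.Configuration;

public class LocalRecoveryConfigSketch {

    // Sketch of the TaskManager-side settings for the scenario above. The
    // option keys are the documented Flink keys; paths and ids are placeholders.
    public static Configuration build() {
        Configuration conf = new Configuration();
        // Keep local copies of checkpointed state for task-local recovery.
        conf.setBoolean("state.backend.local-recovery", true);
        // Put the local state on the mounted / persistent disk.
        conf.setString("taskmanager.state.local.root-dirs", "/mnt/pv/local-state");
        // Pin the TaskManager resource id across pod restarts.
        conf.setString("taskmanager.resource-id", "taskmanager-0");
        return conf;
    }
}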

Re: Task Local Recovery with mountable disks in the cloud

Till Rohrmann
Hi Sonam,

sorry for the late reply. We were a bit caught in the midst of the feature
freeze for the next major Flink release.

In general, I think it is a very good idea to disaggregate the local state
storage to make it reusable across TaskManager failures. However, it is
also not trivial to do.

Let me first describe how task-local recovery currently works and then look
at how we could improve it:

Flink creates an AllocationID for every slot allocation. The AllocationID
associates a slot on a TaskExecutor with a job and also scopes the lifetime
of the slot with respect to that job (theoretically, one and the same slot
could fulfill multiple slot requests of the same job if the slot allocation
is freed in between). Note that the AllocationID is a random ID and thus
changes whenever the ResourceManager allocates a new slot on a TaskExecutor
for a job.
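
In code terms, an AllocationID is just a randomly generated ID, so the
allocation created after a restart can never be equal to the one the cached
state was scoped to. A tiny sketch of that consequence:

import org.apache.flink.runtime.clusterframework.types.AllocationID;

public class AllocationIdSketch {
    public static void main(String[] args) {
        // Every slot allocation gets a freshly generated, random AllocationID.
        AllocationID before = new AllocationID(); // allocation before the failure
        AllocationID after = new AllocationID();  // re-allocation after the restart
        // The two (almost surely) never match, which is why the local state
        // cached under "before" is not picked up again.
        System.out.println(before.equals(after)); // false
    }
}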

Task-local recovery is effectively a state cache associated with an
AllocationID. For every checkpoint and every task, the TaskExecutor copies
the state data and stores it in the task-local recovery cache. The cache is
maintained as long as the slot allocation is valid (i.e. the slot has not
been freed by the JobMaster and has not timed out). This keeps the lifecycle
management of the state data simple and ensures that a process does not
clutter the local disks. On the JobMaster side, Flink remembers for every
Execution where it was deployed (it remembers the AllocationID). If a
failover happens, Flink tries to re-deploy the Executions into the slots
they were running in before by matching the AllocationIDs.
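
To make that concrete, here is a heavily simplified model of the two sides.
Everything below is illustrative only, not the actual Flink classes; it just
mirrors the idea of a per-allocation local state cache on the TaskExecutor
and AllocationID matching on the JobMaster during failover:

import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative model only.
public class LocalRecoverySketch {

    // TaskExecutor side: cached state is scoped by AllocationID and checkpoint.
    static class LocalStateCache {
        private final Map<String, Map<Long, Path>> stateByAllocation = new HashMap<>();

        void store(String allocationId, long checkpointId, Path localStateCopy) {
            stateByAllocation
                    .computeIfAbsent(allocationId, id -> new HashMap<>())
                    .put(checkpointId, localStateCopy);
        }

        // Freeing the slot (or shutting down gracefully) discards the entry.
        void releaseSlot(String allocationId) {
            stateByAllocation.remove(allocationId);
        }

        Optional<Path> lookup(String allocationId, long checkpointId) {
            return Optional.ofNullable(stateByAllocation.get(allocationId))
                    .map(byCheckpoint -> byCheckpoint.get(checkpointId));
        }
    }

    // JobMaster side: remember where each Execution ran; on failover, local
    // recovery only helps if the offered slot carries the same AllocationID.
    static class FailoverMatcher {
        private final Map<String, String> lastAllocationByExecution = new HashMap<>();

        void recordDeployment(String executionId, String allocationId) {
            lastAllocationByExecution.put(executionId, allocationId);
        }

        boolean canRecoverLocally(String executionId, String offeredAllocationId) {
            return offeredAllocationId.equals(lastAllocationByExecution.get(executionId));
        }
    }
}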

We scoped the state cache to an AllocationID for simplicity, and because we
couldn't guarantee that a failed TaskExecutor would be restarted on the same
machine and would therefore have access to the same local disk as before.
That's also why Flink deletes the cache directory when a slot is freed or
when the TaskExecutor is shut down gracefully.

With persistent volumes this changes, and we can make the TaskExecutors
"stateful" in the sense that we can reuse an already populated cache. One
rather simple idea would be to also persist the slot allocations of a
TaskExecutor (which slots are allocated and which AllocationIDs they are
assigned). This information could be used to re-initialize the TaskExecutor
upon restart. That way, it would not have to register at the ResourceManager
and wait for new slot allocations, but could directly start offering its
slots to the jobs it remembered. If the TaskExecutor cannot find the
JobMasters for the respective jobs, it would free the slots and clear the
cache accordingly.
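
A rough sketch of what such persisted slot-allocation metadata could look
like on the mounted volume. The file layout, field names and recovery flow
below are invented for illustration; nothing like this exists in Flink
today:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: persist "slot index -> (job id, allocation id)" next to
// the local state on the persistent volume and read it back after a restart.
public class SlotAllocationJournalSketch {

    static final class PersistedSlot {
        final int slotIndex;
        final String jobId;
        final String allocationId;

        PersistedSlot(int slotIndex, String jobId, String allocationId) {
            this.slotIndex = slotIndex;
            this.jobId = jobId;
            this.allocationId = allocationId;
        }
    }

    private final Path journal;

    SlotAllocationJournalSketch(Path localStateRootDir) {
        this.journal = localStateRootDir.resolve("slot-allocations.txt");
    }

    // Called whenever a slot is allocated or freed, so the file mirrors the
    // TaskExecutor's current slot table.
    void persist(List<PersistedSlot> slots) throws IOException {
        StringBuilder sb = new StringBuilder();
        for (PersistedSlot slot : slots) {
            sb.append(slot.slotIndex).append(',')
              .append(slot.jobId).append(',')
              .append(slot.allocationId).append('\n');
        }
        Files.write(journal, sb.toString().getBytes(StandardCharsets.UTF_8));
    }

    // On restart: re-create the slot table and offer the slots to the
    // remembered jobs. If a JobMaster cannot be found, the slot would be
    // freed and its cached state deleted.
    List<PersistedSlot> recover() throws IOException {
        List<PersistedSlot> slots = new ArrayList<>();
        if (!Files.exists(journal)) {
            return slots; // nothing persisted: fall back to normal registration
        }
        for (String line : Files.readAllLines(journal, StandardCharsets.UTF_8)) {
            String[] parts = line.split(",");
            slots.add(new PersistedSlot(Integer.parseInt(parts[0]), parts[1], parts[2]));
        }
        return slots;
    }
}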

This could work as long as the ResourceManager does not start new
TaskExecutors whose slots could be used to recover the job. If this is a
problem, then one needs to answer the question of how long to wait for the
old TaskExecutors to come back and reuse their local state, versus quickly
starting a fresh instance and restoring state remotely.

An alternative proposal, which is probably more powerful albeit also more
complex, would be to make the cache information explicit when registering
the TaskExecutor at the ResourceManager and when later offering slots to the
JobMaster. For example, the TaskExecutor could tell the ResourceManager
which state it has locally cached (this would probably need to include the
key group ranges of every stored state), and this information could be used
to decide from which TaskExecutor to allocate slots for a job. Similarly, on
the JobMaster side we could use this information to calculate the best
mapping between Executions and slots. I think that mechanism could better
deal with rescaling events, where there is no perfect match between
Executions and slots because the key group ranges have changed.
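
To illustrate the kind of information that would have to be reported and how
it could be used, here is a rough sketch of a cached-state descriptor with
key group ranges and an overlap-based scoring of slot candidates. The types,
field names and scoring are invented; only the key-group-range idea mirrors
how Flink partitions keyed state:

import java.util.List;

// Illustrative only: what a TaskExecutor might report about locally cached
// state, and how a scheduler could score slot candidates by key-group overlap.
public class CachedStateMatchingSketch {

    // A contiguous, inclusive key-group range, as used for keyed state.
    static final class KeyGroupRange {
        final int start;
        final int end;

        KeyGroupRange(int start, int end) {
            this.start = start;
            this.end = end;
        }

        int overlap(KeyGroupRange other) {
            int lo = Math.max(start, other.start);
            int hi = Math.min(end, other.end);
            return Math.max(0, hi - lo + 1);
        }
    }

    // One entry of the report sent to the ResourceManager / JobMaster.
    static final class CachedStateEntry {
        final String jobId;
        final String operatorId;
        final KeyGroupRange range;
        final long checkpointId;

        CachedStateEntry(String jobId, String operatorId, KeyGroupRange range, long checkpointId) {
            this.jobId = jobId;
            this.operatorId = operatorId;
            this.range = range;
            this.checkpointId = checkpointId;
        }
    }

    // Score a candidate slot: how many of the key groups a recovering
    // Execution needs are already cached on that TaskExecutor. After
    // rescaling there is rarely a perfect match, so the scheduler would
    // simply prefer the slot with the highest overlap.
    static int localStateScore(String jobId, String operatorId, KeyGroupRange needed,
                               List<CachedStateEntry> reportedByTaskExecutor) {
        int score = 0;
        for (CachedStateEntry entry : reportedByTaskExecutor) {
            if (entry.jobId.equals(jobId) && entry.operatorId.equals(operatorId)) {
                score += entry.range.overlap(needed);
            }
        }
        return score;
    }
}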

So to answer your question: There is currently no way to preserve
AllocationIDs across restarts. However, we could use the persistent volume
to store this information so that we can restore it on restart of a
TaskExecutor. This could enable task local state recovery for cases where
we lose a TaskExecutor and restart it with the same persistent volume.

Cheers,
Till


Re: Task Local Recovery with mountable disks in the cloud

Till Rohrmann
Hi Sonam,

I think it would be great to create a FLIP for this feature. FLIPs don't
have to be super large; in this case, it could simply state the general idea
of making local recovery work across TaskManager failures and then outline
the different approaches we have discussed so far. If we then decide to go
with persisting the cache information (the AllocationIDs), that would be a
good outcome. If we decide to go with the more complex solution of telling
the ResourceManager and JobMaster about the ranges of cached state data,
that is also fine.

Cheers,
Till

On Fri, May 7, 2021 at 6:30 PM Sonam Mandal <[hidden email]> wrote:

> Hi Till,
>
> Thanks for getting back to me. Apologies for my delayed response.
>
> Thanks for confirming that the slot ID (Allocation ID) is indeed necessary
> today for task local recovery to kick in, and thanks for your insights on
> how to make this work.
>
> We are interested in exploring this disaggregation between local state
> storage and slots to allow potential reuse of local state even when TMs go
> down.
>
> I'm planning to spend some time exploring the Flink code around local
> recovery and state persistence. I'm still new to Flink, so any guidance
> will be helpful. I think both of your ideas on how to make this happen
> are interesting and worth exploring. What's the procedure to collaborate or
> get guidance on this feature? Will a FLIP be required, or will opening a
> ticket do?
>
> Thanks,
> Sonam

Re: Task Local Recovery with mountable disks in the cloud

Yang Wang
Just a side note: not only a persistent volume can help with keeping the
local state for the TaskManager pod, but also ephemeral storage.

Ephemeral storage is bound to the lifecycle of the TaskManager pod, and it
can be shared across restarts of the TaskManager container.


Best,
Yang

On Tue, May 11, 2021 at 1:02 AM Sonam Mandal <[hidden email]> wrote:

> Hi Till,
>
> Sure, that sounds good. I'll open a FLIP for this when we start working on
> it.
>
> Thanks for the insights!
>
> Regards,
> Sonam