Job recovery with task manager restart

Job recovery with task manager restart

Thomas Weise
Hi,

When a job fails and is recovered by Flink, task manager JVMs are reused.
That can cause problems when the failed job wasn't cleaned up properly, for
example leaving behind the user class loader. This manifests as a rising
baseline of memory usage, eventually leading to a death spiral.

It would be good to provide an option that guarantees isolation, by
restarting the task manager processes. Managing the processes would depend
on how Flink is deployed, but the recovery sequence would need to provide a
hook for the user.
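
To make the idea of a hook a bit more concrete, here is a purely hypothetical sketch (neither this interface nor any such option exists in Flink today) of where a deployment-specific decision could plug into the recovery sequence:

    // Hypothetical sketch only; such an interface does not exist in Flink.
    // Before a recovery attempt reuses a task manager, the framework would ask a
    // user/deployment-provided hook whether the TM process should exit instead,
    // so the surrounding deployment (YARN, Kubernetes, a supervisor script, ...)
    // can bring up a fresh JVM.
    public interface TaskManagerRecoveryHook {

        /**
         * Called on the task manager before tasks of a recovering job are
         * redeployed to it.
         *
         * @param restartAttempt number of times the job has been restarted so far
         * @return true if this TM process should exit instead of being reused
         */
        boolean shouldRestartProcess(int restartAttempt);
    }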

Has there been prior discussion or related work?

Thanks,
Thomas

Re: Job recovery with task manager restart

Kim, Hwanju
Hi Thomas,

I have a question regarding the class loader issue, as it seems interesting.
My understanding is that the user class loader is at least unregistered and re-registered (from/to the library cache on the TM) across a task restart. If I understand it correctly, the unregistered class loader should be GCed as long as no object it loaded lingers across the task restart. However, there is no guarantee that a UDF cleans up everything in close(). I've seen some libraries used in UDFs let a daemon thread outlive a task, so any object loaded by the unregistered user class loader and held by that thread keeps the class loader reachable and leaks it (the daemon threads leak as well, since a new one keeps being spawned, albeit singleton per class loader, by each newly registered class loader). If a job keeps restarting, this behavior eventually leads to a metaspace OOM or to running out of threads.

So my first question is whether the memory issue you've seen is a Flink issue or a side effect caused by the UDF, as described above. My second question is whether anything else besides the class loader issue is involved. Of course, I also wonder whether any prior discussion is going on.
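
To illustrate the daemon-thread failure mode described above, here is a minimal plain-Java sketch (the class name is made up and this is not Flink API; it only mimics an open()/close() lifecycle) of user code that pins its class loader:

    // Hypothetical illustration, not Flink API: a "UDF" that starts a daemon
    // thread in open() but never stops it in close().
    public class LeakyUdf {

        private Thread heartbeat;

        public void open() {
            heartbeat = new Thread(() -> {
                while (true) {
                    try {
                        Thread.sleep(1_000); // pretend to refresh a token, report metrics, ...
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }, "leaky-heartbeat");
            heartbeat.setDaemon(true); // outlives the task, but not the JVM
            heartbeat.start();
        }

        public void close() {
            // BUG: heartbeat.interrupt() is never called here. The running thread
            // references the lambda, whose class was loaded by the user class
            // loader, so that class loader can never be garbage collected.
        }
    }

Every restart then registers a fresh user class loader that loads another copy of this class and starts another thread, so metaspace usage and the thread count only grow until the TM hits an OOM.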

Best,
Hwanju

Re: Job recovery with task manager restart

Till Rohrmann
Hi Thomas and Hwanju,

thanks for starting this discussion. As far as I know, there has not been a
lot of prior discussion or related work with respect to this topic.
Somewhat related is the discussion about job isolation in a session cluster
[1].

Whenever there is a resource leak on Flink's side, we should try to fix it.
However, user code might be out of our control, and for such cases a feature
like this could be useful.

How would such a feature behave in detail? Should all TMs that executed a
task of the restarting job be restarted? What happens if these TMs execute
other jobs (in session mode)? Should the decision to restart be made locally
on the TM (e.g. based on a number of tolerated task failures) or centrally on
the RM, where different resolution strategies could run?

To mitigate the problem of class loader leaks and GC pressure, we thought
about binding the class loader to a slot. As long as a JM owns this slot
(so also across job restarts), the user code class loader should then be
reusable.
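
As a rough illustration of the slot-binding idea (purely a sketch with invented class names, not how Flink's library cache is actually structured), the TM would keep one user code class loader per slot and only drop it when the slot is released, rather than when a task finishes:

    // Hypothetical sketch: cache one user-code class loader per slot and reuse it
    // across job restarts for as long as the same JM owns the slot.
    import java.net.URL;
    import java.net.URLClassLoader;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class SlotScopedClassLoaderCache {

        private final Map<String, URLClassLoader> loadersBySlot = new ConcurrentHashMap<>();

        /** Reuse the slot's class loader if present; otherwise create one for the job's jars. */
        public ClassLoader getOrCreate(String slotId, URL[] userJars) {
            return loadersBySlot.computeIfAbsent(
                    slotId, id -> new URLClassLoader(userJars, getClass().getClassLoader()));
        }

        /** Only when the slot is freed (e.g. released by the JM) is the loader closed and dropped. */
        public void releaseSlot(String slotId) throws Exception {
            URLClassLoader loader = loadersBySlot.remove(slotId);
            if (loader != null) {
                loader.close();
            }
        }
    }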

[1] https://issues.apache.org/jira/browse/FLINK-9662

Cheers,
Till

Re: Job recovery with task manager restart

Thomas Weise
Hi Till,

Thanks for the background.

It seems that we cannot always rely on user code not to cause leaks, and such
leaks can wreak havoc even when everything in Flink works as expected
(user-code-managed threads may not terminate, class loader GC may not work due
to references held in the parent class loader, direct memory may not be
deallocated, and so on).

I think it would be nice to have the option to terminate the TMs. Maybe it
could be solved by making TMs exit, similar to task cancellation [1], and
letting the user configure after how many job restarts this should occur?
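
A very rough sketch of what that could look like on the TM side (hypothetical; no such class or config option exists, and a real implementation would have to coordinate with the RM): count the restarts observed per job and, once a configurable threshold is exceeded, exit the process so the deployment framework replaces it with a fresh JVM.

    // Hypothetical sketch: the TM counts restarts per job and exits once a
    // configured limit is exceeded, relying on the external deployment
    // (YARN, Kubernetes, systemd, ...) to start a replacement process.
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    public class RestartThresholdWatchdog {

        private final int maxRestartsBeforeExit; // would come from a (hypothetical) TM config option
        private final Map<String, AtomicInteger> restartsPerJob = new ConcurrentHashMap<>();

        public RestartThresholdWatchdog(int maxRestartsBeforeExit) {
            this.maxRestartsBeforeExit = maxRestartsBeforeExit;
        }

        /** Invoked whenever tasks of a job are redeployed to this TM after a failure. */
        public void onJobRestart(String jobId) {
            int restarts = restartsPerJob
                    .computeIfAbsent(jobId, id -> new AtomicInteger())
                    .incrementAndGet();
            if (restarts > maxRestartsBeforeExit) {
                // Exit with a non-zero code so the deployment restarts a clean JVM,
                // similar in spirit to the fatal-exit path discussed in FLINK-4715.
                System.exit(1);
            }
        }
    }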

For the session mode case, it would affect other jobs that share the TMs,
but that's not avoidable. A resource leak would eventually compromise the
entire cluster.

Thanks,
Thomas

[1] https://issues.apache.org/jira/browse/FLINK-4715

