Hi,
When a job fails and is recovered by Flink, task manager JVMs are reused. That can cause problems when the failed job wasn't cleaned up properly, for example leaving behind the user class loader. This would manifest in rising base for memory usage, leading to a death spiral. It would be good to provide an option that guarantees isolation, by restarting the task manager processes. Managing the processes would depend on how Flink is deployed, but the recovery sequence would need to provide a hook for the user. Has there been prior discussion or related work? Thanks, Thomas |
Hi Thomas,
I have a sort of question regarding the class loader issue, as it seems interesting. My understanding is that at least user class loader is unregistered and re-registered (from/to library cache on TM) across task restart. If I understand it correctly, unregistered one should be GCed as long as no object loaded by the user class loader is lingering across task restart. Indeed, however, there is no guarantee that UDF cleans up everything on close(). I've seen that some libraries used in UDF let a daemon thread outlive a task, so any object loaded by unregistered user class loader in the thread causes the class loader to be leaked (also daemon threads are also leaked since those keep being spawned, albeit singleton, due to newly registered class loader). If a job keeps restarting, this behavior leads to metaspace OOM or out of threads/OOM. So, my question is if the memory issue you've seen is due to whether Flink issue or the side-effect that UDF causes (as I described). Second question is if there's anything else other than class loader issue. Of course, I also wonder if any prior discussion is going on. Best, Hwanju On 5/16/19, 8:01 AM, "Thomas Weise" <[hidden email]> wrote: Hi, When a job fails and is recovered by Flink, task manager JVMs are reused. That can cause problems when the failed job wasn't cleaned up properly, for example leaving behind the user class loader. This would manifest in rising base for memory usage, leading to a death spiral. It would be good to provide an option that guarantees isolation, by restarting the task manager processes. Managing the processes would depend on how Flink is deployed, but the recovery sequence would need to provide a hook for the user. Has there been prior discussion or related work? Thanks, Thomas |
Hi Thomas and Hwanju,
thanks for starting this discussion. As far as I know, there has not been a lot of prior discussion or related work with respect to this topic. Somewhat related is the discussion about job isolation in a session cluster [1]. Whenever there is resource leak on Flink's side, we should try to fix it. However, I see that user code might be out of our control and for this such a feature might be useful. How would such a feature behave in detail? Would you like that all TMs which executed a task of a restarting job are being restarted? What happens if these TMs execute other jobs (in session mode)? Should the decision to restart happen locally on the TM (number of tolerated task failures) or centralized on the RM where different resolution strategies could run? To mitigate the problem of class loader leaks and GC pressure, we thought about binding the class loader to a slot. As long as a JM owns this slot (so also across job restarts), the user code class loader should then be reusable. [1] https://issues.apache.org/jira/browse/FLINK-9662 Cheers, Till On Thu, May 16, 2019 at 7:28 PM Kim, Hwanju <[hidden email]> wrote: > Hi Thomas, > > I have a sort of question regarding the class loader issue, as it seems > interesting. > My understanding is that at least user class loader is unregistered and > re-registered (from/to library cache on TM) across task restart. If I > understand it correctly, unregistered one should be GCed as long as no > object loaded by the user class loader is lingering across task restart. > Indeed, however, there is no guarantee that UDF cleans up everything on > close(). I've seen that some libraries used in UDF let a daemon thread > outlive a task, so any object loaded by unregistered user class loader in > the thread causes the class loader to be leaked (also daemon threads are > also leaked since those keep being spawned, albeit singleton, due to newly > registered class loader). If a job keeps restarting, this behavior leads to > metaspace OOM or out of threads/OOM. So, my question is if the memory issue > you've seen is due to whether Flink issue or the side-effect that UDF > causes (as I described). Second question is if there's anything else other > than class loader issue. Of course, I also wonder if any prior discussion > is going on. > > Best, > Hwanju > > On 5/16/19, 8:01 AM, "Thomas Weise" <[hidden email]> wrote: > > Hi, > > When a job fails and is recovered by Flink, task manager JVMs are > reused. > That can cause problems when the failed job wasn't cleaned up > properly, for > example leaving behind the user class loader. This would manifest in > rising > base for memory usage, leading to a death spiral. > > It would be good to provide an option that guarantees isolation, by > restarting the task manager processes. Managing the processes would > depend > on how Flink is deployed, but the recovery sequence would need to > provide a > hook for the user. > > Has there been prior discussion or related work? > > Thanks, > Thomas > > > |
Hi Till,
Thanks for the background. It seems that we cannot always rely on the user code to not cause leaks and that can wreak havoc even when everything in Flink works as expected (user code managed threads may not terminate, class loader GC may not work due to references in the parent class loader, direct memory not deallocated and so on). I think it would be nice to have the option to terminate the TMs. Maybe it can be solved by making TMs exit similar to task cancellation [1] and let the user configure after how many job starts this should occur? For the session mode case, it would affect other jobs that share the TMs, but that's not avoidable. A resource leak would eventually compromise the entire cluster. Thanks, Thomas [1] https://issues.apache.org/jira/browse/FLINK-4715 On Fri, May 17, 2019 at 12:50 AM Till Rohrmann <[hidden email]> wrote: > Hi Thomas and Hwanju, > > thanks for starting this discussion. As far as I know, there has not been a > lot of prior discussion or related work with respect to this topic. > Somewhat related is the discussion about job isolation in a session cluster > [1]. > > Whenever there is resource leak on Flink's side, we should try to fix it. > However, I see that user code might be out of our control and for this such > a feature might be useful. > > How would such a feature behave in detail? Would you like that all TMs > which executed a task of a restarting job are being restarted? What happens > if these TMs execute other jobs (in session mode)? Should the decision to > restart happen locally on the TM (number of tolerated task failures) or > centralized on the RM where different resolution strategies could run? > > To mitigate the problem of class loader leaks and GC pressure, we thought > about binding the class loader to a slot. As long as a JM owns this slot > (so also across job restarts), the user code class loader should then be > reusable. > > [1] https://issues.apache.org/jira/browse/FLINK-9662 > > Cheers, > Till > > On Thu, May 16, 2019 at 7:28 PM Kim, Hwanju <[hidden email]> > wrote: > > > Hi Thomas, > > > > I have a sort of question regarding the class loader issue, as it seems > > interesting. > > My understanding is that at least user class loader is unregistered and > > re-registered (from/to library cache on TM) across task restart. If I > > understand it correctly, unregistered one should be GCed as long as no > > object loaded by the user class loader is lingering across task restart. > > Indeed, however, there is no guarantee that UDF cleans up everything on > > close(). I've seen that some libraries used in UDF let a daemon thread > > outlive a task, so any object loaded by unregistered user class loader in > > the thread causes the class loader to be leaked (also daemon threads are > > also leaked since those keep being spawned, albeit singleton, due to > newly > > registered class loader). If a job keeps restarting, this behavior leads > to > > metaspace OOM or out of threads/OOM. So, my question is if the memory > issue > > you've seen is due to whether Flink issue or the side-effect that UDF > > causes (as I described). Second question is if there's anything else > other > > than class loader issue. Of course, I also wonder if any prior discussion > > is going on. > > > > Best, > > Hwanju > > > > On 5/16/19, 8:01 AM, "Thomas Weise" <[hidden email]> wrote: > > > > Hi, > > > > When a job fails and is recovered by Flink, task manager JVMs are > > reused. > > That can cause problems when the failed job wasn't cleaned up > > properly, for > > example leaving behind the user class loader. This would manifest in > > rising > > base for memory usage, leading to a death spiral. > > > > It would be good to provide an option that guarantees isolation, by > > restarting the task manager processes. Managing the processes would > > depend > > on how Flink is deployed, but the recovery sequence would need to > > provide a > > hook for the user. > > > > Has there been prior discussion or related work? > > > > Thanks, > > Thomas > > > > > > > |
Free forum by Nabble | Edit this page |