JobManager scale limitation - Slow S3 checkpoint deletes

Jamie Grier-3
We've run into an issue that limits the maximum parallelism of jobs we can
run, and it seems to boil down to the JobManager becoming unresponsive while
spending essentially all of its time discarding checkpoints from S3.  This
results in a sluggish UI, sporadic AkkaAskTimeouts, heartbeat misses, etc.

Since S3 (and, I assume, HDFS) has lifecycle policies that can discard old
objects without Flink actively deleting them, I think it would be a useful
feature to add an option to Flink to never actively discard checkpoints.  I
believe this would solve the problem.
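
For reference, on the S3 side this would just be a bucket lifecycle rule.  A
minimal sketch with the AWS SDK for Java v1 (the bucket name, checkpoint
prefix, and retention period below are placeholders; in practice you'd
probably set this up once via the console or CLI rather than in code):

import java.util.Collections;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.BucketLifecycleConfiguration;
import com.amazonaws.services.s3.model.lifecycle.LifecycleFilter;
import com.amazonaws.services.s3.model.lifecycle.LifecyclePrefixPredicate;

public class CheckpointExpirationRule {
    public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

        // Expire everything under the checkpoint prefix after 7 days, so old
        // checkpoints disappear without Flink ever issuing a delete request.
        BucketLifecycleConfiguration.Rule rule = new BucketLifecycleConfiguration.Rule()
                .withId("expire-old-flink-checkpoints")
                .withFilter(new LifecycleFilter(new LifecyclePrefixPredicate("flink/checkpoints/")))
                .withExpirationInDays(7)
                .withStatus(BucketLifecycleConfiguration.ENABLED);

        s3.setBucketLifecycleConfiguration(
                "my-flink-bucket",
                new BucketLifecycleConfiguration().withRules(Collections.singletonList(rule)));
    }
}

The retention period would of course need to be comfortably longer than any
window in which a job might still restore from an older checkpoint, otherwise
the lifecycle rule could delete state we still need.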

Any objections or other known solutions to this problem?

-Jamie

Re: JobManager scale limitation - Slow S3 checkpoint deletes

Stephan Ewen
I think having an option to not actively delete checkpoints (but rather
have the TTL feature of the file system take care of it) sounds like a good
idea.

I am curious why you get heartbeat misses and Akka timeouts during deletes.
Are some parts of the deletes happening synchronously in the actor thread?


Re: JobManager scale limitation - Slow S3 checkpoint deletes

Jamie Grier-3
Yup, it looks like the actor threads are spending all of their time
communicating with S3.  I've attached a picture of a typical stack trace
for one of the actor threads [1].  At the end of that call stack you'll see
the thread blocking on synchronous communication with the S3 service.  This
is one of the flink-akka.actor.default-dispatcher threads.

I've also attached a link to a YourKit snapshot if you'd like to explore
the profiling data in more detail [2].

[1]
https://drive.google.com/open?id=0BzMcf4IvnGWNNjEzMmRJbkFiWkZpZDFtQWo4LXByUXpPSFpR
[2] https://drive.google.com/open?id=1iHCKJT-PTQUcDzFuIiacJ1MgAkfxza3W

Re: JobManager scale limitation - Slow S3 checkpoint deletes

Thomas Weise
Nice!

Perhaps for file systems without TTL/expiration support (which AFAIK
includes HDFS), the cleanup could be performed in the task managers?



Re: JobManager scale limitation - Slow S3 checkpoint deletes

Yun Tang
Spreading the communication pressure from a single node across multiple task managers would be a good idea. From my point of view, letting the task managers know that a specific checkpoint has already been aborted could benefit a lot of things (a rough sketch of such a hook follows the list):

  *   Let the task managers clean up the files, which is the topic of this thread.
  *   Let `StreamTask` cancel an aborted checkpoint that is still running on the task side, as https://issues.apache.org/jira/browse/FLINK-8871 wants to achieve.
  *   Let the local state store prune local checkpoints as soon as possible, without waiting for the next `notifyCheckpointComplete` to arrive.
  *   Let the state backend on the task manager side react on its own, which would be really helpful for state backends that disaggregate computation and storage.
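
A rough sketch of what such a task-side hook could look like (nothing like
this exists in Flink today; all of the names below are hypothetical):

// Hypothetical task-manager-side hook; these names do not exist in Flink today.
public interface CheckpointAbortListener {

    /**
     * Called on the task side once the checkpoint coordinator has declared
     * the given checkpoint aborted.
     */
    void notifyCheckpointAborted(long checkpointId);
}

// A task-side handler could then clean up the files it wrote for that checkpoint,
// cancel the still-running snapshot in the StreamTask, prune task-local state
// immediately, or let the state backend release resources tied to the checkpoint.
class ExampleAbortHandler implements CheckpointAbortListener {

    @Override
    public void notifyCheckpointAborted(long checkpointId) {
        // e.g. discard this task's own state handles for checkpointId here
        System.out.println("Cleaning up aborted checkpoint " + checkpointId);
    }
}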

Best
Yun Tang

Re: JobManager scale limitation - Slow S3 checkpoint deletes

Till Rohrmann
I think part of the problem is that we currently use the executor of the
common RpcService to run the I/O operations, as Stephan suspected [1]. I
will be fixing this problem for 1.8.0 and 1.7.3.

This should resolve the problem, but supporting different means of cleanup
might still be interesting to add.

[1] https://issues.apache.org/jira/browse/FLINK-11851
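
Just to illustrate the direction (a rough sketch only, not the actual patch
in [1]): the important part is that the blocking S3 calls run on a dedicated
I/O executor instead of on the RPC/actor threads, roughly like this:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CheckpointDisposalSketch {

    // Dedicated pool for blocking file-system I/O, sized independently of the
    // RPC/actor dispatcher so slow S3 deletes cannot starve message processing.
    private final ExecutorService ioExecutor = Executors.newFixedThreadPool(4);

    /** Discards a checkpoint asynchronously instead of blocking the caller. */
    public CompletableFuture<Void> discardAsync(CompletedCheckpointStub checkpoint) {
        return CompletableFuture.runAsync(checkpoint::discard, ioExecutor);
    }

    /** Stand-in for a completed checkpoint whose discard() issues blocking S3 deletes. */
    interface CompletedCheckpointStub {
        void discard();
    }
}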

Cheers,
Till


Re: JobManager scale limitation - Slow S3 checkpoint deletes

Jamie Grier-3
All of the suggestions in this thread are good ones.  I had also considered
farming the actual cleanup work out to all the TaskManagers as well, but
didn't realize how easy the fix might be for this.  Thanks, Till.

With Till's change https://github.com/apache/flink/pull/7924 the problem
we're actually experiencing should be fixed, so I'm not going to pursue this
any further right now.

-Jamie

