Checkpoints keep waiting on source locks

Gyula Fóra
Hey All,

I think there is a serious issue with checkpointing. A simple program like
this never completes a checkpoint:

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env.enableCheckpointing(5000); // trigger a checkpoint every 5 seconds

env.generateSequence(1, 100)
    // simulate slow processing: 1 second per element
    .map(t -> { Thread.sleep(1000); return t; })
    .map(t -> t)
    .print();
env.execute();

The job will start executing and triggering checkpoints, but the
triggerCheckpoint method of the StreamTask will be stuck waiting for the
checkpoint lock. It will never take a snapshot...

Any ideas?
This happens on any parallelism, and for other sources as well.

Cheers,
Gyula

Re: Checkpoints keep waiting on source locks

Stephan Ewen
Hey!

The issue is that checkpoints can only be taken between elements, when no
element is in flight in the pipeline. Your sleep() call blocks the pipeline,
so the source holds the checkpoint lock for almost the entire time.
Since the checkpoint lock is not fair, the few cycles during which the source
releases it are not enough for the checkpointer to acquire it.

I wonder if this is an artificial corner case or an actual issue. The
solution is theoretically simple: use a fair lock. But we would need to
break the data sources API and switch from "synchronized(Object)" to a fair
"java.util.concurrent.locks.ReentrantLock".
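
To make the fairness argument concrete, here is a minimal plain-Java sketch
of the situation. It is a simulation, not Flink code: the class name, the
method elementsBeforeCheckpoint and the two threads are all hypothetical
stand-ins for the source loop and the checkpointer. The "source" does slow
work while holding the lock and releases it only for an instant between
elements; the "checkpointer" needs the same lock once.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class FairCheckpointLockSketch {

    /**
     * Simulates a source that processes every element while holding the lock
     * and a checkpointer that needs the same lock once.
     * Returns how many elements were emitted before the checkpointer got in.
     */
    static int elementsBeforeCheckpoint(boolean fair, int elements, long workMillis)
            throws InterruptedException {
        ReentrantLock checkpointLock = new ReentrantLock(fair);
        AtomicInteger emitted = new AtomicInteger();
        AtomicInteger seenByCheckpointer = new AtomicInteger(-1);
        CountDownLatch sourceHoldsLock = new CountDownLatch(1);

        Thread source = new Thread(() -> {
            for (int i = 0; i < elements; i++) {
                checkpointLock.lock();
                try {
                    sourceHoldsLock.countDown();
                    Thread.sleep(workMillis); // slow user code runs under the lock
                    emitted.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    checkpointLock.unlock();
                }
                // The lock is free only between unlock() and the next lock().
            }
        });

        Thread checkpointer = new Thread(() -> {
            try {
                sourceHoldsLock.await(); // start competing once the source is running
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            checkpointLock.lock();
            try {
                seenByCheckpointer.set(emitted.get()); // stands in for the snapshot
            } finally {
                checkpointLock.unlock();
            }
        });

        source.start();
        checkpointer.start();
        source.join();
        checkpointer.join();
        return seenByCheckpointer.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("unfair: checkpoint after "
                + elementsBeforeCheckpoint(false, 20, 20) + " of 20 elements");
        System.out.println("fair:   checkpoint after "
                + elementsBeforeCheckpoint(true, 20, 20) + " of 20 elements");
    }
}
```

With new ReentrantLock(true), a thread that is already queued gets the lock
before the source can re-acquire it, so the checkpointer should get in after
the element that is currently being processed. With the unfair variant the
result depends on scheduling, and the source will often win the race again
and again.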

Greetings,
Stephan


On Wed, Oct 21, 2015 at 1:47 PM, Gyula Fóra <[hidden email]> wrote:

> Hey All,
>
> I think there is some serious issue with the checkpoints. Running a simple
> program like this won't complete any checkpoints:
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(2);
> env.enableCheckpointing(5000);
>
> env.generateSequence(1, 100).map(t -> { Thread.sleep(1000); return t; })
> .map(t -> t).print();
> env.execute();
>
> The job will start executing and triggering checkpoints but the
> triggerCheckpoint method of the StreamTask will be stuck waiting for the
> checkpoint lock. It will never take a snapshot...
>
> Any ideas?
> This happens on any parallelism, and for other sources as well.
>
> Cheers,
> Gyula
>

Re: Checkpoints keep waiting on source locks

Gyula Fóra
Thanks!
I personally use Thread.sleep() a lot for flow control in my test
topologies. This might only be me, but it seems like a pretty annoying
limitation when you want to test your streaming jobs.
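
One possible way to keep sleep-based flow control in tests is to pause while
the checkpoint lock is not held, for example between emits in a custom
source, instead of sleeping inside a function that runs under the lock. The
sketch below is a plain-Java simulation under that assumption, not Flink
code; the class name, the method name and both threads are hypothetical. It
uses an ordinary "synchronized" block, so no fair lock is involved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class ThrottleOutsideLockSketch {

    /**
     * Simulates a source that emits quickly under the lock and then pauses
     * with the lock released. Returns how many elements were emitted before
     * the checkpointer acquired the lock.
     */
    static int elementsBeforeCheckpoint(int elements, long pauseMillis)
            throws InterruptedException {
        final Object checkpointLock = new Object();
        AtomicInteger emitted = new AtomicInteger();
        AtomicInteger seenByCheckpointer = new AtomicInteger(-1);
        CountDownLatch firstEmitted = new CountDownLatch(1);

        Thread source = new Thread(() -> {
            try {
                for (int i = 0; i < elements; i++) {
                    synchronized (checkpointLock) {
                        emitted.incrementAndGet(); // emit quickly under the lock
                    }
                    firstEmitted.countDown();
                    Thread.sleep(pauseMillis); // throttle while the lock is free
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread checkpointer = new Thread(() -> {
            try {
                firstEmitted.await(); // start competing once the source is running
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            synchronized (checkpointLock) {
                seenByCheckpointer.set(emitted.get()); // stands in for the snapshot
            }
        });

        source.start();
        checkpointer.start();
        source.join();
        checkpointer.join();
        return seenByCheckpointer.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("checkpoint after "
                + elementsBeforeCheckpoint(20, 20) + " of 20 elements");
    }
}
```

Because the lock is free for the whole pause, the checkpointer does not
depend on winning a race against the source.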

On Wed, Oct 21, 2015 at 13:59, Stephan Ewen <[hidden email]> wrote:

> Hey!
>
> The issue is that checkpoints can only happen in between elements being in
> the pipeline. You block the pipeline in the sleep() call.
> Since the checkpoint lock is not fair, the few cycles that the source
> releases the lock are not enough for the checkpointer to acquire it.
>
> I wonder if this is an artificial corner case, or actually an issue. The
> solution is theoretically simple: Use a fair lock, but we would need to
> break the data sources API and switch from "synchronized(Object)" to a fair
> "java.util.concurrent.locks.ReentrantLock".
>
> Greetings,
> Stephan

Re: Checkpoints keep waiting on source locks

Stephan Ewen
Thread sleeping is a bit of a rogue way of testing, but I see your point...

I am starting to think that, after 0.10, we should measure how much it costs
to swap the lock for a fair lock and, if the cost is acceptable, make the
switch for 1.0.

Stephan


On Wed, Oct 21, 2015 at 2:01 PM, Gyula Fóra <[hidden email]> wrote:

> Thanks!
> I am personally using Thread.sleep() a lot for flow control in my test
> topologies, this might only be me, but it seems to be a pretty annoying
> thing when you want to test your streaming jobs.