Hey All,
I think there is a serious issue with checkpoints. A simple program like this never completes a checkpoint:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);
    env.enableCheckpointing(5000);

    env.generateSequence(1, 100)
        .map(t -> { Thread.sleep(1000); return t; })
        .map(t -> t)
        .print();
    env.execute();

The job starts executing and triggering checkpoints, but the triggerCheckpoint method of the StreamTask gets stuck waiting for the checkpoint lock. It never takes a snapshot...

Any ideas?
This happens at any parallelism, and for other sources as well.

Cheers,
Gyula

Hey!
The issue is that checkpoints can only happen in between elements being processed in the pipeline. You block the pipeline in the sleep() call.
Since the checkpoint lock is not fair, the few cycles during which the source releases the lock are not enough for the checkpointer to acquire it.

I wonder if this is an artificial corner case, or actually an issue. The solution is theoretically simple: use a fair lock. However, we would need to break the data sources API and switch from "synchronized(Object)" to a fair "java.util.concurrent.locks.ReentrantLock".

Greetings,
Stephan

On Wed, Oct 21, 2015 at 1:47 PM, Gyula Fóra <[hidden email]> wrote:
> I think there is some serious issue with the checkpoints. [...]

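To make the fairness argument concrete, here is a minimal standalone sketch. It is not Flink code: the class `FairLockSketch`, the method `elementsBeforeCheckpoint`, and the thread names are hypothetical, and a non-fair `ReentrantLock` stands in for the `synchronized` monitor (both allow the releasing thread to re-acquire immediately). A "source" thread holds the lock while it sleeps and releases it only for an instant between elements; a "checkpointer" thread waits for that same lock and records how many elements had been emitted when it finally got in.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration only; none of these names exist in Flink.
class FairLockSketch {

    /** Returns how many elements the source had emitted when the checkpointer got the lock. */
    static int elementsBeforeCheckpoint(boolean fair, int totalElements)
            throws InterruptedException {
        ReentrantLock checkpointLock = new ReentrantLock(fair);
        CountDownLatch sourceHoldsLock = new CountDownLatch(1);
        AtomicInteger emitted = new AtomicInteger();
        AtomicInteger seenByCheckpointer = new AtomicInteger(-1);

        Thread source = new Thread(() -> {
            for (int i = 0; i < totalElements; i++) {
                checkpointLock.lock();
                try {
                    emitted.incrementAndGet();
                    if (i == 0) {
                        // Let the checkpointer start, then keep holding the lock
                        // until it is queued, so the race begins from a known state.
                        sourceHoldsLock.countDown();
                        while (!checkpointLock.hasQueuedThreads()) {
                            Thread.yield();
                        }
                    }
                    Thread.sleep(1); // blocking user code while the lock is held
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    checkpointLock.unlock(); // released only briefly between elements
                }
            }
        });

        Thread checkpointer = new Thread(() -> {
            try {
                sourceHoldsLock.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            checkpointLock.lock();
            try {
                seenByCheckpointer.set(emitted.get());
            } finally {
                checkpointLock.unlock();
            }
        });

        source.start();
        checkpointer.start();
        source.join();
        checkpointer.join();
        return seenByCheckpointer.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("fair lock: checkpoint after "
                + elementsBeforeCheckpoint(true, 20) + " element(s)");
        System.out.println("unfair lock: checkpoint after "
                + elementsBeforeCheckpoint(false, 20) + " element(s)");
    }
}
```

With the fair lock, the source has to queue behind the waiting checkpointer, so the checkpointer gets in right after the first element. With the unfair lock the result depends on scheduling, and the checkpointer may have to wait until the source has emitted many or all of its elements.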
Thanks!
I personally use Thread.sleep() a lot for flow control in my test topologies. This might only be me, but it seems like a pretty annoying limitation when you want to test your streaming jobs.

Stephan Ewen <[hidden email]> wrote (on Wed, Oct 21, 2015, 13:59):
> The issue is that checkpoints can only happen in between elements being in the pipeline. [...]

Thread sleeping is a bit of a rogue way of testing, but I see your point...
I am starting to think we should see how much it costs to swap the lock for a fair lock after 0.10 and, if the cost is not too high, make that switch for 1.0 ...

Stephan

On Wed, Oct 21, 2015 at 2:01 PM, Gyula Fóra <[hidden email]> wrote:
> I am personally using Thread.sleep() a lot for flow control in my test topologies [...]