Failure restarting Flink 1.5.0 job from checkpoint

Ken Krugler
Hi devs,

I coded up a simple iteration that uses a KeyedProcessFunction, as a way of showing how to use timers to do state iteration.

This worked fine, but then I wanted to try out checkpoints. I modified the KeyedProcessFunction to throw an exception after a fixed number of calls.
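The failure injection described above comes down to a call counter. Here is a minimal plain-Java sketch of that logic, written independently of Flink. FailureInjector and failAfter are hypothetical names and do not come from the original job.

```java
// Plain-Java sketch of "throw after a fixed number of calls".
// FailureInjector is a hypothetical helper, not part of Flink or of the original job.
public class FailureInjector {
    private final int failAfter; // number of calls allowed before failing
    private int calls = 0;

    public FailureInjector(int failAfter) {
        this.failAfter = failAfter;
    }

    /** Call once per processed element; throws on call number failAfter + 1 and later. */
    public void onElement() {
        calls++;
        if (calls > failAfter) {
            throw new RuntimeException("Example of a failure triggering a job restart");
        }
    }

    public int calls() {
        return calls;
    }

    public static void main(String[] args) {
        FailureInjector injector = new FailureInjector(3);
        for (int i = 0; i < 5; i++) {
            try {
                injector.onElement();
                System.out.println("call " + injector.calls() + " ok");
            } catch (RuntimeException e) {
                System.out.println("call " + injector.calls() + " failed: " + e.getMessage());
                break;
            }
        }
    }
}
```

If the counter lives in an ordinary instance field of the KeyedProcessFunction, it normally starts from zero again after a restart, because the function instance is re-created from the job graph. A job that did restart successfully would then be expected to fail again after the same number of calls.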

When this happens, the job goes into a loop in which every restart attempt fails with a NullPointerException:

18/05/30 16:38:40 DEBUG executiongraph.ExecutionGraph:1496 - Try to restart or fail the job Flink Streaming Job (f144fd0fb301db0ae14c7b991a25b353) if no longer possible.
java.lang.RuntimeException: Example of a failure triggering a job restart
        at com.scaleunlimited.flinksnippets.examples.IterationWithProcessFunctionTimers$MyKeyedProcessFunction.processElement(IterationWithProcessFunctionTimers.java:74)
        at com.scaleunlimited.flinksnippets.examples.IterationWithProcessFunctionTimers$MyKeyedProcessFunction.processElement(IterationWithProcessFunctionTimers.java:1)
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
        at java.lang.Thread.run(Thread.java:748)
18/05/30 16:38:40 INFO executiongraph.ExecutionGraph:1375 - Job Flink Streaming Job (f144fd0fb301db0ae14c7b991a25b353) switched from state FAILING to RESTARTING.
18/05/30 16:38:40 INFO executiongraph.ExecutionGraph:1506 - Restarting the job Flink Streaming Job (f144fd0fb301db0ae14c7b991a25b353).
18/05/30 16:38:40 WARN executiongraph.ExecutionGraph:1273 - Failed to restart the job.
java.lang.NullPointerException
        at org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint.isAssignedAndAlive(CoLocationConstraint.java:104)
        at org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup.resetConstraints(CoLocationGroup.java:119)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1247)
        at org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
        at org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

CoLocationConstraint.java:104 is this one-line function:

        public boolean isAssignedAndAlive() {
                return lockedLocation != null && sharedSlot.isAlive();
        }

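Since && short-circuits, sharedSlot.isAlive() is only evaluated when lockedLocation is non-null, so I have to assume sharedSlot is null. I don’t know whether that’s a valid state, or whether it means the constraint is being used before setSharedSlot() is called.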
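The inference above rests on Java's short-circuit &&: an NPE on that line can only come from sharedSlot, because sharedSlot.isAlive() is reached only after lockedLocation != null has succeeded. The minimal model below reproduces this with hypothetical stand-in types (AssignedAndAliveDemo, Location, SharedSlot). It is not Flink's code, and its null-tolerant variant only illustrates the failing state. It is not a proposed fix.

```java
// Simplified stand-ins for the fields used by CoLocationConstraint.isAssignedAndAlive().
// Location and SharedSlot are hypothetical placeholder types, not Flink's classes.
public class AssignedAndAliveDemo {
    static final class Location {}

    static final class SharedSlot {
        private final boolean alive;
        SharedSlot(boolean alive) { this.alive = alive; }
        boolean isAlive() { return alive; }
    }

    Location lockedLocation;
    SharedSlot sharedSlot;

    // Same shape as the quoted method: sharedSlot.isAlive() is evaluated only
    // when lockedLocation != null, because && short-circuits.
    boolean isAssignedAndAlive() {
        return lockedLocation != null && sharedSlot.isAlive();
    }

    // Null-tolerant variant, shown only to illustrate the failing state.
    boolean isAssignedAndAliveNullSafe() {
        return lockedLocation != null && sharedSlot != null && sharedSlot.isAlive();
    }

    public static void main(String[] args) {
        AssignedAndAliveDemo c = new AssignedAndAliveDemo();
        System.out.println(c.isAssignedAndAlive()); // false: sharedSlot is never touched

        c.lockedLocation = new Location(); // location locked, but no shared slot set
        try {
            c.isAssignedAndAlive();
        } catch (NullPointerException e) {
            System.out.println("NPE: lockedLocation set, sharedSlot null");
        }
        System.out.println(c.isAssignedAndAliveNullSafe()); // false
    }
}
```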

In any case, this same chunk of logging output repeats immediately, ad infinitum.
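This endless repetition matches what a restart loop with an effectively unbounded attempt count would do if the restart step itself throws. The stack trace shows FixedDelayRestartStrategy. According to the Flink documentation, a job with checkpointing enabled and no configured restart strategy uses fixed-delay restarts with Integer.MAX_VALUE attempts. The simplified model below (RestartLoopModel is hypothetical, and the delay between attempts is omitted) shows the same pattern with a bounded attempt count. In a real job, that limit would be set with RestartStrategies.fixedDelayRestart(attempts, delay).

```java
// Simplified, single-threaded model of a fixed-delay restart loop.
// RestartLoopModel is hypothetical; this is not Flink's FixedDelayRestartStrategy.
public class RestartLoopModel {
    /**
     * Makes up to maxAttempts restart attempts (the delay between attempts is omitted).
     * Returns the number of attempts made before a restart succeeded or attempts ran out.
     */
    public static int runRestarts(int maxAttempts, Runnable restart) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            try {
                restart.run();
                return attempts; // restart succeeded
            } catch (RuntimeException e) {
                // A failed restart consumes one attempt and the loop tries again.
                System.out.println("attempt " + attempts + ": Failed to restart the job: " + e);
            }
        }
        return attempts; // attempts exhausted; the job would then fail permanently
    }

    public static void main(String[] args) {
        // A restart step that always throws, like the NPE from resetConstraints().
        Runnable brokenRestart = () -> { throw new NullPointerException(); };
        int attempts = runRestarts(3, brokenRestart);
        System.out.println("gave up after " + attempts + " attempts");
    }
}
```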

Is there anything else I should try in order to track down what’s going on?

Thanks,

— Ken

PS - checkpointing is set up via:

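        // createLocalEnvironment(2) already sets parallelism 2, so setParallelism(2) is redundant but harmless.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);
        env.setParallelism(2);
        // force = true: Flink otherwise refuses to enable checkpointing for jobs that contain iterations.
        env.enableCheckpointing(100L, CheckpointingMode.AT_LEAST_ONCE, true);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);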
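With both values at 100 ms, the actual spacing between checkpoints depends on how long each checkpoint takes. A rough model assumes that the next checkpoint starts no earlier than one interval after the previous start, and no earlier than the minimum pause after the previous completion. CheckpointTiming and earliestNextStart are hypothetical names. Flink's CheckpointCoordinator also applies other rules, such as the limit on concurrent checkpoints.

```java
// Rough model of when the next periodic checkpoint may start.
// CheckpointTiming is hypothetical; it ignores Flink's other scheduling rules.
public class CheckpointTiming {
    /** All times in milliseconds. */
    public static long earliestNextStart(long prevStart, long prevEnd, long interval, long minPause) {
        return Math.max(prevStart + interval, prevEnd + minPause);
    }

    public static void main(String[] args) {
        long interval = 100L; // from enableCheckpointing(100L, ...)
        long minPause = 100L; // from setMinPauseBetweenCheckpoints(100L)
        // Checkpoint starts at t=0 and takes 30 ms: the minimum pause dominates.
        System.out.println(earliestNextStart(0L, 30L, interval, minPause));  // 130
        // Checkpoint takes 250 ms: the next one waits for completion plus the pause.
        System.out.println(earliestNextStart(0L, 250L, interval, minPause)); // 350
    }
}
```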

And the skeleton of the simple workflow is:

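        // Iteration head; it shuts down if no feedback arrives for 1000 ms.
        IterativeStream<String> iter = env.addSource(source).iterate(1000L);
        DataStream<String> updated = iter.keyBy(new MyKeySelector()).process(new MyKeyedProcessFunction());
        // Feed "updated" back to the head; closeWith() returns the feedback stream, which is printed.
        iter.closeWith(updated).print();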
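The skeleton forms a feedback loop. Whatever the keyed process function emits goes to the print sink, and closeWith() also sends it back to the iteration head. The single-threaded model below mimics that data flow in plain Java. IterationModel and the example body function are hypothetical. Keying, partitioning, and timers are not modeled, and the loop ends as soon as no feedback remains, rather than after the 1000 ms wait passed to iterate().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Single-threaded model of the iterate()/closeWith() feedback loop.
// IterationModel is hypothetical, not Flink API.
public class IterationModel {
    public static List<String> run(List<String> sourceRecords, Function<String, List<String>> body) {
        Deque<String> head = new ArrayDeque<>(sourceRecords); // records waiting at the iteration head
        List<String> printed = new ArrayList<>();
        while (!head.isEmpty()) {
            String record = head.poll();
            for (String out : body.apply(record)) {
                printed.add(out); // closeWith(updated).print() prints the feedback stream
                head.add(out);    // and the same records go back around the loop
            }
        }
        return printed;
    }

    public static void main(String[] args) {
        // Hypothetical loop body: strip one trailing "!" per pass; emit nothing once none remain.
        Function<String, List<String>> body = s -> s.endsWith("!")
                ? Collections.singletonList(s.substring(0, s.length() - 1))
                : Collections.<String>emptyList();
        System.out.println(run(Arrays.asList("a!!", "b!"), body)); // prints [a!, b, a]
    }
}
```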


--------------------------------------------
http://about.me/kkrugler
+1 530-210-6378


Re: Failure restarting Flink 1.5.0 job from checkpoint

Aljoscha Krettek-2
Hi Ken,

I think you might have independently discovered this issue: https://issues.apache.org/jira/browse/FLINK-9458

Best,
Aljoscha

> On 31. May 2018, at 01:46, Ken Krugler <[hidden email]> wrote:


Re: Failure restarting Flink 1.5.0 job from checkpoint

Ken Krugler
Hi Aljoscha,

Yes, it looks that way, thanks for the issue reference. I’d checked Jira a few days ago, but it looks like FLINK-9458 was added very recently :)

I’ll follow up in Jira to see if a small code snippet would be useful.

— Ken

> On May 31, 2018, at 1:17 AM, Aljoscha Krettek <[hidden email]> wrote:

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: Failure restarting Flink 1.5.0 job from checkpoint

Till Rohrmann
Thanks for reporting the issue, Ken. This does indeed look very strange, and we need to investigate how it can happen.

Cheers,
Till

On Thu, May 31, 2018 at 8:07 PM, Ken Krugler <[hidden email]> wrote:
