Failing kafka consumer unable to cancel

Gyula Fóra
Hey guys,

I ran into an issue with the Kafka consumers.

I am reading from more than 50 topics with parallelism 1, and while running
the job I got the following exception during the checkpoint notification
(offset committing):

java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:935)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
    at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
    at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
    at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
    at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.notifyCheckpointComplete(FlinkKafkaConsumer.java:542)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:176)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:478)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:931)
    ... 5 more

This happened at the same time on multiple Kafka consumers. Could this be a
ZooKeeper-related issue? Maybe we should be aware of this.

Another problem is that the whole pipeline subsequently got stuck at the
sources while canceling, so the job could never restart. Maybe it would be
worth killing the whole thing and restarting in these situations?

Thanks,
Gyula

Re: Failing kafka consumer unable to cancel

Ufuk Celebi
https://issues.apache.org/jira/browse/KAFKA-824

This has been fixed in Kafka’s 0.9.0 release.

We should investigate why the job gets stuck though. Do you have a stack trace or any logs available?

– Ufuk


Re: Failing kafka consumer unable to cancel

Ufuk Celebi
I had a quick chat with Robert and Stephan.

The problem is that the StreamTask cancellation needs to acquire a lock that is held by the Kafka client, which is stuck in an infinite loop.
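
Roughly, the pattern looks like the following sketch (simplified, with made-up class and method names; this is not the actual Flink or Kafka code):

// Simplified sketch of the stuck-cancellation pattern described above.
// All names are illustrative placeholders.
public class StuckCancellationSketch {

    private final Object checkpointLock = new Object();

    // Task thread: holds the lock while committing offsets to ZooKeeper.
    public void notifyCheckpointComplete() {
        synchronized (checkpointLock) {
            commitOffsetsToZooKeeper(); // the ZkClient call retries forever and never returns
        }
    }

    // Canceler thread: needs the same lock, so cancellation blocks forever as well.
    public void cancel() {
        synchronized (checkpointLock) {
            // never reached while the commit above is stuck
        }
    }

    private void commitOffsetsToZooKeeper() {
        while (true) {
            // stand-in for the ZkClient retry loop that keeps hitting the NPE
        }
    }
}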

At this point, I’m not sure what our options are. Maybe Stephan or Robert can chime in…

– Ufuk


Re: Failing kafka consumer unable to cancel

Stephan Ewen
In reply to this post by Ufuk Celebi
Hey!

The problem here is that there is no such thing as proper thread killing in
Java (or at least, forcibly killing a thread makes everything unstable).
Threads need to exit cooperatively.
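
Concretely, the only safe way to stop a thread in Java is a cooperative flag along these lines (a generic sketch, nothing Flink-specific):

// Cooperative shutdown: the thread checks a flag instead of being killed.
public class CooperativeWorker implements Runnable {

    private volatile boolean running = true;

    public void cancel() {
        running = false; // only a request; the thread has to notice it itself
    }

    @Override
    public void run() {
        while (running) {
            doSomeWork(); // must return regularly, otherwise cancel() has no effect
        }
    }

    private void doSomeWork() {
        // placeholder for the actual work
    }
}

A call that never returns, like the stuck ZooKeeper write here, defeats exactly this mechanism.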

The Kafka function calls are simply stuck uninterruptibly and never return
(a pretty bad bug in their ZooKeeper client). As far as I know, one cannot
clean this up properly unless one kills the process.

We could try to work around this by running the ZooKeeper commit in a
dedicated lightweight thread that shares no resources and thus does not
make the system unstable if stopped (against better advice ;-) ).
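
For instance, something along these lines could work (a rough sketch only; the helper class and the timeout are made up, not a concrete patch for the connector):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Rough sketch: hand the ZooKeeper offset commit to a throwaway daemon thread
// so that a stuck ZkClient call cannot block the task thread indefinitely.
public class DetachedOffsetCommitter {

    private final ExecutorService committer = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "zk-offset-committer");
        t.setDaemon(true); // a stuck committer must not keep the JVM alive
        return t;
    });

    public void commit(Runnable commitOffsetsToZooKeeper) {
        Future<?> pending = committer.submit(commitOffsetsToZooKeeper);
        try {
            pending.get(30, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            // Give up on this commit; the next checkpoint will trigger a new one.
            pending.cancel(true);
        } catch (Exception e) {
            // Log and move on; the next checkpoint will commit newer offsets anyway.
        }
    }

    public void shutdown() {
        committer.shutdownNow(); // best effort; the daemon thread cannot hold up cancellation
    }
}

The point is only that the task thread (and whatever lock it holds) never waits indefinitely on ZkClient.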

Stephan

Re: Failing kafka consumer unable to cancel

Stephan Ewen
Another idea: the bug is fixed in the ZooKeeper client that Kafka uses, so
if we can bump the transitive dependency, that might fix it...


Re: Failing kafka consumer unable to cancel

Robert Metzger
I would try that approach first.


Re: Failing kafka consumer unable to cancel

Gyula Fóra
Thanks for the quick response and thorough explanation :)

Gyula


Re: Failing kafka consumer unable to cancel

Gyula Fóra
Should I open a JIRA for this?


Re: Failing kafka consumer unable to cancel

Stephan Ewen
Sure.
