Flink Kafka consumer auto-commit timeout

Rong Rong
Hi All,

I would like to bring back a discussion that has come up several times in
previous ML threads [1], but there seems to be no solution when
checkpointing is disabled.

All of the exceptions reported in those threads share one common pattern:

> *INFO* org.apache.kafka.clients.consumer.internals.AbstractCoordinator  -
> Marking the coordinator kafka[xxx]:9092 (id: ??? rack: ???) dead for
> group consumergroup[xxx]
> *WARN* org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  -Auto-commit
> of offsets {topic[xxx]=OffsetAndMetadata{offset=???,metadata=???''}}
> failed for group consumergroup[xxx]: Offset commit failed with a retriable
> exception. You should retry committing offsets. The underlying error was:
> The request timed out.

In most cases, enabling *OffsetCommitMode.ON_CHECKPOINTS* fixes the
issue: the Flink Kafka consumer then explicitly commits offsets when
checkpointing, which goes down a completely different code path compared
with enabling the Kafka consumer option *enable.auto.commit* and letting
the Kafka consumer handle it.
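
To make the two code paths concrete, here is a simplified stand-alone model
of how the commit mode is chosen. It is only a sketch: the names CommitMode
and resolve are made up for illustration, and I believe the real logic lives
in Flink's OffsetCommitModes.fromConfiguration, which may differ in detail.

```java
class OffsetCommitSketch {
    // Hypothetical stand-in for Flink's OffsetCommitMode enum.
    enum CommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    // Assumed decision rule: with checkpointing on, Flink commits offsets
    // itself on checkpoints; with checkpointing off, committing is left to
    // the Kafka client's enable.auto.commit setting.
    static CommitMode resolve(boolean checkpointingEnabled,
                              boolean commitOnCheckpoints,
                              boolean kafkaAutoCommit) {
        if (checkpointingEnabled) {
            return commitOnCheckpoints ? CommitMode.ON_CHECKPOINTS : CommitMode.DISABLED;
        }
        return kafkaAutoCommit ? CommitMode.KAFKA_PERIODIC : CommitMode.DISABLED;
    }

    public static void main(String[] args) {
        // Checkpointing disabled and auto-commit enabled: the case in this thread.
        System.out.println(resolve(false, true, true));
        // Checkpointing enabled: the Kafka auto-commit setting is not used.
        System.out.println(resolve(true, true, true));
    }
}
```

In this model the warning above can only matter in the KAFKA_PERIODIC case,
because that is the only case where the Kafka client commits on its own.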

That brings me to two questions:
- Is this feature (disabling checkpointing and restarting the job from the
Kafka committed GROUP_OFFSET) not supported?
- How does Flink-Kafka actually handle auto-commit to the coordinator, given
that Flink ignores the coordinator assignments and self-assigns partitions
instead?


Some observations from our side:
We ran some experiments with *enable.auto.commit* set to true, using
Kafka 0.11 on both Flink 1.4 and 1.6, and observed that the behavior is
extremely odd once the above warnings are seen:
- the task manager metric
**.Source.KafkaConsumer.current-offsets.topic-[xxx]-partition-[yyy]* keeps
moving forward, tracking the latest Kafka broker offset, which indicates
that the message consumption thread is running without any issue.
- the task manager metric
**.Source.KafkaConsumer.committed-offsets.topic-[xxx]-partition-[yyy]* is
stuck indefinitely, which indicates that the consumer has stopped talking
to the coordinator.
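
The symptom described by the two metrics can be checked mechanically. The
following is a minimal sketch, assuming the two metrics have been sampled at
the same points in time into two arrays; the class name, method name and
sample values are hypothetical and not part of Flink.

```java
class CommitStallCheck {
    // Returns true when the current offset advanced across the samples
    // while the committed offset did not move at all.
    static boolean isCommitStalled(long[] currentOffsets, long[] committedOffsets) {
        if (currentOffsets.length < 2 || currentOffsets.length != committedOffsets.length) {
            throw new IllegalArgumentException("need at least two aligned samples");
        }
        int last = currentOffsets.length - 1;
        boolean consuming = currentOffsets[last] > currentOffsets[0];
        boolean committing = committedOffsets[last] > committedOffsets[0];
        return consuming && !committing;
    }

    public static void main(String[] args) {
        // Made-up samples: consumption progresses, commits are stuck.
        long[] current = {100L, 250L, 400L};
        long[] committed = {90L, 90L, 90L};
        System.out.println(isCommitStalled(current, committed));
    }
}
```

A check like this only flags the pattern; it says nothing about why the
commits stopped.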

We will try to repeat this experiment with Flink 1.10 later, but has anyone
experienced similar issues with later Flink releases as well?

Thanks,
Rong

--
[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-offset-auto-commit-stops-after-timeout-td18696.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-kafka-consumer-stopped-committing-offsets-td20624.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Odd-job-failure-td19845.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-during-Kafka-connection-td14822.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Logs-show-Marking-the-coordinator-2147483637-dead-in-Flink-Kafka-conn-td7396.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-takes-long-with-FlinkKafkaConsumer-td7561.html

Re: Flink Kafka consumer auto-commit timeout

Aljoscha Krettek-2
On 09.03.20 06:10, Rong Rong wrote:
> - Is this feature (disabling checkpointing and restarting the job from the
> Kafka committed GROUP_OFFSET) not supported?

I believe the Flink community never put much (any?) effort into this
because the Flink Kafka Consumer does its own offset handling. Starting
from the committed offsets should work fine, though; the default startup
mode is even StartupMode.GROUP_OFFSETS.

> - How does Flink-Kafka actually handle auto-commit to the coordinator, given
> that Flink ignores the coordinator assignments and self-assigns partitions
> instead?

I think we don't do anything for this case; the Kafka Consumer code will
do the committing if 'enable.auto.commit' is set. I don't know how this
will play with our code because we disable the automatic group handling.

Do you think letting Kafka do the auto-committing is ever useful? If you
have a Flink job that does checkpoints, you will get the correct offset
committing and you can start a job from the committed offsets. In what
cases would you want to use the built-in Kafka offset committing?

Best,
Aljoscha

Re: Flink Kafka consumer auto-commit timeout

Rong Rong
Hi Aljoscha,

Thank you for the help and reply.

1. I think we have finally pinpointed the root cause of this issue:
When partitions are assigned manually (e.g. with the assign() API instead
of the subscribe() API), the client will not try to rediscover the
coordinator if it dies [1]. This seems to no longer be an issue as of
Kafka 1.1.0. After cherry-picking the PR [2] back to the Kafka 0.11.x
branch and packaging it with our Flink application, we haven't seen this
issue recur so far.

2. GROUP_OFFSETS is in fact the default startup mode when checkpointing is
not enabled, which is why I was a bit surprised that this problem was
reported so many times.
To follow up on the question of whether resuming from GROUP_OFFSETS is
useful: there are definitely use cases where users don't want to use
checkpointing (e.g. due to resource constraints, storage cost
considerations, etc.), but still want to avoid a certain amount of data
loss. Most of our analytics use cases fall into this category.
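
For the checkpointing-disabled case, the consumer properties would look
roughly like the sketch below. The property keys are standard Kafka consumer
settings; the class name, broker address, group id and commit interval are
placeholders chosen only for illustration.

```java
import java.util.Properties;

class AutoCommitProps {
    // Builds consumer properties that leave offset committing to the
    // Kafka client itself (periodic auto-commit).
    static Properties build(String bootstrapServers, String groupId, long intervalMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", Long.toString(intervalMs));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values. These properties would then be handed to the
        // Flink Kafka consumer, with checkpointing left disabled on the job
        // and the startup mode left at the GROUP_OFFSETS default (or set
        // explicitly via setStartFromGroupOffsets()).
        Properties props = build("kafka-host:9092", "my-consumer-group", 5000L);
        System.out.println(props.getProperty("enable.auto.commit"));
        System.out.println(props.getProperty("auto.commit.interval.ms"));
    }
}
```

With this setup, a restarted job resumes from whatever the client last
managed to auto-commit, so anything processed after that commit is read
again.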


--
Rong


[1] https://issues.apache.org/jira/browse/KAFKA-6362
[2] https://github.com/apache/kafka/pull/4326



Re: Flink Kafka consumer auto-commit timeout

Aljoscha Krettek-2
Thanks for the update!

On 13.03.20 13:47, Rong Rong wrote:
> 1. I think we have finally pinpointed the root cause of this issue:
> When partitions are assigned manually (e.g. with the assign() API instead
> of the subscribe() API), the client will not try to rediscover the
> coordinator if it dies [1]. This seems to no longer be an issue as of
> Kafka 1.1.0. After cherry-picking the PR [2] back to the Kafka 0.11.x
> branch and packaging it with our Flink application, we haven't seen this
> issue recur so far.

So the solution to this thread is: we don't do anything because it is a
Kafka bug that was fixed?

> 2. GROUP_OFFSETS is in fact the default startup mode when checkpointing is
> not enabled, which is why I was a bit surprised that this problem was
> reported so many times.
> To follow up on the question of whether resuming from GROUP_OFFSETS is
> useful: there are definitely use cases where users don't want to use
> checkpointing (e.g. due to resource constraints, storage cost
> considerations, etc.), but still want to avoid a certain amount of data
> loss. Most of our analytics use cases fall into this category.

Yes, this is what I assumed. I was not suggesting that we remove the
feature. We just leave it as is, right?

Best,
Aljoscha

Re: Flink Kafka consumer auto-commit timeout

Rong Rong
Yes, this is a Kafka-side issue.

Since all affected Kafka versions are below 1.1.0, ideally we should
upgrade the Kafka minor version in flink-connector-kafka-0.10/0.11 once
the fix is back-ported on the Kafka side.
However, given that the PR was merged 2 years ago, I am not sure that
will ever happen.
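
As a rough aid for anyone checking their own setup, the version rule above
can be written as a small helper. This is only a sketch: the class and
method names are made up, it looks only at the major and minor version
numbers, and it cannot detect a privately patched build such as the
cherry-picked one mentioned earlier in this thread.

```java
class Kafka6362Check {
    // Returns true when the Kafka client version is below 1.1.0, the
    // release from which the coordinator rediscovery issue seems to be gone.
    static boolean isLikelyAffected(String clientVersion) {
        String[] parts = clientVersion.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        return major < 1 || (major == 1 && minor < 1);
    }

    public static void main(String[] args) {
        // Example version strings chosen for illustration.
        System.out.println(isLikelyAffected("0.11.0.2"));
        System.out.println(isLikelyAffected("1.1.0"));
    }
}
```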

--
Rong
