Hi All,
I would like to bring back a discussion which I have seen multiple times in previous ML threads [1], but which seems to have no solution if checkpointing is disabled. All of the reported exceptions share one common pattern:

> *INFO* org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> Marking the coordinator kafka[xxx]:9092 (id: ??? rack: ???) dead for
> group consumergroup[xxx]
> *WARN* org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
> Auto-commit of offsets {topic[xxx]=OffsetAndMetadata{offset=???,metadata=???''}}
> failed for group consumergroup[xxx]: Offset commit failed with a retriable
> exception. You should retry committing offsets. The underlying error was:
> The request timed out.

In most cases, enabling *OffsetCommitMode.ON_CHECKPOINTS* fixes the issue - the Flink Kafka consumer will explicitly commit offsets when checkpointing, and that goes down a completely different code path compared with enabling the Kafka consumer option *enable.auto.commit* and letting the Kafka consumer handle it.

That brings me to these questions:
- Is this feature (disabling checkpointing and restarting the job from the Kafka committed GROUP_OFFSET) not supported?
- How does Flink-Kafka actually handle auto-commit to the coordinator, given the fact that Flink ignores the coordinator assignments and uses self-assigned partitions instead?

A bit of our observation: we conducted some experiments with the option *enable.auto.commit* set to true, with Kafka 0.11 on both Flink 1.4 and 1.6, and observed extremely weird behavior after the above warnings were seen:
- the task manager metric **.Source.KafkaConsumer.current-offsets.topic-[xxx]-partition-[yyy]* keeps moving forward, tracking the latest Kafka broker offset - indicating that the message consumption thread is executing without any issue.
- the task manager metric **.Source.KafkaConsumer.committed-offsets.topic-[xxx]-partition-[yyy]* is stuck indefinitely - indicating that the consumer has stopped talking to the coordinator.

We will try this experiment with Flink 1.10 later, but has anyone experienced similar issues with later Flink releases as well?

Thanks,
Rong

--
[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-offset-auto-commit-stops-after-timeout-td18696.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-kafka-consumer-stopped-committing-offsets-td20624.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Odd-job-failure-td19845.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-during-Kafka-connection-td14822.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Logs-show-Marking-the-coordinator-2147483637-dead-in-Flink-Kafka-conn-td7396.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-takes-long-with-FlinkKafkaConsumer-td7561.html
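The two code paths described above boil down to different consumer configurations. The following is a minimal sketch contrasting them; the broker address and group id are hypothetical placeholders, and this only shows the Kafka properties involved, not the Flink job setup:

```java
import java.util.Properties;

public class OffsetCommitConfigs {

    // Path 1: the Kafka client commits offsets itself, periodically in the
    // background (the path that exhibited the stuck committed-offsets metric).
    static Properties autoCommitProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker:9092"); // hypothetical address
        props.setProperty("group.id", "analytics-group");            // hypothetical group
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");
        return props;
    }

    // Path 2: checkpoint-driven committing. The client's own committer is
    // disabled; Flink commits explicitly when a checkpoint completes.
    static Properties checkpointCommitProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker:9092");
        props.setProperty("group.id", "analytics-group");
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(autoCommitProps().getProperty("enable.auto.commit"));
        System.out.println(checkpointCommitProps().getProperty("enable.auto.commit"));
    }
}
```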
On 09.03.20 06:10, Rong Rong wrote:
> - Is this feature (disabling checkpoint and restarting job from Kafka
> committed GROUP_OFFSET) not supported?

I believe the Flink community never put much (any?) effort into this because the Flink Kafka Consumer does its own offset handling. Starting from the committed offsets should work fine, though; the default startup mode is even StartupMode.GROUP_OFFSETS.

> - How does Flink-Kafka actually handle auto-commit to the coordinator given
> the fact that Flink ignores the coordinator assignments and uses
> self-assigning partitions instead?

I think we don't do anything for this case; the Kafka Consumer code will do the committing if 'enable.auto.commit' is set. I don't know how this will play with our code, because we disable the automatic group handling.

Do you think letting Kafka do the auto committing is ever useful? If you have a Flink job that does checkpoints, you will get correct offset committing and you can start a job from the committed offsets. In what cases would you want to use the built-in Kafka offset committing?

Best,
Aljoscha
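The interplay described above (checkpointing wins; otherwise the Kafka client's auto-commit setting decides) can be sketched as a small decision function. This is a simplified illustration of the resolution logic, not Flink's actual implementation:

```java
public class CommitModeSketch {

    enum OffsetCommitMode { ON_CHECKPOINTS, KAFKA_PERIODIC, DISABLED }

    // Simplified sketch: if checkpointing is enabled, Flink's own
    // commit-on-checkpoint behavior takes over; otherwise the Kafka
    // client's 'enable.auto.commit' setting decides.
    static OffsetCommitMode resolve(boolean checkpointingEnabled,
                                    boolean commitOnCheckpoints,
                                    boolean enableAutoCommit) {
        if (checkpointingEnabled) {
            return commitOnCheckpoints ? OffsetCommitMode.ON_CHECKPOINTS
                                       : OffsetCommitMode.DISABLED;
        }
        return enableAutoCommit ? OffsetCommitMode.KAFKA_PERIODIC
                                : OffsetCommitMode.DISABLED;
    }

    public static void main(String[] args) {
        System.out.println(resolve(true, true, true));
        System.out.println(resolve(false, false, true));
    }
}
```

Note that KAFKA_PERIODIC is only reachable when checkpointing is disabled, which is exactly the configuration under discussion in this thread.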
Hi Aljoscha,
Thank you for the help and reply.

1. I think we have finally pinpointed the root cause of this issue: when partitions are assigned manually (i.e. with the assign() API instead of the subscribe() API) the client will not try to rediscover the coordinator if it dies [1]. This no longer seems to be an issue after Kafka 1.1.0. After cherry-picking the PR [2] back to the Kafka 0.11.x branch and packaging it with our Flink application, we haven't seen this issue re-occur so far.

2. GROUP_OFFSETS is in fact the default startup mode if checkpointing is not enabled - that's why I was a bit surprised that this problem was reported so many times. To follow up on the question "whether resuming from GROUP_OFFSETS is useful": there are definitely use cases where users don't want to use checkpointing (e.g. due to resource constraints, storage cost considerations, etc.) but still want to avoid a certain amount of data loss. Most of our analytics use cases fall into this category.

--
Rong

[1] https://issues.apache.org/jira/browse/KAFKA-6362
[2] https://github.com/apache/kafka/pull/4326

On Wed, Mar 11, 2020 at 10:16 AM Aljoscha Krettek <[hidden email]> wrote:

> On 09.03.20 06:10, Rong Rong wrote:
> > - Is this feature (disabling checkpoint and restarting job from Kafka
> > committed GROUP_OFFSET) not supported?
>
> I believe the Flink community never put much (any?) effort into this
> because the Flink Kafka Consumer does its own offset handling. Starting
> from the committed offsets should work fine, though; the default startup
> mode is even StartupMode.GROUP_OFFSETS.
>
> > - How does Flink-Kafka actually handle auto-commit to the coordinator
> > given the fact that Flink ignores the coordinator assignments and uses
> > self-assigning partitions instead?
>
> I think we don't do anything for this case; the Kafka Consumer code will
> do the committing if 'enable.auto.commit' is set. I don't know how this
> will play with our code, because we disable the automatic group handling.
>
> Do you think letting Kafka do the auto committing is ever useful? If you
> have a Flink job that does checkpoints, you will get correct offset
> committing and you can start a job from the committed offsets. In what
> cases would you want to use the built-in Kafka offset committing?
>
> Best,
> Aljoscha
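The KAFKA-6362 behavior described in point 1 can be sketched as a simplified decision function. This is a hypothetical stand-in for the client's internal logic, not real Kafka code; it only illustrates why a manually-assigned consumer on a pre-1.1.0 client never recovered a dead coordinator:

```java
public class CoordinatorRediscoverySketch {

    // Sketch of the pre- vs post-fix behavior: before the fix, only
    // group-managed (subscribe()) consumers would look up a new coordinator
    // after the old one was marked dead, so manually assigned (assign())
    // consumers - like Flink's - kept timing out on offset commits forever.
    static boolean needsCoordinatorLookup(boolean coordinatorUnknown,
                                          boolean manualAssignment,
                                          boolean fixedClient) {
        if (!coordinatorUnknown) {
            return false; // coordinator is alive; nothing to rediscover
        }
        if (!fixedClient) {
            // Buggy behavior (client < 1.1.0): skip lookup for manual assignment.
            return !manualAssignment;
        }
        // Fixed behavior: any consumer committing offsets rediscovers it.
        return true;
    }

    public static void main(String[] args) {
        // Flink uses manual assignment; old client never rediscovers:
        System.out.println(needsCoordinatorLookup(true, true, false)); // false
        // Same situation on a fixed client:
        System.out.println(needsCoordinatorLookup(true, true, true));  // true
    }
}
```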
Thanks for the update!
On 13.03.20 13:47, Rong Rong wrote:
> 1. I think we have finally pinpointed the root cause of this issue:
> when partitions are assigned manually (i.e. with the assign() API instead
> of the subscribe() API) the client will not try to rediscover the
> coordinator if it dies [1]. This no longer seems to be an issue after
> Kafka 1.1.0. After cherry-picking the PR [2] back to the Kafka 0.11.x
> branch and packaging it with our Flink application, we haven't seen this
> issue re-occur so far.

So the resolution of this thread is: we don't do anything, because it is a Kafka bug that has since been fixed?

> 2. GROUP_OFFSETS is in fact the default startup mode if checkpointing is
> not enabled - that's why I was a bit surprised that this problem was
> reported so many times. To follow up on the question "whether resuming
> from GROUP_OFFSETS is useful": there are definitely use cases where users
> don't want to use checkpointing (e.g. due to resource constraints, storage
> cost considerations, etc.) but still want to avoid a certain amount of
> data loss. Most of our analytics use cases fall into this category.

Yes, this is what I assumed. I was not suggesting removing the feature. So we also just leave this as is, right?

Best,
Aljoscha
Yes, this is a Kafka side issue.
Since the affected Kafka versions are all below 1.1.0, ideally we should upgrade the Kafka minor version in flink-connector-kafka-0.10/0.11 once the fix is back-ported on the Kafka side. However, given that the PR was merged two years ago, I am not sure that will ever happen.

--
Rong

On Fri, Mar 13, 2020 at 6:43 AM Aljoscha Krettek <[hidden email]> wrote:

> Thanks for the update!
>
> On 13.03.20 13:47, Rong Rong wrote:
> > 1. I think we have finally pinpointed the root cause of this issue:
> > when partitions are assigned manually (i.e. with the assign() API
> > instead of the subscribe() API) the client will not try to rediscover
> > the coordinator if it dies [1]. This no longer seems to be an issue
> > after Kafka 1.1.0. After cherry-picking the PR [2] back to the Kafka
> > 0.11.x branch and packaging it with our Flink application, we haven't
> > seen this issue re-occur so far.
>
> So the resolution of this thread is: we don't do anything, because it is
> a Kafka bug that has since been fixed?
>
> > 2. GROUP_OFFSETS is in fact the default startup mode if checkpointing
> > is not enabled - that's why I was a bit surprised that this problem was
> > reported so many times. To follow up on the question "whether resuming
> > from GROUP_OFFSETS is useful": there are definitely use cases where
> > users don't want to use checkpointing (e.g. due to resource
> > constraints, storage cost considerations, etc.) but still want to avoid
> > a certain amount of data loss. Most of our analytics use cases fall
> > into this category.
>
> Yes, this is what I assumed. I was not suggesting removing the feature.
> So we also just leave this as is, right?
>
> Best,
> Aljoscha