Hey,
I was wondering whether something has changed for KafkaConsumer, since I am using Kafka 2.0.0 with Flink and I wanted to use group offsets but there seems to be no change in the topic where Kafka stores it's offsets, after restart Flink uses the `auto.offset.reset` so it seems that there is no offsets commit happening. The checkpoints are properly configured and I am able to restore with Savepoint. But the group offsets are not working properly. It there anything that has changed in this manner ? Best Regards, Dom. |
Hi Dominik,
There has not been any change to the offset committing logic in KafkaConsumer for a while. But the logic is a little complicated. The offset commit to Kafka is only enabled in the following two cases: 1. Flink checkpoint is enabled AND commitOffsetsOnCheckpoint is true (default value is true) 2. Flink checkpoint is disabled AND the vanilla KafkaConsumer has a) enable.auto.commit=true (default value is true); b) auto.commit.interval.ms>0 (default value is 5000). Note that in case 1, if the job exits before the first checkpoint takes place, then there will be no offset committed. Can you check if your setting falls in one of the two cases? Thanks, Jiangjie (Becket) Qin On Wed, Sep 4, 2019 at 9:03 PM Dominik Wosiński <[hidden email]> wrote: > Hey, > I was wondering whether something has changed for KafkaConsumer, since I am > using Kafka 2.0.0 with Flink and I wanted to use group offsets but there > seems to be no change in the topic where Kafka stores it's offsets, after > restart Flink uses the `auto.offset.reset` so it seems that there is no > offsets commit happening. The checkpoints are properly configured and I am > able to restore with Savepoint. But the group offsets are not working > properly. It there anything that has changed in this manner ? > > Best Regards, > Dom. > |
Hey,
Yeah I am using the first case. Is there a specific requirement for checkpoints ? Like do they need to be externalized or so ? Best, Dom. czw., 5 wrz 2019 o 05:32 Becket Qin <[hidden email]> napisał(a): > Hi Dominik, > > There has not been any change to the offset committing logic in > KafkaConsumer for a while. But the logic is a little complicated. The > offset commit to Kafka is only enabled in the following two cases: > > 1. Flink checkpoint is enabled AND commitOffsetsOnCheckpoint is true > (default value is true) > 2. Flink checkpoint is disabled AND the vanilla KafkaConsumer has a) > enable.auto.commit=true (default value is true); b) > auto.commit.interval.ms>0 > (default value is 5000). > > Note that in case 1, if the job exits before the first checkpoint takes > place, then there will be no offset committed. > > Can you check if your setting falls in one of the two cases? > > Thanks, > > Jiangjie (Becket) Qin > > > > > On Wed, Sep 4, 2019 at 9:03 PM Dominik Wosiński <[hidden email]> wrote: > > > Hey, > > I was wondering whether something has changed for KafkaConsumer, since I > am > > using Kafka 2.0.0 with Flink and I wanted to use group offsets but there > > seems to be no change in the topic where Kafka stores it's offsets, after > > restart Flink uses the `auto.offset.reset` so it seems that there is no > > offsets commit happening. The checkpoints are properly configured and I > am > > able to restore with Savepoint. But the group offsets are not working > > properly. It there anything that has changed in this manner ? > > > > Best Regards, > > Dom. > > > |
No, I don't think so.
As long as you have a successful checkpoint, The offset will be committed. Thanks, Jiangjie (Becket) Qin On Thu, Sep 5, 2019 at 4:56 PM Dominik Wosiński <[hidden email]> wrote: > Hey, > Yeah I am using the first case. Is there a specific requirement for > checkpoints ? Like do they need to be externalized or so ? > > > Best, > Dom. > > czw., 5 wrz 2019 o 05:32 Becket Qin <[hidden email]> napisał(a): > > > Hi Dominik, > > > > There has not been any change to the offset committing logic in > > KafkaConsumer for a while. But the logic is a little complicated. The > > offset commit to Kafka is only enabled in the following two cases: > > > > 1. Flink checkpoint is enabled AND commitOffsetsOnCheckpoint is true > > (default value is true) > > 2. Flink checkpoint is disabled AND the vanilla KafkaConsumer has a) > > enable.auto.commit=true (default value is true); b) > > auto.commit.interval.ms>0 > > (default value is 5000). > > > > Note that in case 1, if the job exits before the first checkpoint takes > > place, then there will be no offset committed. > > > > Can you check if your setting falls in one of the two cases? > > > > Thanks, > > > > Jiangjie (Becket) Qin > > > > > > > > > > On Wed, Sep 4, 2019 at 9:03 PM Dominik Wosiński <[hidden email]> > wrote: > > > > > Hey, > > > I was wondering whether something has changed for KafkaConsumer, since > I > > am > > > using Kafka 2.0.0 with Flink and I wanted to use group offsets but > there > > > seems to be no change in the topic where Kafka stores it's offsets, > after > > > restart Flink uses the `auto.offset.reset` so it seems that there is no > > > offsets commit happening. The checkpoints are properly configured and I > > am > > > able to restore with Savepoint. But the group offsets are not working > > > properly. It there anything that has changed in this manner ? > > > > > > Best Regards, > > > Dom. > > > > > > |
In reply to this post by Becket Qin
Hi jiangjie, Yeah I am using the second case. (Flink 1.7.1, Kafka 0.10.2, FlinkKafkaConsumer010) But now there is a problem, the data is consumed normally, but the commit offset is not continued. The following exception is found: Becket Qin <[hidden email]> 于2019年9月5日周四 上午11:32写道: Hi Dominik, |
Free forum by Nabble | Edit this page |