Storing offsets in Kafka

Dominik Wosiński
Hey,
I was wondering whether something has changed in the KafkaConsumer. I am
using Kafka 2.0.0 with Flink and want to use group offsets, but there
seems to be no change in the topic where Kafka stores its offsets. After a
restart, Flink falls back to `auto.offset.reset`, so it seems that no
offset commits are happening. Checkpoints are properly configured and I am
able to restore from a savepoint, but the group offsets are not working
properly. Has anything changed in this regard?

Best Regards,
Dom.

Re: Storing offsets in Kafka

Becket Qin
Hi Dominik,

There has not been any change to the offset committing logic in
KafkaConsumer for a while, but the logic is a little complicated. Offsets
are committed to Kafka only in the following two cases:

1. Flink checkpointing is enabled AND commitOffsetsOnCheckpoints is true
(set via setCommitOffsetsOnCheckpoints(); default value is true).
2. Flink checkpointing is disabled AND the vanilla KafkaConsumer has
a) enable.auto.commit=true (default value is true) and
b) auto.commit.interval.ms > 0 (default value is 5000).

Note that in case 1, if the job exits before the first checkpoint
completes, no offsets will be committed.
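
To make the two cases concrete, here is a small standalone Java model of
the rule as described above. It is a sketch, not Flink's code: it does not
touch Flink or Kafka, and the class, enum and method names are
hypothetical. Only the property names enable.auto.commit and
auto.commit.interval.ms and their defaults come from the text. In a real
job, case 1 corresponds to calling enableCheckpointing(...) on the
StreamExecutionEnvironment and leaving setCommitOffsetsOnCheckpoints(true)
on the consumer.

```java
import java.util.Properties;

/**
 * Standalone model of the two offset-commit cases.
 * Hypothetical names; this is NOT Flink's API and does not call Flink.
 */
public class OffsetCommitDecision {

    /** Hypothetical enum naming the possible outcomes. */
    enum Mode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    /** Case 2 precondition: enable.auto.commit AND auto.commit.interval.ms > 0. */
    static boolean autoCommitEnabled(Properties props) {
        boolean enabled = Boolean.parseBoolean(
                props.getProperty("enable.auto.commit", "true"));   // default true
        long intervalMs = Long.parseLong(
                props.getProperty("auto.commit.interval.ms", "5000")); // default 5000
        return enabled && intervalMs > 0;
    }

    static Mode decide(boolean checkpointingEnabled,
                       boolean commitOffsetsOnCheckpoints,
                       Properties props) {
        if (checkpointingEnabled) {
            // Case 1: in this model only the commit-on-checkpoints flag matters;
            // the Kafka auto-commit properties are not consulted.
            return commitOffsetsOnCheckpoints ? Mode.ON_CHECKPOINTS : Mode.DISABLED;
        }
        // Case 2: no checkpoints, so rely on the Kafka client's periodic auto-commit.
        return autoCommitEnabled(props) ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }

    public static void main(String[] args) {
        Properties defaults = new Properties();
        Properties noAutoCommit = new Properties();
        noAutoCommit.setProperty("enable.auto.commit", "false");

        System.out.println(decide(true, true, defaults));      // ON_CHECKPOINTS
        System.out.println(decide(true, false, defaults));     // DISABLED
        System.out.println(decide(false, true, defaults));     // KAFKA_PERIODIC
        System.out.println(decide(false, true, noAutoCommit)); // DISABLED
    }
}
```

The point of the model: once checkpointing is on, turning
enable.auto.commit on or off makes no difference to whether offsets reach
Kafka.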

Can you check whether your setting falls into one of the two cases?

Thanks,

Jiangjie (Becket) Qin




On Wed, Sep 4, 2019 at 9:03 PM Dominik Wosiński <[hidden email]> wrote:


Re: Storing offsets in Kafka

Dominik Wosiński
Hey,
Yeah, I am using the first case. Is there a specific requirement for
checkpoints? For example, do they need to be externalized?


Best,
Dom.

On Thu, Sep 5, 2019 at 05:32 Becket Qin <[hidden email]> wrote:


Re: Storing offsets in Kafka

Becket Qin
No, I don't think so.

As long as a checkpoint completes successfully, the offsets will be
committed.
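
A minimal Java sketch of why only checkpoint completion matters: offsets
are staged when a checkpoint is taken and committed only when that
checkpoint is reported complete. This loosely mirrors the idea of
committing from a checkpoint-complete notification, but it is a
hypothetical model, not Flink's implementation; the class and method
names are illustrative, and the "committed" map merely stands in for the
group offsets stored in Kafka. Nothing in this flow depends on whether
the checkpoint is externalized.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of commit-on-checkpoint; not Flink's implementation. */
public class CheckpointCommitModel {

    // Offsets staged per pending checkpoint id (partition -> next offset).
    private final TreeMap<Long, Map<Integer, Long>> pending = new TreeMap<>();
    // Stand-in for the group offsets stored in Kafka.
    private final Map<Integer, Long> committed = new HashMap<>();

    /** Checkpoint triggered: stage a copy of the current offsets. */
    void snapshot(long checkpointId, Map<Integer, Long> currentOffsets) {
        pending.put(checkpointId, new HashMap<>(currentOffsets));
    }

    /** Checkpoint completed successfully: commit its staged offsets. */
    void checkpointComplete(long checkpointId) {
        Map<Integer, Long> offsets = pending.get(checkpointId);
        if (offsets == null) {
            return; // unknown or already-subsumed checkpoint
        }
        committed.putAll(offsets);
        // Drop this snapshot and any older ones; they are now subsumed.
        pending.headMap(checkpointId, true).clear();
    }

    Map<Integer, Long> committed() {
        return committed;
    }

    public static void main(String[] args) {
        CheckpointCommitModel model = new CheckpointCommitModel();
        Map<Integer, Long> offsets = new HashMap<>();
        offsets.put(0, 42L);

        model.snapshot(1L, offsets);
        // If the job exited here, before checkpoint 1 completed,
        // nothing would have been committed.
        System.out.println(model.committed()); // {}

        model.checkpointComplete(1L);
        System.out.println(model.committed()); // {0=42}
    }
}
```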

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 5, 2019 at 4:56 PM Dominik Wosiński <[hidden email]> wrote:


Re: Storing offsets in Kafka

Marvin777
In reply to this post by Becket Qin
Hi Jiangjie,

Yeah, I am using the second case (Flink 1.7.1, Kafka 0.10.2,
FlinkKafkaConsumer010). But now there is a problem: the data is consumed
normally, but offsets are no longer being committed. The following
exception is found:

On Thu, Sep 5, 2019 at 11:32 AM Becket Qin <[hidden email]> wrote: