Hi,
I am currently facing strange behaviour of the FlinkKafkaConsumer09 class. I am using Flink 1.0.3.

These are my properties:

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", config.urlKafka)
    properties.setProperty("group.id", COLLECTOR_NAME)
    properties.setProperty("auto.offset.reset", "earliest")

According to the new consumer API of Kafka, this should result in the following:

    auto.offset.reset:
      * smallest : automatically reset the offset to the smallest offset

(source: https://kafka.apache.org/documentation.html#newconsumerapi)

However, it starts from the latest item in my topic. Is this a bug or am I doing something wrong?

Regards,
Kevin
Hi Kevin,
Was the same “group.id” used before?

What may be happening is that on startup of the consumer (not from failure restore), any existing committed offset for the groupId in Kafka’s brokers will be used as the starting point. The “auto.offset.reset” is only respected when no committed offsets can be found. Currently, if Flink’s checkpointing isn’t enabled, FlinkKafkaConsumer09 will periodically commit offsets back to Kafka brokers. So it could be that the consumer is actually starting from those committed offsets.

Perhaps you can try using a new groupId and see if the behaviour persists?

Regards,
Gordon

On July 28, 2016 at 4:15:12 PM, Kevin Jacobs wrote:
> ...
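To make Gordon's explanation concrete, here is a small Scala model of the decision rule he describes for a fresh start (not a restore from a checkpoint). It is a sketch under stated assumptions, not the connector's actual code: `StartOffsets`, `resolveStart` and the example offsets are hypothetical. The fallback for a committed offset that is no longer retained follows Kafka's documented `auto.offset.reset` semantics ("no initial offset, or the current offset does not exist any more on the server").

```scala
// Hypothetical model (not Flink/Kafka source code) of how a fresh consumer
// picks its starting offset for one partition.
object StartOffsets {

  /** Returns the offset to start reading from.
    *
    * @param committed   offset committed to Kafka for this group.id, if any
    * @param resetPolicy value of "auto.offset.reset" ("earliest" or "latest")
    * @param logStart    smallest offset still retained in the partition
    * @param logEnd      offset of the next record to be written
    */
  def resolveStart(committed: Option[Long], resetPolicy: String,
                   logStart: Long, logEnd: Long): Long =
    // A committed offset that is still within the retained range always wins,
    // regardless of what "auto.offset.reset" says.
    committed.filter(offset => offset >= logStart && offset <= logEnd) match {
      case Some(offset) => offset
      // Only without a (still valid) committed offset does the reset policy apply.
      case None =>
        resetPolicy match {
          case "earliest" => logStart
          case "latest"   => logEnd
          case other =>
            throw new IllegalArgumentException(s"unsupported auto.offset.reset value: $other")
        }
    }
}
```

In Kevin's situation, a group that had already committed offset 120 on a partition holding offsets 0 to 150 would resume at 120 even with `"earliest"` configured; only a group with no committed offset would start at 0.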
Hi Kevin,
You need to use properties.setProperty("auto.offset.reset", "smallest") for Kafka 9 to start from the smallest offset. Note that in Kafka 8 you need to use properties.setProperty("auto.offset.reset", "earliest") to achieve the same behavior.

Kafka keeps track of the offsets per group id. If you have already read from a topic with a certain group id and want to restart from the smallest offset available, you need to generate a unique group id.

Cheers,
Max

On Thu, Jul 28, 2016 at 10:14 AM, Kevin Jacobs wrote:
> ...
Thank you Gordon and Max,
Thank you Gordon, that explains the behaviour a bit better to me. I am now adding the timestamp to the group ID and that is a good workaround for now. The "smallest" option is unfortunately not available in this version of the FlinkKafkaConsumer class.

Cheers,
Kevin

On 28.07.2016 10:39, Maximilian Michels wrote:
> ...
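The workaround Kevin describes (appending a timestamp to the group ID) could look roughly like this. It is a sketch assuming the Kafka 0.9 consumer; `ConsumerProps` and `freshGroupProperties` are hypothetical names, and the parameters stand in for `config.urlKafka` and `COLLECTOR_NAME` from the original snippet.

```scala
import java.util.Properties

// Hypothetical helper: a fresh group.id per run means Kafka holds no committed
// offsets for the group, so "auto.offset.reset" decides the starting position.
object ConsumerProps {

  /** Builds consumer properties with a run-unique group.id.
    *
    * @param bootstrapServers Kafka broker list (config.urlKafka in the original snippet)
    * @param baseGroupId      stable prefix (COLLECTOR_NAME in the original snippet)
    * @param nowMillis        timestamp appended to the prefix; injectable for testing
    */
  def freshGroupProperties(bootstrapServers: String, baseGroupId: String,
                           nowMillis: Long = System.currentTimeMillis()): Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", bootstrapServers)
    properties.setProperty("group.id", s"$baseGroupId-$nowMillis")
    // Value for the Kafka 0.9 consumer; the old 0.8 consumer uses "smallest" instead.
    properties.setProperty("auto.offset.reset", "earliest")
    properties
  }
}
```

The resulting `Properties` would be passed to the `FlinkKafkaConsumer09` constructor as before. Every run registers a new consumer group with the brokers, and two jobs started in the same millisecond would share a group ID; a `java.util.UUID` suffix avoids the latter.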
Hi Kevin,
Just a re-clarification: for Kafka 0.9 it would be “earliest”, and “smallest” for the older Kafka 0.8.

I’m wondering whether or not it is reasonable to add a Flink-specific way to set the consumer’s starting position to “earliest” and “latest”, without respecting the external Kafka offset store. Perhaps we can make the current behaviour (checking committed offsets in Kafka as starting point) one user option, and add new options to read from “earliest” and “latest” regardless of the groupId and externally committed offsets. I think this better matches how users usually interpret the functionality of setting starting positions, while also keeping the “auto.offset.reset” behaviour that frequent Kafka users are used to. This would also more clearly define that, in the context of Flink, the external Kafka offset store is used only to expose the consumer’s progress to the outside world, and not to manipulate how topics are read.

Just an idea I have in mind; not sure if it would be a reasonable addition. It’d be great to hear what others think of this.

Regards,
Gordon

On July 28, 2016 at 4:44:02 PM, Kevin Jacobs wrote:
> ...
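Gordon's proposal can be sketched as an explicit startup mode that decides whether committed group offsets are consulted at all. The names below (`ProposedStart`, `StartupMode`, `GroupOffsets`, `Earliest`, `Latest`) are hypothetical and only illustrate the idea. For what it's worth, this became FLINK-4280, and as far as I know later Flink releases (1.3 onwards) expose similar options on the Kafka consumer as `setStartFromGroupOffsets()`, `setStartFromEarliest()` and `setStartFromLatest()`.

```scala
// Hypothetical model of the proposed Flink-level startup modes.
object ProposedStart {

  sealed trait StartupMode
  /** Current behaviour: committed group offsets first, then "auto.offset.reset". */
  case object GroupOffsets extends StartupMode
  /** Always start from the earliest retained offset, ignoring the group. */
  case object Earliest extends StartupMode
  /** Always start from the end of the partition, ignoring the group. */
  case object Latest extends StartupMode

  /** @param resetToEarliest true if "auto.offset.reset" is "earliest", false for "latest" */
  def resolveStart(mode: StartupMode, committed: Option[Long], resetToEarliest: Boolean,
                   logStart: Long, logEnd: Long): Long =
    mode match {
      // The two new modes never look at the external offset store.
      case Earliest => logStart
      case Latest   => logEnd
      // The existing mode falls back to the reset policy only without a committed offset.
      case GroupOffsets => committed.getOrElse(if (resetToEarliest) logStart else logEnd)
    }
}
```

With these modes, a committed offset only influences the start position when the user explicitly asks for `GroupOffsets`, which is the separation between "exposing progress" and "choosing where to read" that Gordon describes.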
Hi Tai,
Should definitely be possible. Would you mind opening a JIRA issue with the description you posted?

Thanks,
Max

On Thu, Jul 28, 2016 at 11:16 AM, Tai Gordon wrote:
> ...
Hi Max,
Sure, I was planning to do so, but wanted to see if it was a reasonable feature to add before opening a JIRA :)

Here's the new JIRA: https://issues.apache.org/jira/browse/FLINK-4280

Regards,
Gordon

On Fri, Jul 29, 2016 at 4:03 PM, Maximilian Michels wrote:
> ...

--
Tzu-Li (Gordon) Tai
Thanks!
On Fri, Jul 29, 2016 at 11:43 AM, Gordon Tai (戴資力) wrote:
> ...