Hi!
I have run into a weird issue that I could have sworn wouldn't happen :D

I feel there was a discussion about this in the past, but maybe I'm wrong; I hope someone can point me to a ticket.

Let's say you create a Kafka consumer that consumes (t1,t2,t3), take a savepoint, and deploy a new version that only consumes (t1).

The restore logic still starts to consume (t1,t2,t3), which feels very unintuitive, as t2 and t3 were explicitly removed from the list. It is also hard to debug, as the topics causing the problem are not defined anywhere in your job, configs, etc.

Has anyone run into this issue? Should we change this default behaviour, or at least have an option to not do this?

Cheers,
Gyula
Hi,
Partition offsets stored in state are always respected when the consumer is restored from checkpoints / savepoints. AFAIK, this has been the behaviour for quite some time now (since FlinkKafkaConsumer08).

I think there was some discussion in the past about at least allowing some way to ignore restored partition offsets. One way to enable this is to filter the restored partition offsets based on the topic list / topic regex pattern configured for the current execution. This should work, since the configured topics can only be modified when restoring from savepoints (i.e. manual restores). To avoid breaking the current behaviour, we could add a `filterRestoredPartitionOffsetState()` configuration on the consumer, disabled by default to match the current behaviour.

What do you think?

Cheers,
Gordon

In reply to Gyula Fóra, Wed, Feb 13, 2019 at 11:59 PM.
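To make the filtering idea above concrete, here is a minimal sketch in plain Java. It is not Flink code: `RestoredOffsetFilter`, `TopicPartition`, `filterByTopics` and `filterByPattern` are hypothetical names invented for illustration, and the restored state is assumed to be a simple map from (topic, partition) to offset. It only shows the proposed rule, namely keeping a restored offset only if its topic is still covered by the current configuration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;

/** Illustrative only: none of these names are Flink's actual classes or methods. */
public class RestoredOffsetFilter {

    /** Hypothetical stand-in for a (topic, partition) key in the restored state. */
    public static final class TopicPartition {
        final String topic;
        final int partition;

        public TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) {
                return false;
            }
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    /** Keeps only the restored offsets whose topic is in the configured fixed topic list. */
    public static Map<TopicPartition, Long> filterByTopics(
            Map<TopicPartition, Long> restored, Set<String> configuredTopics) {
        Map<TopicPartition, Long> kept = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : restored.entrySet()) {
            if (configuredTopics.contains(entry.getKey().topic)) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }

    /** Same filter for a job configured with a topic regex pattern. */
    public static Map<TopicPartition, Long> filterByPattern(
            Map<TopicPartition, Long> restored, Pattern topicPattern) {
        Map<TopicPartition, Long> kept = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : restored.entrySet()) {
            if (topicPattern.matcher(entry.getKey().topic).matches()) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // State from a savepoint taken while the job consumed t1, t2 and t3.
        Map<TopicPartition, Long> restored = new HashMap<>();
        restored.put(new TopicPartition("t1", 0), 100L);
        restored.put(new TopicPartition("t2", 0), 200L);
        restored.put(new TopicPartition("t3", 0), 300L);

        // The new job version is configured with t1 only.
        Map<TopicPartition, Long> kept = filterByTopics(restored, Set.of("t1"));
        System.out.println(kept.size() + " partition(s) kept");
    }
}
```

With the (t1,t2,t3) savepoint from the original report and a new configuration of (t1), only the t1 offsets survive the filter, so the restored job would no longer read t2 and t3.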
Hello there,
I’m just wondering whether there are real-world use cases for maintaining this default behavior. It’s a bit counterintuitive and sometimes results in serious production issues. (We had a similar issue when changing a topic name, which resulted in reading every message twice: from both the old topic and the new one.)

Cheers,
Feng

In reply to Tzu-Li (Gordon) Tai, Wed, Feb 13, 2019 at 17:56.
Hi!
I agree that it’s very confusing if you explicitly specify the topics to be consumed and what happens is different.

I would almost consider this a bug; I can’t see any reasonable use case, just hard-to-debug problems.

Having an option would be a good start, but I would rather treat this as a bug.

Gyula

In reply to Feng LI, Wed, 13 Feb 2019 at 18:27.
I think these two Jira issues are relevant here:
- https://issues.apache.org/jira/browse/FLINK-10342
- https://issues.apache.org/jira/browse/FLINK-9303

The second one only because it’s slightly related. The first one is actually exactly this thread.

I was against changing this behaviour in the Jira, but I can now see that this is quite likely an issue.

Aljoscha

In reply to Gyula Fóra, 13 Feb 2019 at 18:55.
Hello Aljoscha,
Thanks for sharing the ticket. I think it makes sense to reopen it. (I can work on the fix; it should be a small patch that just filters the restored Kafka partitions against the discovered partitions.)

(By the way, can I have contributor access for Jira? My username is f.li.)

Cheers,
Feng

In reply to Aljoscha Krettek, Thu, Feb 14, 2019 at 17:07.
Hi Feng,
Thanks for working on a fix for this. I gave you contributor permission on JIRA.

Before you jump right into the code: do we already have agreement on whether to change the default behaviour directly, or to add a configuration option (e.g. a `filterRestoredPartitionOffsetState()` method on the consumer) to enable the filtering?

I'm still slightly in favor of keeping the default behaviour for the current Kafka connector, and only changing that default for the upcoming rework of the connector.

Cheers,
Gordon

In reply to Feng LI, Fri, Feb 15, 2019 at 10:13 PM.
Gordon,
Do you have an example where the current default behaviour makes sense and doesn't cause unexpected problems? Or an example where someone might reasonably expect the current behaviour instead of the newly suggested one?

If we have such cases, I agree, let's keep it as is. If we can't come up with anything reasonable, I vote for changing the default.

Gyula

In reply to Tzu-Li (Gordon) Tai, Fri, Feb 15, 2019 at 3:31 PM.
Hello Gordon,
Thanks for adding the contributor permission. :)

I agree with Gyula; I would vote +1 for changing this behavior unless we have use cases for maintaining it. I consider it more of a bug than expected behavior.

We had one counterexample in production: when migrating from one topic to another, the restored job started consuming both topics, which resulted in a serious production issue (reading the same message twice).

Cheers,
Feng

In reply to Gyula Fóra, Fri, Feb 15, 2019 at 15:44.
Hi,
I indeed don't have a specific use case in mind that justifies the current behaviour. My only concern is that, since this has been the default behaviour for quite a while already, I can't be 100% sure that no users actually expect the current behaviour. Changing it may be dangerous for them, since we would essentially be dropping state.

But I agree that this isn't a strong argument against considering it a bug. Therefore, I agree with the following:

- Change the default behaviour to filter restored partition offset states with the currently configured KafkaTopicsDescriptor. The topics descriptor should only ever change when the job is manually restored.
- Add a `disableFilteringRestoredPartitionOffsets()` (name TBD) to at least provide a fallback for users who were somehow expecting the legacy behaviour. I don't think adding this configuration would complicate the implementation much, so it would be worth adding as a safeguard, just in case.

What do you think?

Cheers,
Gordon |
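To make the proposal above concrete, here is a minimal sketch of what "filter restored partition offsets with the currently configured topics" could look like. It is not the connector's actual code: `RestoredOffsetFilter`, `TopicPartition`, `TopicsDescriptor` and `filterRestoredOffsets` are hypothetical stand-ins invented for illustration, and the boolean flag only mirrors the idea of the `disableFilteringRestoredPartitionOffsets()` fallback, whose name was still TBD in the thread.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;

/** Sketch only: none of these types are the real Flink Kafka connector classes. */
class RestoredOffsetFilter {

    /** Hypothetical key identifying one Kafka partition. */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) {
                return false;
            }
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    /** Hypothetical stand-in for a topics descriptor: a fixed topic list or a regex pattern. */
    static final class TopicsDescriptor {
        private final List<String> fixedTopics; // null when a pattern is used
        private final Pattern topicPattern;     // null when a fixed list is used

        private TopicsDescriptor(List<String> fixedTopics, Pattern topicPattern) {
            this.fixedTopics = fixedTopics;
            this.topicPattern = topicPattern;
        }

        static TopicsDescriptor ofTopics(String... topics) {
            return new TopicsDescriptor(Arrays.asList(topics), null);
        }

        static TopicsDescriptor ofPattern(String regex) {
            return new TopicsDescriptor(null, Pattern.compile(regex));
        }

        boolean matches(String topic) {
            return fixedTopics != null
                    ? fixedTopics.contains(topic)
                    : topicPattern.matcher(topic).matches();
        }
    }

    /**
     * Returns the restored offsets that should still be consumed.
     * With filtering enabled, partitions of topics that the current job no
     * longer subscribes to are dropped; with filtering disabled, the restored
     * state is kept as-is (the legacy behaviour discussed in the thread).
     */
    static Map<TopicPartition, Long> filterRestoredOffsets(
            Map<TopicPartition, Long> restored,
            TopicsDescriptor descriptor,
            boolean filteringEnabled) {
        Map<TopicPartition, Long> result = new LinkedHashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : restored.entrySet()) {
            if (!filteringEnabled || descriptor.matches(entry.getKey().topic)) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }
}
```

With the scenario from the start of the thread, restoring state for (t1,t2,t3) into a job configured with `TopicsDescriptor.ofTopics("t1")` keeps only the t1 partitions when filtering is enabled, and keeps all three topics when it is disabled.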
Hello Gordon,
That sounds good to me. I have prepared a patch for that and will add you to the loop.

Cheers,
Feng |