Hi all,
I have just come across a weird state of operators after a restore from a checkpoint. After the restore, two connected operators (operator A is an input of operator B) ended up with the watermark of operator A being lower than the watermark of operator B. I don't know how to explain this. Can this be normal, or does it signal a bug somewhere? If I understand Flink's checkpointing correctly, the checkpoint barrier flows from one operator to the next, so the watermarks should be aligned.

I'm running a Beam pipeline on Flink 1.8.1.

Am I missing something?

Many thanks for any comments,
Jan
Hi Jan,
Two pointers that may help explain the behavior:

1) If you have a custom watermark generator, I do not think Flink checks that it emits only monotonically increasing watermarks. This is the responsibility of the generator itself. This means that although your operator A is topologically before operator B, operator A may have a smaller watermark if your watermark generator allows it.

2) Flink currently does not checkpoint the last seen watermark (https://issues.apache.org/jira/browse/FLINK-5601). This means that after restoring, your (event) time is assumed to be Long.MIN_VALUE until the first new watermark arrives. So if you observed late data no longer being late, or something similar, it may not be that the two operators have different watermarks, but that after restoring, event time rolls back to the "beginning of time".

I hope this helps,
Kostas
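A minimal sketch of point 2, written as a toy model in plain Python rather than against Flink's API. The `Operator` class, its methods, and the simplified lateness rule are all assumptions made for illustration; the only thing taken from the thread is that the watermark is not part of the checkpoint and starts again at Long.MIN_VALUE after a restore.

```python
LONG_MIN = -2**63  # same value as Java's Long.MIN_VALUE


class Operator:
    """Toy operator (hypothetical, not a Flink class)."""

    def __init__(self):
        self.watermark = LONG_MIN  # event time before any watermark is seen
        self.state = {}            # user state, which is checkpointed

    def on_watermark(self, ts):
        # This toy operator only ever advances its event time.
        self.watermark = max(self.watermark, ts)

    def is_late(self, event_ts):
        # Simplified rule: an event at or behind the watermark is late.
        return event_ts <= self.watermark

    def snapshot(self):
        # The watermark is deliberately left out of the snapshot.
        return dict(self.state)

    @classmethod
    def restore(cls, snapshot):
        op = cls()                 # watermark starts at LONG_MIN again
        op.state = dict(snapshot)
        return op


op = Operator()
op.on_watermark(1000)
late_before = op.is_late(500)           # late relative to watermark 1000

restored = Operator.restore(op.snapshot())
late_after = restored.is_late(500)      # no watermark seen yet after restore
```

In this model the same event is late before the restore and not late after it, until the first new watermark arrives.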
Hi Kostas,
thanks for the reply, comments inline.

On 8/7/19 1:59 PM, Kostas Kloudas wrote:
> 1) If you have a custom watermark generator, I do not think that Flink checks if it emits only monotonically increasing watermarks. This is the responsibility of the generator itself. This means that although you operator A is topologically before operator B, operator A may have a smaller watermark if your watermark generator allows so.

I do generate watermarks in a custom source, but I believe that the generated sequence is monotonic. Still, even if the generated watermark did actually decrease, would that mean that the operator downstream of the source (operator A) would actually "go back in time"?

> 2) Flink currently does not checkpoint the last seen watermark (https://issues.apache.org/jira/browse/FLINK-5601). This means that after restoring, your (event) time is assumed to be Long.Min until the first new watermark comes. So if you observed late data not being late anymore or sth similar, then it may not be that the two operators have different watermarks but that after restoring event time rolls back to the "beginning of time".

I actually didn't observe any wrong or unexpected behavior, exceptions or wrong outputs. I just noticed this in Flink's WebUI and it looked strange to me. Could it just be that the WebUI showed an older watermark for operator A? What was strange is that the watermarks stayed on my screen long enough to take a screenshot (so the watermark of operator A was displayed as lower than that of operator B for at least, say, 10 seconds). Even if watermarks are not checkpointed, would it still be possible for the watermark of operator B to actually be greater? I'm still confused about how this could happen, because (in my understanding) the output watermark of operator A should be greater than or equal to the input watermark of B (because B takes the minimum of its inputs).

Sorry if I'm digging too deep into this, but I don't like things I cannot explain, as they might point to bugs somewhere. :-) Or to my mental model not being aligned with reality.

Jan
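The mental model described in this message (an operator's input watermark is the minimum over its inputs) can be sketched as follows. This is a plain-Python illustration, not Flink code; the class name `InputValve` and its method are hypothetical.

```python
LONG_MIN = -2**63  # same value as Java's Long.MIN_VALUE


class InputValve:
    """Toy model: combine per-input watermarks by taking their minimum."""

    def __init__(self, num_inputs):
        self.channel_wm = [LONG_MIN] * num_inputs
        self.last_emitted = LONG_MIN

    def on_watermark(self, channel, ts):
        """Return the new combined watermark, or None if it did not advance."""
        # Keep the highest watermark seen so far on this input.
        if ts > self.channel_wm[channel]:
            self.channel_wm[channel] = ts
        combined = min(self.channel_wm)
        # Only report the combined watermark when it moves forward.
        if combined > self.last_emitted:
            self.last_emitted = combined
            return combined
        return None


valve = InputValve(2)
results = [
    valve.on_watermark(0, 100),  # input 1 has no watermark yet
    valve.on_watermark(1, 50),
    valve.on_watermark(1, 200),
    valve.on_watermark(0, 300),
]
```

Under this model the combined watermark is held back by the slowest input, so operator B's input watermark can never exceed the output watermark of any of its upstream operators.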
Hi Jan,
After looking at the code, my point 1) is false for *intermediate* tasks and when you are using a watermark assigner. In these cases, Flink checks that the "next" watermark is greater than the "previous" one.

But if your operator A is a source and you emit watermarks from the source, it can happen that your watermark appears to go backwards on operator A, while operator B does the "correction" by discarding smaller watermarks. That could explain your observation.

Cheers,
Kostas
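The scenario in this reply (a source whose watermark goes backwards, with the next operator discarding the smaller values) can be illustrated with a small simulation. It is plain Python with hypothetical function names and made-up watermark values, and it only applies if operator A really forwards the source's watermarks unchecked.

```python
def forward_unchecked(watermarks):
    """Operator A in this scenario: reports whatever the source emitted."""
    return list(watermarks)


def forward_monotonic(watermarks):
    """Operator B: ignores any watermark not greater than the last accepted one."""
    current = float("-inf")
    seen = []
    for wm in watermarks:
        if wm > current:
            current = wm
        seen.append(current)
    return seen


source = [100, 200, 150, 160]     # made-up, non-monotonic sequence
a_view = forward_unchecked(source)
b_view = forward_monotonic(source)

# Steps at which A would show a lower watermark than B.
regressions = [i for i, (a, b) in enumerate(zip(a_view, b_view)) if a < b]
```

Here `regressions` lists the points where a monitoring view would show operator A behind operator B, even though B never moved backwards.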
Actually, operator A is intermediate, source is preceding it.
But are they chained together? Could you provide the code of your job, at least up to operator A?
The code would be a little complicated, because it is wrapped in several layers of other APIs (Beam being one of them, but there is also another layer).

I can provide the job graph [1] and a screenshot of the two watermarks [2]. The watermarks are taken from the two operators on the bottom left.

Essentially, the job reads from Google Cloud Storage and simultaneously from Kafka. Cloud storage holds blob files containing historical events. Each blob is marked with its event time range (e.g. a file is named BLOB_EVENTS_TIMESTAMP1_TIMESTAMP2), and those timestamps are used to generate watermarks from the batch storage (files are read in sorted order).

Does that help, or would you like more details?

Jan

[1] https://transfer.sh/v473f/jobgraph.png
[2] https://transfer.sh/iDg1A/watermarks.png
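The watermark generation from blob file names described in this message might look roughly like the sketch below. Everything beyond the BLOB_EVENTS_TIMESTAMP1_TIMESTAMP2 naming pattern is an assumption: that the timestamps are integers (e.g. epoch milliseconds), that the watermark emitted when a file is opened is that file's start timestamp, and the helper names themselves.

```python
def parse_range(name):
    """Split a 'BLOB_EVENTS_<start>_<end>' name into two integers (assumed format)."""
    _prefix, start, end = name.rsplit("_", 2)
    return int(start), int(end)


def watermarks_for(files):
    """Yield (file, watermark) pairs in the order the files would be read."""
    current = None
    for name in sorted(files, key=parse_range):
        start, _end = parse_range(name)
        # Never let the watermark move backwards, even if ranges overlap.
        current = start if current is None else max(current, start)
        yield name, current


files = [
    "BLOB_EVENTS_2000_3000",
    "BLOB_EVENTS_1000_2000",
    "BLOB_EVENTS_3000_4500",
]
plan = list(watermarks_for(files))
```

Reading the files in sorted order keeps the generated watermark sequence monotonic, which matches the claim earlier in the thread that the source's watermarks do not decrease.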
Hi Jan,
I am not sure what is happening. Operator A does not seem to be chained to the source (which produces the watermarks), so the check for increasing watermarks should also be applied there. BTW, I assume that by "bottom left" you mean the ones that start with "activeDevices:takePresent..." (Op. A) and "activeDevices:stepLength..." (Op. B).

I am wondering whether the WebUI may not be consistent across different operators. For example, the watermark of Op. B was simply not updated in the WebUI.

I am also cc'ing Chesnay, who may have better insight into the WebUI.

Cheers,
Kostas
Hi Kostas,
yes, operators A and B are as you said. A WebUI inconsistency is what would explain it best, if it is at all possible that it displays stale information for one operator and up-to-date information for another.

Jan