Hey Guys,
I have observed a weird behavior when using the Temporal Table Join and the way it pushes the Watermark forward. Generally, I think the question is: *When is the Watermark pushed forward by the Temporal Table Join?*

The issue I have noticed is that the Watermark seems to be pushed forward even if no elements are generated. Is that the expected behavior?

I have created a simple test that takes two streams, rates & ccyCodes. I then do a temporal table join of rates & ccyCodes in two ways:

1) SELECT ccyIsoCode, ccyIsoName, rate, rates_ts as ratesLong
   FROM RatesTable, LATERAL TABLE(ccyTable(rates_rowtime))
   WHERE ccyIsoCode = ratesCcyIsoCode

2) SELECT ccyIsoCode, ccyIsoName, rate, rates_rowtime as ratesTs
   FROM RatesTable, LATERAL TABLE(ccyTable(rates_rowtime))
   WHERE ccyIsoCode = ratesCcyIsoCode

As you can see, the only difference is that in the first one I am selecting a field that holds the timestamp but isn't marked as *rowtime*, while in the second one I am selecting the actual *rowtime*.

For the first method I need to reassign timestamps and watermarks, which I do. Finally, I create time windows of, say, 7000 milliseconds and print the results. The unexpected behavior I am facing is this: say I create four artificial records for rates, with timestamps (1000, 5000, 8000, 20000), that are joined correctly with ccy.

- For the first method, only one window is generated, with elements [1000, 5000].
- For the second method, with *rowtime*, there are two different windows, [1000, 5000] and [8000], which basically means that the watermark for 20000 was generated.

Is that the expected behavior? I was quite surprised that selecting a different field can actually yield different results in terms of watermarking.

Best Regards,
Dom.
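Dom's numbers can be checked with a small plain-Java sketch (no Flink dependency; `TumblingWindowSketch` and `firedWindows` are hypothetical names) of how 7000 ms tumbling event-time windows bucket the four timestamps and which windows fire for a given watermark. It assumes Flink's default event-time semantics as I understand them: zero window offset, non-negative timestamps, and a window [start, start + size) firing once the watermark reaches start + size - 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical helper, not Flink code: mimics tumbling event-time windows
// (zero offset) plus an event-time trigger, for non-negative timestamps.
final class TumblingWindowSketch {

    // Start of the tumbling window that contains ts.
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Groups timestamps by window start and keeps only the windows whose
    // max timestamp (end - 1) is <= watermark, i.e. the windows that would have fired.
    static SortedMap<Long, List<Long>> firedWindows(long[] timestamps, long size, long watermark) {
        SortedMap<Long, List<Long>> byWindow = new TreeMap<>();
        for (long ts : timestamps) {
            byWindow.computeIfAbsent(windowStart(ts, size), k -> new ArrayList<>()).add(ts);
        }
        SortedMap<Long, List<Long>> fired = new TreeMap<>();
        for (Map.Entry<Long, List<Long>> e : byWindow.entrySet()) {
            long maxTimestamp = e.getKey() + size - 1;
            if (maxTimestamp <= watermark) {
                fired.put(e.getKey(), e.getValue());
            }
        }
        return fired;
    }
}
```

With the four timestamps above, `firedWindows(ts, 7000, 19999)` returns `{0=[1000, 5000], 7000=[8000]}`, matching the second method, while any watermark between 6999 and 13998 (e.g. 7999) returns only `{0=[1000, 5000]}`, matching the first method. So the printed output alone tells you roughly where each pipeline's watermark ended up.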
Hi,
AFAIK there is no special watermark generation logic for the temporal table join operator. Could you share your example code so I can help analyze and debug it?

Best,
Kurt

On Tue, Mar 17, 2020 at 9:53 PM Dominik Wosiński wrote:
> [...]
Hey,
Sure, I have created something that can be called a minimal reproducible example. It's not the prettiest, since it uses a lot of *Thread.sleep*, but it makes sure the input is exactly what you want.
https://github.com/DomWos/FlinkTTF/tree/long-vs-timestamp

In the long-vs-timestamp branch there is a bunch of classes, including two actual tests, *TimestampPassingTTFLongSelect* & *TimestampPassingTTFTimestampSelect*. Both present the same select, and all operations are generally the same apart from the selected field.
The output is printed at the end; I didn't want to play much with Sinks, since I think printing shows the issue better.

Best Regards,
Dom.

On Wed, Mar 18, 2020 at 08:13 Kurt Young wrote:
> [...]
Hi, after looking at your code, I think I might have found the root cause.
The reason is the additional `AssignerWithPeriodicWatermarks` [1] you added in the long version. Since the temporal table join only produces the joined results [3000, 6500, 8500], the watermark this assigner generates will be 8499 [2], which can't trigger the second window you wanted.

By comparison, in the timestamp version you don't have this additional watermark generator. Although you also only get the same joined results [3000, 6500, 8500], the watermark is inherited from the source, which will eventually be 19999 [3]. With such a watermark, you are able to trigger the second window.

[1] https://github.com/DomWos/FlinkTTF/blob/long-vs-timestamp/src/test/java/TimestampPassingTTFLongSelect.scala#L71
[2] https://github.com/DomWos/FlinkTTF/blob/long-vs-timestamp/src/test/java/TimestampPassingTTFLongSelect.scala#L73
[3] https://github.com/DomWos/FlinkTTF/blob/long-vs-timestamp/src/test/java/TestUtils.scala#L119

Best,
Kurt

On Thu, Mar 19, 2020 at 11:20 PM Dominik Wosiński wrote:
> [...]
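Kurt's explanation boils down to *which records the watermark generator gets to see*: an assigner placed after the join only sees joined rows, while the rowtime version keeps the watermark coming from the source. A rough plain-Java sketch of that difference follows. It is not Flink code; `WatermarkOriginSketch`, `MaxMinusOneAssigner` and the other names are hypothetical, and the rule "watermark = highest timestamp seen - 1" is an assumption chosen because it reproduces the 8499 mentioned above (the real assigner in the repo may differ).

```java
import java.util.List;

// Hypothetical sketch, not Flink code: compares a watermark computed from
// the joined rows only with one computed from everything the source emitted.
final class WatermarkOriginSketch {

    // Assumed assigner: watermark trails the highest timestamp seen by 1 ms.
    static final class MaxMinusOneAssigner {
        private long maxSeen = Long.MIN_VALUE + 1; // avoids underflow before the first element

        void onElement(long ts) {
            maxSeen = Math.max(maxSeen, ts);
        }

        long currentWatermark() {
            return maxSeen - 1;
        }
    }

    // Watermark the assigner would report after observing the given timestamps.
    static long watermarkAfter(List<Long> timestamps) {
        MaxMinusOneAssigner assigner = new MaxMinusOneAssigner();
        for (long ts : timestamps) {
            assigner.onElement(ts);
        }
        return assigner.currentWatermark();
    }

    // A tumbling window [start, start + size) fires once watermark >= start + size - 1.
    static boolean fires(long windowStart, long size, long watermark) {
        return windowStart + size - 1 <= watermark;
    }
}
```

As a usage example, assume (hypothetically) the source emits 3000, 6500, 8500 and 20000, where the 20000 rate produces no join output. Then `watermarkAfter` gives 8499 for the joined rows but 19999 for the source rows, so `fires(7000, 7000, 8499)` is false while `fires(7000, 7000, 19999)` is true, which is exactly the one-window vs. two-window difference.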
Hey,
Generally, that's more or less what I thought. I think I understand the behavior itself, thanks for explaining it to me. But what actually concerns me is that this *assignTimestampsAndWatermarks* is required if you select the Long field, which basically means that the type of the selected field influences the internals and ultimately causes Flink to yield slightly different results, which is not something that is expected, IMHO.

Anyway, thanks a lot for taking a look at the code!

Best Regards,
Dom.