Watermark generation in Temporal Table Join

Dominik Wosiński
Hey Guys,
I have observed some odd behavior when using the Temporal Table Join, related
to the way it pushes the watermark forward. Generally, I think the question is:
*When is the watermark pushed forward by the Temporal Table Join?*

The issue I have noticed is that the watermark seems to be pushed forward even
if no elements are generated. Is that the expected behavior?

I have created a simple test that takes two streams, rates and ccyCodes. I then
do a temporal table join of rates and ccyCodes in two ways:
1) SELECT ccyIsoCode, ccyIsoName, rate, rates_ts as ratesLong
   FROM RatesTable, LATERAL TABLE(ccyTable(rates_rowtime))
   WHERE ccyIsoCode = ratesCcyIsoCode

2) SELECT ccyIsoCode, ccyIsoName, rate, rates_rowtime as ratesTs
   FROM RatesTable, LATERAL TABLE(ccyTable(rates_rowtime))
   WHERE ccyIsoCode = ratesCcyIsoCode

So, as you can see, the only difference is that in the first one I am
selecting a field that holds the timestamp but isn't marked as *rowtime*,
while in the second one I am selecting the actual *rowtime*.

For the first method I need to reassign timestamps and watermarks, which I
do. Finally, I create time windows of, say, 7000 milliseconds and print the
results. The unexpected behavior I am facing is this: say I create four
artificial rates records, with timestamps (1000, 5000, 8000, 20000), that are
joined correctly with ccy:

   - For the first method there is only one window generated, with elements
   [1000, 5000].
   - For the second method, with *rowtime*, there are two different windows,
   [1000, 5000] and [8000]. This basically means that the watermark for 20000
   was generated.
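To make the arithmetic behind these two outcomes explicit, here is a minimal, hypothetical Java model (not Flink code; `TumblingWindowSim` and its methods are invented for illustration). It assumes 7000 ms tumbling windows aligned to 0 and that a window [start, start + 7000) fires once the watermark reaches start + 6999, which is how an event-time trigger treats a window's last timestamp. The final watermark values 19999 and 7999 are assumptions chosen to reproduce the two observed results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of event-time tumbling windows; it does not call any Flink API.
final class TumblingWindowSim {
    static final long WINDOW_SIZE_MS = 7000L;

    // Start of the tumbling window [start, start + size) that contains ts.
    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, WINDOW_SIZE_MS);
    }

    // Groups timestamps by window and keeps only the windows whose last
    // timestamp (start + size - 1) is covered by the final watermark.
    static Map<Long, List<Long>> firedWindows(List<Long> timestamps, long finalWatermark) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            windows.computeIfAbsent(windowStart(ts), k -> new ArrayList<>()).add(ts);
        }
        Map<Long, List<Long>> fired = new TreeMap<>();
        for (Map.Entry<Long, List<Long>> e : windows.entrySet()) {
            long lastTimestamp = e.getKey() + WINDOW_SIZE_MS - 1;
            if (lastTimestamp <= finalWatermark) {
                fired.put(e.getKey(), e.getValue());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Long> ts = Arrays.asList(1000L, 5000L, 8000L, 20000L);
        System.out.println(firedWindows(ts, 19_999L)); // {0=[1000, 5000], 7000=[8000]}
        System.out.println(firedWindows(ts, 7_999L));  // {0=[1000, 5000]}
    }
}
```

Under these assumptions, a watermark of 19999 fires [0, 7000) and [7000, 14000), matching the *rowtime* output, while 7999 fires only [0, 7000), matching the first method. The window holding 20000 fires in neither case, since it would need a watermark of at least 20999.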

Is that the expected behavior? I was quite surprised that selecting a
different field can actually yield different results in terms of
watermarking.

Best Regards,
Dom.

Re: Watermark generation in Temporal Table Join

Kurt Young
Hi,

AFAIK there is no special watermark generation logic for the temporal table
join operator. Could you share your example's code? Then I can help to
analyze and debug it.

Best,
Kurt


On Tue, Mar 17, 2020 at 9:53 PM Dominik Wosiński <[hidden email]> wrote:


Re: Watermark generation in Temporal Table Join

Dominik Wosiński
Hey,
Sure, I have created something that can be called a minimal reproducible
example. It's not the prettiest since it uses a lot of *Thread.sleep*, but
it makes sure that the input is exactly what you want.
https://github.com/DomWos/FlinkTTF/tree/long-vs-timestamp

In the long-vs-timestamp branch there is a bunch of classes, including two
actual tests, *TimestampPassingTTFLongSelect* and
*TimestampPassingTTFTimestampSelect*. Both use the same select, and all
operations are generally the same apart from the selected field.
The output is printed at the end; I didn't want to play much with sinks
since I think printing shows the issue better.


Best Regards,
Dom.


On Wed, 18 Mar 2020 at 08:13 Kurt Young <[hidden email]> wrote:


Re: Watermark generation in Temporal Table Join

Kurt Young
Hi, after looking at your code, I think I may have found the root cause.

The reason is the additional `AssignerWithPeriodicWatermarks` [1] you added
in the long version.
Since the temporal table join only produces the joined results [3000, 6500,
8500], the watermark this operator generates will be 8499 [2], which is not
enough to trigger the second window you wanted.

By comparison, in the timestamp version you don't have this additional
watermark generator. Although you also get only the same joined results
[3000, 6500, 8500], the watermark is inherited from the source and will
eventually be 19999 [3]. With such a watermark, you are able to trigger the
second window you have.
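The watermark arithmetic in this explanation can be sketched with a small, hypothetical Java model (not Flink code; `WatermarkOriginSim` and its methods are invented for illustration). It assumes the extra assigner emits "largest timestamp seen minus 1", which is consistent with the 8499 quoted above but is an assumption about the linked code. It also assumes a hypothetical source whose latest timestamp is 20000, so that the inherited watermark is 19999. Both values are then checked against the second 7000 ms window, [7000, 14000).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model for illustration only; it does not call any Flink API.
final class WatermarkOriginSim {
    static final long WINDOW_SIZE_MS = 7000L;

    // Assumed assigner behaviour: emit (max timestamp seen so far) - 1,
    // or Long.MIN_VALUE if no element has been seen yet.
    static long periodicWatermark(List<Long> seenTimestamps) {
        long max = Long.MIN_VALUE;
        for (long ts : seenTimestamps) {
            max = Math.max(max, ts);
        }
        return max == Long.MIN_VALUE ? Long.MIN_VALUE : max - 1;
    }

    // A tumbling window [start, start + size) fires once the watermark
    // reaches its last timestamp, start + size - 1.
    static boolean windowFires(long windowStart, long watermark) {
        return watermark >= windowStart + WINDOW_SIZE_MS - 1;
    }

    public static void main(String[] args) {
        // Joined results quoted in the reply: the only rows the extra assigner sees.
        long reassigned = periodicWatermark(Arrays.asList(3000L, 6500L, 8500L));
        // Hypothetical source input whose latest timestamp is 20000.
        long inherited = periodicWatermark(Arrays.asList(3000L, 6500L, 8500L, 20000L));

        System.out.println(reassigned + " fires [7000, 14000)? " + windowFires(7000L, reassigned)); // 8499 ... false
        System.out.println(inherited + " fires [7000, 14000)? " + windowFires(7000L, inherited));   // 19999 ... true
    }
}
```

In this model the window logic is identical in both versions. Only the records the watermark is derived from differ: the extra assigner sees only joined rows, so its watermark stalls at 8499, while the inherited source watermark also advances on rows that never produced join output.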

[1]
https://github.com/DomWos/FlinkTTF/blob/long-vs-timestamp/src/test/java/TimestampPassingTTFLongSelect.scala#L71
[2]
https://github.com/DomWos/FlinkTTF/blob/long-vs-timestamp/src/test/java/TimestampPassingTTFLongSelect.scala#L73
[3]
https://github.com/DomWos/FlinkTTF/blob/long-vs-timestamp/src/test/java/TestUtils.scala#L119

Best,
Kurt


On Thu, Mar 19, 2020 at 11:20 PM Dominik Wosiński <[hidden email]> wrote:


Re: Watermark generation in Temporal Table Join

Dominik Wosiński
Hey,
generally, that's more or less what I thought. I think I understand the
behavior itself; thanks for explaining it to me.

But what actually concerns me is the fact that this
*assignTimestampsAndWatermarks* is required if you select this Long field,
which basically means that the type of the selected field influences the
internals and eventually causes Flink to yield slightly different results,
which is not something I would expect, IMHO.

Anyway, thanks a lot for taking a look at the code!

Best Regards,
Dom.