Watermark alignment during unit tests

Евгений Юшин
Hi devs,

While working on https://issues.apache.org/jira/browse/FLINK-10050 I've
found unstable behaviour in unit tests for unioned streams (which are used
in CoGroupedStreams/JoinedStreams under the hood).
Let's assume we have late elements in one of the streams. We have no
guarantee about which source will be read first, or in which order
watermark alignment will be applied. So the following example produces
different results on different invocations:

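    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    // env is the test's StreamExecutionEnvironment, running in event time.
    val s1 = env.addSource(new SourceFunction[(String, String)] {
      override def run(ctx: SourceFunction.SourceContext[(String, String)]): Unit = {
        ctx.collectWithTimestamp(("a", "a1"), 1)
        //wmAllignmentLock.synchronized {
        //  wmAllignmentLock.wait()
        //}
        // Watermark 4 is past the end of window [0, 3), so the element
        // below (timestamp 2) is late once both inputs have reached 4.
        ctx.emitWatermark(new Watermark(4))
        ctx.collectWithTimestamp(("a", "a2"), 2)
      }

      override def cancel(): Unit = {}
    })

    val s2 = env.addSource(new SourceFunction[(String, String)] {
      override def run(ctx: SourceFunction.SourceContext[(String, String)]): Unit = {
        ctx.collectWithTimestamp(("a", "b1"), 1)
        ctx.emitWatermark(new Watermark(4))
        //wmAllignmentLock.synchronized {
        //  wmAllignmentLock.notifyAll()
        //}
      }

      override def cancel(): Unit = {}
    })

    // Join both sources on the first tuple field in 3 ms tumbling windows.
    val joined = s1.join(s2).where(_._1).equalTo(_._1)
      .window(TumblingEventTimeWindows.of(Time.milliseconds(3)))
      .apply((x, y) => s"$x:$y")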

For some invocations (when Flink decides to process the 2nd source before
the 1st), ("a", "a2") is considered late and dropped; otherwise it is
processed normally. Here is the rate for 1000 invocations:
Run JOIN periodic
iteration [50] contains late total = 22, this iter = 22
iteration [100] contains late total = 51, this iter = 29
iteration [150] contains late total = 78, this iter = 27
iteration [200] contains late total = 101, this iter = 23
iteration [250] contains late total = 124, this iter = 23
iteration [300] contains late total = 155, this iter = 31
iteration [350] contains late total = 184, this iter = 29
iteration [400] contains late total = 210, this iter = 26
iteration [450] contains late total = 233, this iter = 23
iteration [500] contains late total = 256, this iter = 23
iteration [550] contains late total = 274, this iter = 18
iteration [600] contains late total = 303, this iter = 29
iteration [650] contains late total = 338, this iter = 35
iteration [700] contains late total = 367, this iter = 29
iteration [750] contains late total = 393, this iter = 26
iteration [800] contains late total = 415, this iter = 22
iteration [850] contains late total = 439, this iter = 24
iteration [900] contains late total = 459, this iter = 20
iteration [950] contains late total = 484, this iter = 25
iteration [1000] contains late total = 502, this iter = 18
contains late = 502
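
The dependence on arrival order can be reproduced without Flink. The following is a minimal sketch, assuming a two-input operator whose combined watermark is the minimum of its per-input watermarks and a lateness check of "window max timestamp <= combined watermark" with zero allowed lateness. The names `WatermarkAlignmentSketch`, `Record`, `Wm` and `lateRecords` are made up for illustration and are not part of Flink.

```scala
// Toy model of watermark alignment on a two-input operator. Not Flink code.
object WatermarkAlignmentSketch {
  sealed trait Event
  // input is 0 for the first source (s1) and 1 for the second (s2)
  final case class Record(input: Int, value: String, timestamp: Long) extends Event
  final case class Wm(input: Int, timestamp: Long) extends Event

  // Tumbling window size in milliseconds, as in the example above.
  val WindowSize = 3L

  /** Returns the values that would be dropped as late for one arrival order. */
  def lateRecords(arrivalOrder: Seq[Event]): Seq[String] = {
    // Last watermark seen on each input; nothing seen yet means Long.MinValue.
    val perInput = Array(Long.MinValue, Long.MinValue)
    val late = Seq.newBuilder[String]
    arrivalOrder.foreach {
      case Wm(input, ts) =>
        perInput(input) = math.max(perInput(input), ts)
      case Record(_, value, ts) =>
        // The operator only advances to the minimum over all inputs.
        val combined = perInput.min
        // Valid for non-negative timestamps, which is all this sketch uses.
        val windowStart = ts - (ts % WindowSize)
        val windowMaxTimestamp = windowStart + WindowSize - 1
        if (windowMaxTimestamp <= combined) late += value
    }
    late.result()
  }

  // s1 is fully consumed before s2.
  val s1First: Seq[Event] = Seq(
    Record(0, "a1", 1), Wm(0, 4), Record(0, "a2", 2),
    Record(1, "b1", 1), Wm(1, 4))

  // s2 is fully consumed before s1.
  val s2First: Seq[Event] = Seq(
    Record(1, "b1", 1), Wm(1, 4),
    Record(0, "a1", 1), Wm(0, 4), Record(0, "a2", 2))
}
```

With `s1First` nothing is late, because the second input has not emitted a watermark when "a2" arrives. With `s2First` both inputs have reached watermark 4 by then, so "a2" is reported as late.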


It doesn't matter whether a periodic or a punctuated watermark assigner
is used. The synchronization mechanism (commented out in the code snippet
above) doesn't help to align the records in a particular order either.

While this behaviour is totally fine in production, I just wonder how
to write a stable unit test scenario that covers late-element
processing.
I didn't find any suitable test harness among the test utilities.

Any feedback is appreciated!

Regards,
Eugen

Re: Watermark alignment during unit tests

Kostas Kloudas
Hi Eugen,

It is true that for ITCases this can be difficult, and this should be improved in Flink’s testing infrastructure.
For this specific PR, however, what you need to check is whether the allowedLateness parameter is propagated correctly
throughout the translation process. The window operator with allowed lateness (which is applied next)
is covered by other tests.

In this case I would recommend doing the join of the streams “manually”, i.e.:

input1.coGroup(input2)
      .where(keySelector1)
      .equalTo(keySelector2)
      .window(windowAssigner)

and then, from the resulting WithWindow, just get the allowed lateness and verify that it is the
value that you provided.

This will cover the propagation and make sure that nobody breaks it in the future.
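
The shape of such a propagation test can be sketched without running a job. This is only a toy model: `WithWindow` and `WindowedOp` below are hypothetical stand-ins written for this example, not Flink's classes, and `translate()` is an assumed name for the step that builds the downstream window operator. Times are plain milliseconds to keep the sketch dependency-free.

```scala
// Toy model of "check that a builder parameter survives translation".
// These are illustrative stand-ins, not Flink's CoGroupedStreams classes.
object LatenessPropagationSketch {
  // Stand-in for the window operator produced by the translation step.
  final case class WindowedOp(windowSizeMs: Long, allowedLatenessMs: Long)

  // Stand-in for the builder returned by .window(...).
  final case class WithWindow(windowSizeMs: Long, allowedLatenessMs: Long = 0L) {
    // Returns a new builder carrying the requested lateness.
    def allowedLateness(ms: Long): WithWindow = copy(allowedLatenessMs = ms)

    // Hypothetical translation step; it must pass the lateness on unchanged.
    def translate(): WindowedOp = WindowedOp(windowSizeMs, allowedLatenessMs)
  }

  // The test inspects the built object instead of executing a pipeline,
  // so it does not depend on the order in which sources are read.
  def latenessIsPropagated(latenessMs: Long): Boolean = {
    val builder = WithWindow(windowSizeMs = 3L).allowedLateness(latenessMs)
    builder.allowedLatenessMs == latenessMs &&
      builder.translate().allowedLatenessMs == latenessMs
  }
}
```

Because nothing is executed, a check like this is deterministic, which is the property the join ITCase above lacks.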

Cheers,
Kostas


> On Sep 18, 2018, at 11:40 AM, Евгений Юшин <[hidden email]> wrote: