Deterministic processing with out-of-order streams

Deterministic processing with out-of-order streams

Jaromir Vanek
Is Flink's processing really deterministic across repeated runs when the incoming stream of elements is out of order? How is this ensured?

I am aware of the principles of event time and watermarking. But I can't understand how it works when there are late elements in the stream, i.e. elements that violate the watermark condition by having a lower timestamp than a previously emitted watermark. As far as I can tell, these elements flow through the system without any mechanism that would discard them. Late elements then may, or may not, fall into existing windows.

Let me draw a simple example reading from a Kafka source with two partitions. The numbers represent event time. Data from Kafka are shuffled to a single WindowOperator calculating a sum (all elements have the same key).

                                      ------------------------------------
part. 1  | ..., 15, 12, 9  | ------> |                                  |
                                     |          WindowOperator          |
part. 2  | ..., 18, 6, 11  | ------> |     (window maxTimestamp 10)     |
                                      ------------------------------------

Elements can arrive at the WindowOperator in arbitrary order.

example1 (E denotes element, W denotes watermark)

E 9
W 9
E 11
W 11 (current watermark: min(9, 11) = 9)
E 6  
E 12
W 12 (current watermark: min(12, 11) = 11)
---------------
window(0, 10) fires with sum 15

example2:

E 9
W 9
E 11
W 11 (current watermark: min(9, 11) = 9)
E 12
W 12 (current watermark: min(12, 11) = 11)
---------------
window fires with sum 9
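The two interleavings above can be reproduced with a small, self-contained simulation. This is plain Java, not Flink: it only models the two-channel min-watermark rule and a single window with maxTimestamp 10 (all names and the item encoding are illustrative, not Flink API).

```java
import java.util.Arrays;
import java.util.List;

// Simulates a two-input operator: each item is {kind, ts, channel},
// kind 0 = element, kind 1 = watermark. The operator watermark is the
// minimum over both channels; the window [0, 10] fires once the
// operator watermark reaches 10, and elements arriving after that are dropped.
final class WindowSim {
    static final long WINDOW_MAX_TS = 10;

    static long run(List<long[]> items) {
        long[] channelWm = {Long.MIN_VALUE, Long.MIN_VALUE};
        long sum = 0;
        boolean fired = false;
        long firedSum = -1;
        for (long[] it : items) {
            if (it[0] == 0) {
                // element: counted only if the window has not fired yet
                if (!fired && it[1] <= WINDOW_MAX_TS) sum += it[1];
            } else {
                // watermark: advance this channel, recompute the operator watermark
                channelWm[(int) it[2]] = it[1];
                long operatorWm = Math.min(channelWm[0], channelWm[1]);
                if (!fired && operatorWm >= WINDOW_MAX_TS) { fired = true; firedSum = sum; }
            }
        }
        return firedSum;
    }

    public static void main(String[] args) {
        // example1: E9, W9 (ch0), E11, W11 (ch1), E6 (ch1), E12, W12 (ch0)
        List<long[]> ex1 = Arrays.asList(
            new long[]{0, 9, 0}, new long[]{1, 9, 0},
            new long[]{0, 11, 1}, new long[]{1, 11, 1},
            new long[]{0, 6, 1},
            new long[]{0, 12, 0}, new long[]{1, 12, 0});
        // example2: identical, except the element 6 never arrives
        List<long[]> ex2 = Arrays.asList(
            new long[]{0, 9, 0}, new long[]{1, 9, 0},
            new long[]{0, 11, 1}, new long[]{1, 11, 1},
            new long[]{0, 12, 0}, new long[]{1, 12, 0});
        System.out.println("example1 fires with sum " + run(ex1)); // 15
        System.out.println("example2 fires with sum " + run(ex2)); // 9
    }
}
```

Note that in example1 the element 6 is included even though it is behind the current watermark of 9, because the window has not fired yet; only the firing point decides what is counted.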


In my example the result is not deterministic, it's more or less random. Is there anything I am missing?

Thank you very much for the explanation.
Re: Deterministic processing with out-of-order streams

Till Rohrmann
Hi Jaromir,

deterministic processing with late elements is indeed more difficult than without them. What you have to do is send updates to your downstream operators whenever you see late elements. This can either be an incremental update or a retraction followed by the corrected value; it basically depends on the operation you're performing. So in your example, you could define an allowed lateness for which you keep the window contents around. If the element with value 6 arrives within the allowed lateness (and after the watermark), you emit the corrected sum for the window. Your downstream operators have to be able to handle updated windows, though. At the moment this is something the user has to take care of, and there is only little support from Flink's side. But we're working on improving the late element handling with [1].

[1] https://github.com/apache/flink/pull/2415
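This allowed-lateness idea can be sketched in a few lines of plain Java. This is not Flink's actual WindowOperator; the single window end of 10, the allowed lateness of 5, and the item encoding ({0, ts} element, {1, ts} watermark) are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of "keep the window contents for allowedLateness past the window end
// and re-emit a corrected sum for every late-but-allowed element".
final class LateUpdateSim {
    static List<String> process(long windowEnd, long allowedLateness, List<long[]> items) {
        List<String> emitted = new ArrayList<>();
        long wm = Long.MIN_VALUE;
        long sum = 0;
        boolean firedOnce = false;  // first, on-time firing happened
        boolean cleared = false;    // window state garbage-collected
        for (long[] it : items) {
            if (it[0] == 1) { // watermark
                wm = it[1];
                if (!firedOnce && wm >= windowEnd) { firedOnce = true; emitted.add("sum=" + sum); }
                if (!cleared && wm >= windowEnd + allowedLateness) cleared = true;
            } else {          // element
                if (it[1] > windowEnd) continue; // belongs to a later window
                if (cleared) continue;           // too late: state is gone, element dropped
                sum += it[1];
                if (firedOnce) emitted.add("update sum=" + sum); // late but allowed: corrected result
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // E9, W11 (fires), late E6 within lateness (update), W16 (state cleared), E3 (dropped)
        List<long[]> stream = Arrays.asList(
            new long[]{0, 9}, new long[]{1, 11}, new long[]{0, 6},
            new long[]{1, 16}, new long[]{0, 3});
        System.out.println(process(10, 5, stream)); // [sum=9, update sum=15]
    }
}
```

The downstream operator sees two results for the same window, "sum=9" and then "update sum=15", which is exactly the "handle updated windows" burden described above.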

Cheers,
Till

On Fri, Nov 4, 2016 at 11:38 PM, Jaromir Vanek <[hidden email]>
wrote:

Re: Deterministic processing with out-of-order streams

Jaromir Vanek
Hi Till, thank you for your answer.

I am afraid defining an allowed lateness won't help. It only shifts the problem by a constant amount of time. If we agree that an element can arrive an arbitrary time after the watermark (depending on network latency), it may or may not be assigned to the window, depending on whether it arrives before or after the allowed lateness period expires. The element may then be counted in or discarded.

It still seems the results are not deterministic. In other words, if I run the job reading from Kafka multiple times, I may get different results depending on external conditions like network and cluster stability.

Please correct me if I'm wrong.

J.V.


Re: Deterministic processing with out-of-order streams

Till Rohrmann
You're right if you want to guarantee a deterministic computation for an arbitrary allowed lateness. In the general case, you would never be able to calculate the final result of a window in finite time, because there might always be another element which arrives later. However, for most practical use cases you can define an upper bound for the allowed lateness which you can use to calculate your final result. If not, then you will simply run out of storage capacity at some point in time, because (in the general case) you have to keep some state around for these late elements.

Cheers,
Till

On Mon, Nov 7, 2016 at 5:55 PM, Jaromir Vanek <[hidden email]>
wrote:

Re: Deterministic processing with out-of-order streams

Vinay Patil
Hi Jaromir,

You can make use of a custom Trigger and set the allowed lateness to the max value.

I have kept the custom trigger code (EventTimeTrigger) the same as in Flink 1.0.3. This way, late elements will not be discarded; they will still be assigned to windows, and you can then decide what to do with the late elements in the window evaluation function.

This is what I have done, since I cannot afford to lose any financial data.
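The effect of this "never drop anything" approach can be sketched in plain Java (again, not Flink code; the window end of 10 and the item encoding are illustrative). With unbounded lateness every late arrival re-fires the window, so the last emitted sum no longer depends on arrival order, at the cost of keeping window state forever.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of max-value allowed lateness: no element is ever dropped,
// each late arrival re-fires the window with a corrected sum, and the
// FINAL emitted value is the same for every arrival order.
final class NeverDropSim {
    static long finalSum(long windowEnd, List<long[]> items) { // {0,ts}=element, {1,ts}=watermark
        long wm = Long.MIN_VALUE, sum = 0;
        long lastEmitted = Long.MIN_VALUE;
        boolean fired = false;
        for (long[] it : items) {
            if (it[0] == 1) {
                wm = it[1];
                if (!fired && wm >= windowEnd) { fired = true; lastEmitted = sum; }
            } else if (it[1] <= windowEnd) {
                sum += it[1];                 // state is kept forever, nothing is discarded
                if (fired) lastEmitted = sum; // late arrival: re-fire with the corrected sum
            }
        }
        return lastEmitted;
    }

    public static void main(String[] args) {
        // order1: the element 6 arrives before the window fires
        List<long[]> order1 = Arrays.asList(new long[]{0, 9}, new long[]{0, 6}, new long[]{1, 12});
        // order2: the element 6 arrives after the window fired
        List<long[]> order2 = Arrays.asList(new long[]{0, 9}, new long[]{1, 12}, new long[]{0, 6});
        System.out.println(finalSum(10, order1)); // 15
        System.out.println(finalSum(10, order2)); // 15: downstream eventually sees the same value
    }
}
```

This restores determinism of the eventual result, matching Till's point that without some upper bound the cost is unbounded state.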


Regards,
Vinay Patil

On Mon, Nov 7, 2016 at 10:58 PM, Till Rohrmann <[hidden email]> wrote:


