Is Flink processing really deterministic across repeated runs when the incoming stream of elements is out of order? How is that ensured?
I am aware of all the principles like event time and watermarking. But I can't understand how it works when there are late elements in the stream, that is, elements violating the watermark condition by having a lower timestamp than a previously emitted watermark. From my point of view these elements flow through the system without any mechanism that would discard them. Late elements then may, or may not, fall into existing windows.

Let's draw a simple example reading from a Kafka source with two partitions. The numbers represent event times. Data from Kafka are shuffled to a single WindowOperator calculating a sum (all elements have the same key).

                                    ----------------------------
part. 1 | ..., 15, 12, 9 | ------->|                            |
                                   |  WindowOperator            |
part. 2 | ..., 18, 6, 11 | ------->|  (window maxTimestamp 10)  |
                                    ----------------------------

Elements can arrive at the WindowOperator in an arbitrary order.

example 1 (E denotes an element, W a watermark):

E 9
W 9
E 11
W 11 (current watermark: min(9, 11) = 9)
E 6
E 12
W 12 (current watermark: min(12, 11) = 11)
---------------
window(0, 10) fires with sum 15

example 2:

E 9
W 9
E 11
W 11 (current watermark: min(9, 11) = 9)
E 12
W 12 (current watermark: min(12, 11) = 11)
---------------
window(0, 10) fires with sum 9

In my example the result is not deterministic; it's more or less random. Is there anything I am missing?

Thank you very much for the explanation.
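PS: For reference, the shape of the job I have in mind is roughly the following. This is only a sketch against the 1.1-style Flink Java API; the Event POJO, the EventSchema deserializer, the topic name and the Kafka properties are made up for illustration.

    import java.util.Properties;

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

    public class WindowedSumJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
            kafkaProps.setProperty("group.id", "demo");

            DataStream<Event> events = env
                .addSource(new FlinkKafkaConsumer09<>("events", new EventSchema(), kafkaProps))
                // each source subtask emits watermarks for its partition; the
                // window operator combines them with min(), as in the example
                // above (the late element 6 violates the ascending order and
                // therefore does not advance the watermark)
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
                    @Override
                    public long extractAscendingTimestamp(Event e) {
                        return e.timestamp;
                    }
                });

            events
                .keyBy("key")                                               // all elements share one key
                .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) // e.g. window (0, 10)
                .sum("value")
                .print();

            env.execute("windowed sum");
        }
    }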
Hi Jaromir,
deterministic processing with late elements is indeed more difficult than without them. What you have to do is send updates to your downstream operators whenever you see late elements. This can either be an incremental update or a retraction followed by the corrected value; which one basically depends on the operation you're performing.

So in your example, you could define an allowed lateness for which you keep the window contents around. If the element with value 6 arrives within the allowed lateness (and after the watermark), you emit the corrected sum for the window. Your downstream operators have to be able to handle updated windows, though. At the moment this is something the user has to take care of, and there is only limited support from Flink's side. But we're working on improving the late element handling with [1].

[1] https://github.com/apache/flink/pull/2415

Cheers,
Till
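PS: A rough sketch of what I mean (hedged; assuming the 1.1 Java API and the made-up stream and field names from your example):

    stream
        .keyBy("key")
        .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
        // keep the window contents around for late elements; every late
        // element that arrives within this bound makes the window fire
        // again with a corrected sum
        .allowedLateness(Time.minutes(1))
        .sum("value")
        // downstream consumers now see one result per firing and must
        // treat later firings as updates of earlier ones
        .print();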
Hi Till, thank you for your answer.
I am afraid defining an allowed lateness won't help; it just shifts the problem by a constant amount of time. If we agree an element can arrive an arbitrary time after the watermark (depending on network latency), it may or may not be assigned to the window, depending on whether it comes before or after the allowed lateness period expires. The element may then be counted in or discarded.

So the results still don't seem deterministic. In other words, if I run the job reading from Kafka multiple times, I may get different results depending on external conditions like network and cluster stability.

Please correct me if I'm wrong.

J.V.
You're right if you want to guarantee a deterministic computation for an arbitrary allowed lateness. In the general case, you could never calculate the final result of a window in finite time, because there might always be another element that arrives even later. However, for most practical use cases you can define an upper bound on the allowed lateness and use it to calculate your final result. If you can't, you will simply run out of storage capacity at some point, because in the general case you have to keep state around for potential late elements.

Cheers,
Till
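PS: A sketch of such an upper bound, using the bounded out-of-orderness extractor that ships with Flink (Event and its timestamp field are the placeholders from the earlier example):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Watermarks trail the highest timestamp seen so far by 10 seconds.
    // Elements arriving later than watermark + allowed lateness are dropped;
    // that is the price for a final, deterministic result in finite time.
    DataStream<Event> withTimestamps = events.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(Event e) {
                return e.timestamp;
            }
        });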
Hi Jaromir,

You can make use of a custom trigger and set the allowed lateness to the maximum value. I have kept the custom trigger code (EventTimeTrigger) the same as in Flink 1.0.3; this way late elements are not discarded and are still assigned to windows, and you can then decide what to do with them in the window evaluation function.

This is what I have done, since I cannot afford to lose any financial data.

Regards,
Vinay Patil
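PS: Roughly, the trigger I mean looks like the old 1.0.3 EventTimeTrigger. This is a sketch against the 1.1 trigger API, not my exact production code:

    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    public class LateTolerantEventTimeTrigger extends Trigger<Object, TimeWindow> {

        @Override
        public TriggerResult onElement(Object element, long timestamp,
                                       TimeWindow window, TriggerContext ctx) throws Exception {
            // Never FIRE directly on an element (the 1.1 default trigger fires
            // immediately for late elements). Re-registering the window's
            // timer, even though it already lies behind the watermark, causes
            // the window to be re-evaluated including the late element once
            // the next watermark arrives.
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            ctx.deleteEventTimeTimer(window.maxTimestamp());
        }
    }

Used with a very large allowed lateness so the window state is never purged early (MyWindowFunction is a placeholder):

    stream.keyBy("key")
        .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
        .allowedLateness(Time.days(365))   // "max value" in spirit; beware unbounded state growth
        .trigger(new LateTolerantEventTimeTrigger())
        .apply(new MyWindowFunction());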