Ordering in ProcessFunction after emitting multiple events from WindowFunction

Michał Stępień
Hello,

I have a development-related question.

How does Flink behave when a WindowFunction (version 1.3 or 1.4, including
ProcessWindowFunction) produces multiple events for one window? If the window
function emits them into the Collector in order for a given key, does Flink
guarantee that the downstream ProcessFunction processes them in that same
order? Moreover, I understand that Flink won't start processing the next
event in the ProcessFunction until it has finished the previous one for a
given key.

Code:

env.addSource(eventSource)
    .assignTimestampsAndWatermarks(timestampsAndWatermarksGenerator)
    .keyBy(FLINK_GROUPING_FIELD_NAME)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_DURATION_MILLIS)))
    .apply(windowFunction)
    .keyBy(FLINK_GROUPING_FIELD_NAME)
    .process(processFunction)
    .addSink(sinkFunction);

--
Pozdrawiam/Regards,
Michał

*Michał Stępień*
Developer
*freeportmetrics.com <http://freeportmetrics.com/>*  |  +48 22 253 25 13

Re: Ordering in ProcessFunction after emitting multiple events from WindowFunction

Aljoscha Krettek-2
Hi,

I think if you're keying by the same key the order should be preserved. If not, then events from different upstream operations can arrive in arbitrary order.
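
To illustrate why the same key matters, here is a toy model in plain Java (not Flink code, and every name in it is hypothetical). It assumes what is generally understood about keyBy: records are hash-partitioned by key, and each connection between one sending and one receiving subtask behaves as a FIFO channel. Under those assumptions, everything one window subtask emits for a key lands on a single channel, so the downstream function sees it in emission order:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyedOrderingSketch {

    /** Picks the downstream subtask for a key (a stand-in for Flink's key hashing). */
    static int targetSubtask(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /**
     * Sends the records emitted by ONE upstream subtask through FIFO channels and
     * returns, per key, the sequence of values the downstream side sees.
     * Each record is a {key, value} pair.
     */
    static Map<String, List<String>> deliver(List<String[]> emitted, int parallelism) {
        // One FIFO channel per downstream subtask.
        List<List<String[]>> channels = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            channels.add(new ArrayList<>());
        }
        // A key always maps to the same channel, so its records stay in emission order.
        for (String[] record : emitted) {
            channels.get(targetSubtask(record[0], parallelism)).add(record);
        }
        // Each downstream subtask drains its own channel in arrival order.
        Map<String, List<String>> seenPerKey = new LinkedHashMap<>();
        for (List<String[]> channel : channels) {
            for (String[] record : channel) {
                seenPerKey.computeIfAbsent(record[0], k -> new ArrayList<>()).add(record[1]);
            }
        }
        return seenPerKey;
    }

    public static void main(String[] args) {
        // Hypothetical output of one window firing, in the order it was collected.
        List<String[]> windowOutput = new ArrayList<>();
        windowOutput.add(new String[] {"user-1", "first"});
        windowOutput.add(new String[] {"user-2", "first"});
        windowOutput.add(new String[] {"user-1", "second"});
        windowOutput.add(new String[] {"user-1", "third"});
        windowOutput.add(new String[] {"user-2", "second"});
        System.out.println(deliver(windowOutput, 4));
    }
}
```

The model covers a single upstream subtask only. If records for one downstream key came from several upstream subtasks, each channel would still be FIFO, but the interleaving between channels would not be fixed, which is the arbitrary-order case described above.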

Best,
Aljoscha

> On 12. Jan 2018, at 12:57, Michał Stępień <[hidden email]> wrote:
> [...]