Sort streams in windows

Евгений Юшин
Hi folks

I want to sort a stream by an event-time field derived from the events. To
do this I can use one of the existing windows, such as TimeWindow, to
collect events in a window of a particular size, or SlidingWindow to run the
sort logic more often (and sort within each slide).
Ideally, I want to sort events as soon as the watermark passes them (with an
out-of-order timestamp extractor). None of the current windows allows me to
do this, so I am thinking of implementing a custom merging window similar to
SlidingWindow. Each element would be assigned to Window(event_ts,
event_ts+1), and then all windows with 'start < watermark' would be merged.
To implement this I need the time service to be available in
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L232
Unfortunately, only 'getCurrentProcessingTime' is available there for now.
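
To make the proposed merge rule concrete, here is a minimal sketch in plain
Java. It only models the rule described above (one window per element, then
everything with 'start < watermark' collapsed into a single window). The
class and method names are hypothetical, and nothing here is the actual Flink
window assigner API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the proposed merge rule (not the Flink window assigner API). */
class WatermarkMergeSketch {

    /** Half-open interval [start, end), mirroring Window(event_ts, event_ts+1). */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Assigns an element to its own one-unit window. */
    static Window assign(long eventTs) {
        return new Window(eventTs, eventTs + 1);
    }

    /** Merges every window with start < watermark into one window; the rest are kept as they are. */
    static List<Window> merge(List<Window> windows, long watermark) {
        List<Window> result = new ArrayList<>();
        long minStart = Long.MAX_VALUE;
        long maxEnd = Long.MIN_VALUE;
        boolean anyMerged = false;
        for (Window w : windows) {
            if (w.start < watermark) {
                minStart = Math.min(minStart, w.start);
                maxEnd = Math.max(maxEnd, w.end);
                anyMerged = true;
            } else {
                result.add(w);
            }
        }
        if (anyMerged) {
            // The merged window goes first; it covers all elements the watermark has passed.
            result.add(0, new Window(minStart, maxEnd));
        }
        return result;
    }
}
```

The sketch takes the watermark as a plain argument, which is exactly the
piece of information the assigner cannot obtain today.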

I could pass a timestamp-extraction function to my new window assigner, but
in that case the logic that computes the minimum watermark across
parallel/unioned/co-joined streams simply won't work.
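
For context, an operator with several inputs advances its own watermark to
the minimum of the latest watermarks seen on each input, and a user-supplied
timestamp function would bypass that bookkeeping. A minimal, framework-free
sketch of the rule follows. The class and method names are hypothetical and
not part of Flink.

```java
import java.util.Arrays;

/** Hypothetical model of a multi-input operator's watermark bookkeeping (not Flink API). */
class MinWatermarkTracker {
    private final long[] perInput;          // latest watermark seen on each input
    private long combined = Long.MIN_VALUE; // watermark of the operator itself

    MinWatermarkTracker(int inputs) {
        perInput = new long[inputs];
        Arrays.fill(perInput, Long.MIN_VALUE);
    }

    /** Records a watermark from one input and returns the operator's watermark. */
    long onWatermark(int input, long watermark) {
        // A watermark never moves backwards on a single input.
        perInput[input] = Math.max(perInput[input], watermark);
        long min = Arrays.stream(perInput).min().getAsLong();
        // The combined watermark is the minimum over all inputs and is also monotonic.
        combined = Math.max(combined, min);
        return combined;
    }

    public static void main(String[] args) {
        MinWatermarkTracker tracker = new MinWatermarkTracker(2);
        System.out.println(tracker.onWatermark(0, 100)); // input 1 has reported nothing yet
        System.out.println(tracker.onWatermark(1, 40));  // the slower input sets the pace
        System.out.println(tracker.onWatermark(1, 150)); // now input 0 is the slower one
    }
}
```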

@devs would you mind if I extend WindowAssignerContext with
getCurrentWatermark, or with a reference to the whole time service?

Would be really glad to hear your concerns.

Regards,
Eugene

Re: Sort streams in windows

Jan Lukavský
Hi Eugene,

I'd say that what you want is essentially not "sort in windows", because
(as you mention) you want to emit elements from windows as soon as the
watermark passes some timestamp. Maybe a better approach would be to
implement this using stateful processing, where you keep a buffer of
(unsorted) inputs and set up a timer for the minimal timestamp of the
elements in the buffer (plus allowed lateness), and then sort the elements
with timestamp <= the timer (very often a single element). I'm actually
working on this for Apache Beam (design doc [1]), but this is still a
work in progress.

Another drawback is that something like "sorted map state" will probably
be needed in order to efficiently query the state for the minimal timestamp.
A less efficient implementation might work with ListState as well.
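
A minimal sketch of this buffering idea, written in plain Java so that it
runs without a cluster. The TreeMap stands in for the "sorted map state"
mentioned above, and the watermark callback stands in for an event-time
timer. The class and method names are hypothetical (neither Flink nor Beam
API), and allowed lateness is left out to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of a sorting buffer driven by watermarks (not Flink or Beam API). */
class SortedBufferSketch<T> {
    // Stand-in for sorted map state: event timestamp -> elements with that timestamp.
    private final TreeMap<Long, List<T>> buffer = new TreeMap<>();

    /** Buffers an element under its event timestamp; arrival order does not matter. */
    void onElement(long timestamp, T value) {
        buffer.computeIfAbsent(timestamp, ts -> new ArrayList<>()).add(value);
    }

    /** Releases, in timestamp order, every buffered element with timestamp <= watermark. */
    List<T> onWatermark(long watermark) {
        // headMap returns a live view, so clearing it removes the entries from the buffer.
        Map<Long, List<T>> due = buffer.headMap(watermark, true);
        List<T> ready = new ArrayList<>();
        for (List<T> sameTimestamp : due.values()) {
            ready.addAll(sameTimestamp);
        }
        due.clear();
        return ready;
    }

    /** Smallest buffered timestamp, i.e. where the next timer would be set; null if empty. */
    Long nextTimer() {
        return buffer.isEmpty() ? null : buffer.firstKey();
    }
}
```

With a sorted structure, finding the minimal timestamp is a lookup of the
first key rather than a scan over the whole buffer.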

Jan

[1]
https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing

In reply to Евгений Юшин's message of 6/14/19, 3:58 PM.


Re: Sort streams in windows

Евгений Юшин
Hi Jan

Thanks for the quick reply.

A stateful transformation requires rewriting logic that Flink already
implements itself. Consider the example from my original message:
there can be out-of-order data -> data should be propagated to the next
operator only when the watermark crosses the out-of-order bound -> all
records with 'ts < watermark' should be pre-processed (e.g. sorted).

With a stateful function, all records have to be stored in state, and for
each new record the whole state has to be traversed to find out whether
out-of-order events can be propagated further. For unioned streams there has
to be logic that takes the minimum ts of each stream for comparison, but the
information about which record came from which stream is already lost. The
state also has to be persisted, which adds some footprint during checkpoints.
Flink windows handle all these duties under the hood.

So I think the Flink windows interface (a merging window in this particular
case) is a perfect fit for this kind of task, where pre-processing has to
be done first.



In reply to Jan Lukavský's message of Mon, 17 Jun 2019, 11:35.


Re: Sort streams in windows

Fabian Hueske-2
Hi Eugene,

I agree with Jan. Using a ProcessFunction is the way to go.

ProcessFunction gives you all the tools you need:
* ListState, which is very cheap to append to (and you only need to read the
ListState when you receive a watermark).
* Access to event timestamps, the current watermark, and timers.

The ProcessFunction logic is rather easy to implement and should be much
easier to reason about than sorting and firing logic that is spread across a
window assigner, trigger, and window function.
Moreover, you would not need to extend the WindowAssignerContext.
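
A minimal, framework-free sketch of that logic: elements are only appended
on arrival, and the buffer is read and sorted when a watermark arrives. In a
Flink job these steps would presumably live in a KeyedProcessFunction, with
ListState as the buffer and an event-time timer registered through
ctx.timerService().registerEventTimeTimer(...). The class, the Event type,
and the method names below are hypothetical and are not Flink API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical model of append-then-sort-on-watermark logic (not Flink API). */
class ListBufferSorter {

    /** An event with its event-time timestamp; hypothetical type for illustration. */
    static final class Event {
        final long timestamp;
        final String payload;

        Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    private List<Event> buffer = new ArrayList<>(); // stand-in for ListState

    /** Cheap path: every incoming element is only appended. */
    void onElement(Event event) {
        buffer.add(event);
    }

    /** Reads the buffer once, emits events with timestamp <= watermark in order, keeps the rest. */
    List<Event> onWatermark(long watermark) {
        List<Event> ready = new ArrayList<>();
        List<Event> remaining = new ArrayList<>();
        for (Event event : buffer) {
            if (event.timestamp <= watermark) {
                ready.add(event);
            } else {
                remaining.add(event);
            }
        }
        // List.sort is stable, so events with equal timestamps keep their arrival order.
        ready.sort(Comparator.comparingLong((Event e) -> e.timestamp));
        buffer = remaining;
        return ready;
    }
}
```

The trade-off against a sorted structure is that each watermark scans the
whole buffer, while appends stay as cheap as possible.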

Best, Fabian


In reply to Евгений Юшин's message of Mon, 17 Jun 2019, 15:40.
