[DISCUSS] Improvement to Flink Window Operator with Slicing

Rong Rong
Hi all,

Various discussions on the mailing list and in JIRA tickets [2] have been
brought up in the past regarding the performance of the windowing operation.
As we experimented internally with some of our extreme use cases, we found
that a slice-based implementation can optimize Flink's windowing mechanism
and provide better performance in most cases.

We've put together a preliminary enhancement and performance optimization
plan [1] for the current windowing operation in Flink. It is largely
inspired by the stream slicing research shared at a recent Flink Forward
conference [3] by Philip and Jonas, and by the discussion in the main JIRA
ticket [2]. The initial design and POC implementations consider optimizing
performance for the category of overlapping windows, as well as allowing
cascading window operators to be chained.

It would be great to hear feedback and suggestions from the community.
Please kindly share your comments.

Thanks,
Rong

[1]
https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing
[2] https://issues.apache.org/jira/browse/FLINK-7001
[3]
https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing

Re: [DISCUSS] Improvement to Flink Window Operator with Slicing

Rong Rong
Hi all,

Thanks for the feedback and suggestions on the design doc. I have created
a parent JIRA [1] to track the related tasks and have started the
implementation process.
Any further feedback or suggestions are highly appreciated.

Best,
Rong

[1] https://issues.apache.org/jira/browse/FLINK-11276

On Wed, Dec 5, 2018 at 6:17 PM Rong Rong <[hidden email]> wrote:


Re: [DISCUSS] Improvement to Flink Window Operator with Slicing

Fabian Hueske-2
Thank you Rong!
The performance of sliding windows is an issue for many users.
Adding support for a more efficient window is a great effort.

Thank you,
Fabian

On Tue, Jan 29, 2019 at 16:37, Rong Rong <[hidden email]> wrote:


Re: [DISCUSS] Improvement to Flink Window Operator with Slicing

Kurt Young
Hi Rong,

Thanks for the improvement proposal. This topic caught my interest since
we did some similar improvements in Blink.
After going through your design doc, I would like to share some thoughts:

1. It looks to me like the proposed SliceManager and MergeManager are quite
generic and can be used in various situations, such as unaligned windows.
However, this will introduce some new APIs and some new stream operators,
which makes it a big effort. I suppose the main reason for introducing these
concepts is that we want to support complex scenarios like unaligned
windows, multi-timeout session windows and so on. But if we go back to the
original motivation of this improvement, it is "Highly overlapping window
operation" and "Window functions with efficient methods calculating and
merging partial results", just as you mentioned in the section "Target
Usage Pattern". I'm curious whether it's necessary to introduce all these
new APIs and operators to meet these requirements. If I understand you
correctly, we may even be able to achieve the goal by using two successive
windows: first a tumbling window, and then a sliding window. And from our
experience in Blink, adding some enhancements to the original window
operator may also be sufficient. Does it make sense to you if we first try
to improve the target scenario without adding new APIs and operators? It
would definitely make the change easier to review and merge.

2. From what we observed after Blink made similar improvements, slice- or
pane-based optimization is not a silver bullet for all kinds of situations.
In most cases, we only observed small performance gains of around 20% or
30%. The main reason is that, to get a big gain from this improvement, we
normally need the window function to have efficient calculating and merging
methods, like SUM or COUNT. If this assumption holds, the basic approach of
the window operator will also be performant: it stores a compact
intermediate result for each window per key, and it's also very efficient
to get the result from it. The only downside is that we need to do the
calculation in multiple windows per key, but under this assumption the
calculation is efficient and may not be a blocker. Based on this, I think
we must be more careful about the changes, especially API-related ones.

3. The last thing on my mind is whether we can share some implementation
with Blink's window operator. We introduced a new window operator in Blink
SQL, which I think needs to be discussed when we try to adopt the Blink
improvements. Unless there are strong reasons not to, I think it's better
to share some implementation with DataStream's window operator. Maybe we
don't need to share the whole operator, but at least some underlying
concepts and data structures. It would be great if you could think about it
and see whether it's feasible.
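
To make the suggestion in point 1 concrete, here is a minimal sketch in plain
Python (not Flink API; the function names are made up for illustration) of
computing a sliding SUM in two stages: a tumbling pre-aggregation whose
length equals the slide, followed by a sliding merge over the pane results.
It assumes integer timestamps, a window size that is a multiple of the slide,
and windows aligned to multiples of the slide.

```python
from collections import defaultdict


def sliding_sums_direct(events, size, slide):
    """Baseline: add each (timestamp, value) to every window that contains it."""
    sums = defaultdict(int)
    for ts, value in events:
        start = ts - ts % slide          # latest window start that is <= ts
        while start > ts - size:         # window [start, start + size) contains ts
            sums[start] += value
            start -= slide
    return dict(sums)


def sliding_sums_two_stage(events, size, slide):
    """Stage 1: tumbling panes of length `slide`. Stage 2: slide over pane sums."""
    assert size % slide == 0, "sketch assumes size is a multiple of slide"
    panes = defaultdict(int)
    for ts, value in events:
        panes[ts - ts % slide] += value  # exactly one state update per element
    sums = defaultdict(int)
    for pane_start, partial in panes.items():
        start = pane_start
        while start > pane_start - size:  # windows that fully cover this pane
            sums[start] += partial
            start -= slide
    return dict(sums)
```

Both functions return a mapping from window start to sum. The two-stage
version updates state once per element in stage 1; only the much smaller set
of pane results is fanned out to the overlapping windows.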

One last small question: how do you handle sliding windows whose size is
not divisible by the slide, such as a 10-second window that slides every 3
seconds?
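
This message does not say how the design doc handles that case. One
possible approach, shown here purely as an assumption and not as the
proposal's answer, is to cut a slice at every window start and every window
end. The sketch below is plain Python with hypothetical helper names; it
assumes integer timestamps and windows that start at multiples of the slide.

```python
def slice_edges(size, slide, horizon):
    """Slice boundaries in [0, horizon]: every window start or window end."""
    return [
        t for t in range(horizon + 1)
        if t % slide == 0              # some window starts at t
        or (t - size) % slide == 0     # some window ends at t
    ]


def slices_in_window(start, size, edges):
    """Consecutive edge pairs that tile the window [start, start + size)."""
    inside = [e for e in edges if start <= e <= start + size]
    return list(zip(inside, inside[1:]))
```

For a 10-second window with a 3-second slide, the edges fall at 0, 1, 3, 4,
6, 7, 9, 10, ... so slices alternate between 1 and 2 seconds, and the window
[3, 13) is tiled by seven slices without any slice straddling a window
boundary.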

Best,
Kurt


On Wed, Jan 30, 2019 at 2:12 AM Fabian Hueske <[hidden email]> wrote:


Re: [DISCUSS] Improvement to Flink Window Operator with Slicing

Rong Rong
Hi All,

Thanks for sharing feedback on the window optimization design doc and on
the discussion JIRAs, @Jincheng, @Kurt, @Jark and @Fabian. This feedback is
very valuable and we will try to incorporate it in the next step.

There have been several revisions of the design doc, and several POCs have
been developed, since we initially shared the document. Thus, some of the
following might’ve already been addressed in other places. However, I
would still like to summarize these points, since they were raised in
various places.

1). Scope of the window optimization using slicing.

The original scope of the doc was to address the problem of
element-to-window duplication when a sliding window spans a wide range
with narrow slides (with or without efficient partial aggregation). However,
as we developed the 2 POCs [1,2], we found that there’s really no
one-solution-fits-all approach to how this can be optimized (the same
observation that Kurt and Jark mentioned). Thus the scope was expanded
further:

1a). Directly improving WindowOperator ([1, 3])

In the design doc, the PartialWindowFunction was designed as a cue for
WindowOperator to choose how to optimize the sliding window. This did not
work well, because how efficiently the window operator processes messages
depends on: 1. the pattern of the window; 2. the pattern of the incoming
traffic; and 3. the complexity of the window process function.

Having users specify the complexity of each API (e.g. accumulate, merge,
retract) could help the system choose the path of optimization; however,
at this stage it is hard to utilize this information.

1b). Explicitly allowing slicing and merging steps ([2]), or more generic
window chaining:

With the second POC, it is the user's responsibility to create efficient
window operator(s), since the user can now control the state and the
processing individually. This opens up more possibilities, e.g. unaligned /
multi-timeout session windows and cascading windows.

However, with this API change, users are given much more flexibility.

In conclusion to (1), I think one clear piece of feedback is that this
effort goes beyond just improving sliding window performance. If we
think it is the right way to expand the window API, there should be a
proper API change discussion. As Kurt mentioned, if any of these cases can
be covered with the existing API, we should always exhaust that first
(further discussion in (2)).
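
The element-to-window duplication described under (1) can be made concrete
with a small count. The following is an illustrative sketch in plain Python
(not Flink code; `count_state_updates` is a hypothetical helper) that assumes
integer timestamps and windows aligned to multiples of the slide. It compares
the number of per-window state updates under direct assignment with one
update per element when each element is first written into a single slice.

```python
def count_state_updates(timestamps, size, slide):
    """Return (updates with direct window assignment, updates with slicing)."""
    direct = 0
    sliced = 0
    for ts in timestamps:
        start = ts - ts % slide      # latest window start that is <= ts
        while start > ts - size:     # one update per window containing ts
            direct += 1
            start -= slide
        sliced += 1                  # one update to the slice containing ts
    return direct, sliced
```

With a one-hour window sliding every second (size 3600, slide 1), ten
elements cause 36000 window updates versus 10 slice updates. Part of that
saving is paid back at firing time, when the slices of a window have to be
merged, so the net gain depends on the window pattern, the traffic pattern
and the cost of the function, as noted in (1a).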


2). The motivation for changing the public DataStream APIs related to
window-based processing.

As Jincheng and Kurt mentioned, utilizing the current windowed stream API
can achieve some of the results. For example, using a tumbling window
followed by a sliding window can reduce the amount of duplication, since
each tumbling window result is passed to the sliding window as one element.
However, when we experimented, there were several problems I observed:

2a). Rehashing with an extra keyBy

The current API requires a separate keyBy following the tumbling window
result, which creates unnecessary complexity. One workaround is to use
"reinterpretAsKeyedStream" as Fabian mentioned; however, there is much
investigation to be done to ensure it works well (e.g. how does the keyed
state store work, since the key is not set as part of the namespace
within WindowOperator?).

2b). Handling iterable elements

Another problem is that if no efficient partial results are produced by the
tumbling window, the output element (as an iterable) will still be
duplicated into each of the downstream sliding window states.

However, this is actually a tricky problem by itself; see the further
discussion in (2d).

2c). Additional support for some of the more complex use cases

As described in (1b), with explicit control of the window state and the
window function, users can control how the windowing is optimized. One
could argue that this type of optimization can also be done using
ProcessFunction or other rich functions.

However, if we take a look at how the WindowOperator is currently
structured, it also has explicit aggregation + windowing APIs [4] exposed
on the WindowedStream.

Another point is that this opens up multi-slide windowing and multi-timeout
windows, as described in the Flink Forward presentation [5].

I would also like to point out that the way the current POC [2]
changes the public API is just a proof of concept that the flexibility
opens up more complex use cases. It is by no means the final design, and I
agree that the changes may be more aggressive than necessary, since it’s
meant as a POC.

2d). Support for a same-window operator (similar to Table/SQL
OVER-aggregate)

As discussed with Jincheng in a separate thread, we could support a
same-window operator similar to how OVER-aggregate is implemented in the
Table/SQL API. It is definitely useful in some of the window use cases (2b).
For example, a process function without efficient partial aggregation
results on a sliding window can be evaluated very efficiently, since both
the add and retract operations on an iterable list are cheap.
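
As a rough illustration of that last point, here is a minimal sketch in
plain Python (not the Table/SQL implementation; `over_range_apply` and its
range semantics are assumptions made for this example). Elements arrive in
timestamp order, each one is appended to a buffer, expired elements are
retracted from the front, and a non-decomposable function such as the median
is evaluated over what remains.

```python
from collections import deque
from statistics import median


def over_range_apply(events, size, fn):
    """For each (ts, value), apply fn to values with timestamp in (ts - size, ts]."""
    buffer = deque()
    out = []
    for ts, value in events:
        buffer.append((ts, value))            # add: O(1)
        while buffer[0][0] <= ts - size:      # retract expired rows: O(1) each
            buffer.popleft()
        out.append((ts, fn([v for _, v in buffer])))
    return out
```

Each element is stored once, regardless of how many result rows it
contributes to; the cost that remains is evaluating `fn` over the buffer for
every output row.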

There are some fundamental differences between the current window operator
and the same-window operator concept; for example, the trigger mechanism is
entirely different. It will also be a challenge to incorporate the
OVER-aggregate of the Table API, since there are many table-specific
components in the bounded/unbounded over aggregates. However, I see this as
an implementation detail (further discussion in (3a)).

In conclusion to (2), I think the open question is whether to expand the
API if we introduce slicing: if not, whether we can cover all the use
cases with the current API; if so, what that extra API will look like,
especially for the same-window operation in the DataStream API (utilizing
whatever we already have in Table/SQL is one option).


3). Incorporating existing POCs and implementations (e.g. Blink’s
WindowOperator & SQL/Table API)

Some existing implementations already resemble the idea of slicing, for
example Blink’s WindowOperator [3]. My thinking is that once we agree on the
API and the public-facing changes, we should reuse as much of the current
implementations as possible, since Blink has been running in production for
years (please correct me if I am wrong @Jark/Jincheng/Kurt :-D ).


Regarding the implementation details and some existing approaches:

3a). Over-aggregate vs. window operator.

Over-aggregate has already been implemented in the Table/SQL API. Unlike
window operators, over-aggregate uses a process function and a general
process operator approach.

Looking further into how the current window operator works compared with
the over-aggregate, there are many *common implementations*, for example
the window state (especially the list-based approach), the triggering
mechanism / timer service (especially on rowtime), and the “evicting
policies”.

I was wondering if it is possible, at the operator level, to:
1. support the same-window operator concept as a generic case of the
over-aggregate, and 2. create a more common window operator base that
generalizes the state, trigger and eviction context.

3b). Support for partial process functions: retract vs. merge.

Supporting retract operations is definitely a plus in some use cases,
for example as Fabian suggested in the JIRA discussion [6]. Currently
this already exists in the Table/SQL API aggregation functions.
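
The difference between the two operations can be sketched in a few lines of
plain Python. The class and function names below are hypothetical and only
mirror the operation names used in this thread; they are not the Table/SQL
aggregation function interface. SUM has an inverse, so a window can advance
by retracting the expired slice and merging the new one. MAX has no inverse,
so the window has to be rebuilt by merging all live slices.

```python
class SumAcc:
    """Accumulator that supports accumulate, merge and retract."""

    def __init__(self, total=0):
        self.total = total

    def accumulate(self, value):
        self.total += value

    def merge(self, other):
        self.total += other.total

    def retract(self, other):
        self.total -= other.total


def slide_with_retract(window, expired, incoming):
    """Advance by one slice in O(1): drop the expired slice, add the new one."""
    window.retract(expired)
    window.merge(incoming)
    return window


def slide_with_merge_only(live_slice_maxima):
    """MAX cannot be retracted, so merge every slice still inside the window."""
    return max(live_slice_maxima)
```

With per-slice sums 3, 5, 2, 7 and a window of three slices, the first window
holds 10; retracting 3 and merging 7 gives 14 without touching the middle
slices. For MAX over the same slices, the second window is recomputed from
5, 2 and 7.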

3c). States and chaining strategy

With the chaining strategy [7] in the operators, I was wondering whether
there’s a performance consideration when implementing the sliding window as
one operator versus multiple operators chained together. One question I
think is worth investigating is what the most efficient way of storing the
slicing information is, as Jark suggested in the JIRA discussion [8].


I have tried to capture as many of the discussions here as possible, but I
can hardly imagine that we’ve addressed all of these questions. So please
share your comments and feedback, as they are highly appreciated! I can
definitely volunteer to start a design/discussion doc for this effort if
everyone thinks it is suitable to move forward.


Thanks,

Rong



[1]
https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.yevpiesfln1m

[2]
https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.j0v7ubhwfl0y

[3]
https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java

[4]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#processwindowfunction

[5]
https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing

[6] https://issues.apache.org/jira/browse/FLINK-11454

[7]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
[8] https://issues.apache.org/jira/browse/FLINK-7001

On Tue, Feb 19, 2019 at 1:57 AM Kurt Young <[hidden email]> wrote:


Re: [DISCUSS] Improvement to Flink Window Operator with Slicing

Kurt Young
Hi Rong,

Thanks for the detailed summary! It indeed involves lots of problems
and unanswered questions, which I think are not practical to
solve in one shot. From my point of view, the performance issue with the
sliding window is the root one, and probably the one users are most likely
to run into. Thus I think the first question that should be
answered is:
"What kind of improvements can we make to the sliding window scenario?"

First, we should try to figure out how much improvement can be made
without changing or adding new APIs. This can reuse the POC you did
and the work Blink has done; we can share some ideas and maybe some
implementation details. This already involves a lot of effort
even if we only do this one thing. It may introduce some refactoring of the
current window operator, and we should keep it compatible with old versions.

After this, we can release it in the next version and gather some user
feedback. We can then answer the question: "Do the improvements cover
most use cases? Are there any critical ones which are impossible to handle
with the current window operator?". At that point, we can open the
discussion on introducing new APIs to meet the requirements.

Deciding to add new APIs will introduce more work than improving the window
operator internally, much of which you have covered in your proposal.
Actually, the approaches you proposed look good to me; taking it step by
step is just the more practical way.

Best,
Kurt


On Fri, Feb 22, 2019 at 2:58 AM Rong Rong <[hidden email]> wrote:

> Hi All,
>
> Thanks for sharing feedbacks for the window optimization design doc and on
> the discussion JIRAs @Jincheng, @Kurt, @Jark and @Fabian. These are very
> valuable feedbacks and we will try to incorporate them in the next step.
>
> There were several revision done for the current design doc, and several
> POCs being developed since we initially shared the document. Thus, some of
> the following might’ve already been addressed in other places. However, I
> would still like to summarize them since these points were raised up in
> various scenarios.
>
> 1) Scope of the window optimization using slicing.
>
> The original scope of the doc was to address the problem of
> element-to-window duplication when sliding window goes across wide range
> with narrow slides (with or w/o efficient partial aggregations). However,
> as we develop the 2 POCs [1,2] we found out there’s really no
> one-solution-fits-all approach to how this can be optimized (same
> observation as Kurt and Jark mentioned). Thus some further expansion of the
> scope was done:
>
> 1a). Directly improving WindowOperator([1, 3])
>
> In the design doc, the PartialWindowFunction was designed as a cue for
> WindowOperator to choose how to optimize the sliding window. This was not
> working well because: how efficient window operator process messages
> depends on: 1. the pattern of the window; 2. the pattern of the incoming
> traffic; and 3. the complexity of the window process function.
>
> The idea of having users to specify the complexity of each API (e.g.
> accumulate, merge, retract) can help the system choose the path of
> optimization, however as this stage the it is hard to utilize this
> information.
>
> 1b). Explicitly allow slicing and merging steps([2]), or more generic
> window chaining:
>
> With the second POC, it is users' responsibility to create an efficient
> window operator(s), since now user can control the state and the process
> individually. This opens up more possibilities: e.g. unaligned /
> multi-timeout session window, cascade windows.
>
> However, with this API change, users are given much more flexibilities.
>
> In conclusion to (1), I think one of the clear feedbacks is that this
> effort goes beyond just improving the sliding window performance. If we
> think it is the right way to expand the window API, there should be a
> proper API change discussion: As Kurt mentioned, if any of these cases can
> be covered with existing API, we should always exhaust that first. (Further
> discussion in (2))
>
>
> 2). The motivation of changing the public DataStream APIs related to
> window-based processing.
>
> As Jincheng and Kurt mentioned, utilizing the current windowed stream API
> can achieve some of the results. For example, using a tumble follow by a
> sliding window can reduce the amount of duplication since the tumbling
> window results will be passed to the sliding window as one element.
> However, when we experiment, the are several problems I observed:
>
> 2a). rehashing with extra keyBy
>
> The limitation requires a separate set of keyBy following the tumble
> result, which creates unnecessary complexity. One work around was to use
> "reinterpretAsKeyedStream" as Fabian mentioned, however there are many
> investigation to be done to ensure it works well (e.g. how does the keyed
> state store work since the key was not sets as a part of the namespace
> within WindowOperator)
>
> 2b). Handling iterable elements
>
> Another problem is if there’s no efficient partial results produce by the
> tumbling window, the output element (as an iterable) will still be
> duplicated into each of the downstream sliding window states.
>
> However, this is actually a tricky problem by itself, further discussion in
> (2d) for this piece.
>
> 2c). Additional support for some of the more complex use cases
>
> As described in (1b), with the explicit control of the window state and the
> window function, users can control how the optimization of the windowing
> can be done. One will argue that these type of optimization can also be
> done using ProcessFunction or other rich functions.
>
> However, if we take a look at how the WindowOperator is currently
> structured, it also has explicit aggregation + windowing APIs [4] presented
> for the WindowedStream.
>
> Another point is opening up: multi-slide windowing and multi-timeout window
> described in the presentation [5] in FF.
>
> I would also like to raise a point that the way how the current POC [2]
> changes the public API is just a proof of concept that the flexibilities
> opens more complex use cases. It is by no means the final design and I
> agree that there could be more-aggressive-than-necessary changes since it’s
> meant for a POC.
>
> 2d). The support for same-window operator (similar to Table/SQL
> OVER-aggregate)
>
> As discussed with Jincheng in a separate thread, supporting a same-window
> operator, similar to how OVER-aggregate is achieved in the Table/SQL API,
> is definitely useful in some of the window use cases (2b). For example, a
> process function without efficient partial aggregation results on a
> sliding window can be evaluated very efficiently, since both the add and
> the retract operation on an iterable list are cheap.
>
> There are some fundamental differences between the current window operator
> and the same-window operator concept. For example, the trigger mechanism is
> entirely different, and it will be a challenge to incorporate the
> OVER-aggregate of the Table API, since there are many table-specific
> components in the bounded/unbounded over aggregates. However, I see this as
> an implementation detail (further discussion in (3a)).
>
> In conclusion to (2), I think the open question is whether to expand the
> API if we introduce sliding: if no, whether we can cover all the use
> cases with the current API; if yes, what that extra API will look like,
> especially the same-window operation in the DataStream API. (Utilizing
> whatever we already have in Table/SQL is one solution.)
>
>
> 3). Incorporating Existing POCs and Implementations (e.g. Blink’s
> WindowOperator & SQL/Table API)
>
> There are already some existing implementations that resemble the idea of
> slicing, for example Blink’s WindowOperator [3]. My thinking is that once we
> agree on the API and the public-facing changes, we should reuse as much of
> the current implementations as possible, since Blink has been running in
> production for years (please correct me if I am wrong @Jark/Jincheng/Kurt :-D ).
>
>
> Regarding the implementation details and some existing approaches.
>
> 3a). Over-aggregate vs. window-operator.
>
> Over aggregate has already been implemented in Table/SQL API. Unlike window
> operators, over aggregate utilizes the process function and a general
> process operator approach.
>
> Looking further into how the current window operator works compared with
> the over aggregate, there are many *common implementations*: for example,
> the window state (especially the list-based approach), the triggering
> mechanism / timer service (especially on rowtime), and the “evicting
> policies”.
>
> I was wondering whether it is possible, on an operator level, to:
> 1. support the same-window operator concept as a generic case of the
> over-aggregate, and 2. create a more common window operator base that
> generalizes the state, trigger and eviction context.
>
> 3b). The support for partial process function: retracting vs. merge.
>
> Supporting retracting operations is definitely a plus in some use cases,
> for example as Fabian suggested in the JIRA discussion [6]. Currently
> this already exists in the Table/SQL API aggregation function.
>
> 3c). States and Chaining Strategy
>
> With the chaining strategy [7] in the operators, I was wondering whether
> there are performance considerations when implementing the sliding window
> as one operator versus multiple operators chained together. One question I
> think is worth investigating is the most efficient way of storing the
> slicing information, as Jark suggested in the JIRA discussion [8].
>
>
> I tried to capture as many discussions here as possible, but I can hardly
> imagine that we’ve addressed all of these questions. So please share your
> comments and feedback; they are highly appreciated! I can definitely
> volunteer to start a design/discussion doc for this effort if everyone
> thinks it is suitable to move forward.
>
>
> Thanks,
>
> Rong
>
>
> [1] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.yevpiesfln1m
> [2] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.j0v7ubhwfl0y
> [3] https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java
> [4] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#processwindowfunction
> [5] https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing
> [6] https://issues.apache.org/jira/browse/FLINK-11454
> [7] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
> [8] https://issues.apache.org/jira/browse/FLINK-7001
>
> On Tue, Feb 19, 2019 at 1:57 AM Kurt Young <[hidden email]> wrote:
>
> > Hi Rong,
> >
> > Thanks for the improvement proposal. This topic caught my interest, since
> > we did some similar improvements in Blink.
> > After going through your design doc, I would like to share some thoughts:
> >
> > 1. It looks to me that the proposed SliceManager and MergeManager are
> > quite generic and can be used in various situations like unaligned
> > windows. However, this will introduce some new APIs and some new stream
> > operators, and thus make it a big effort. I suppose the main reason to
> > introduce these concepts is that we want to support some complex scenarios
> > like unaligned windows, multi-timeout session windows and so on. But if we
> > go back to the original motivation of this improvement, it is "Highly
> > overlapping window operation" and "Window functions with efficient methods
> > calculating and merging partial results", just as you mentioned in the
> > section "Target Usage Pattern". I'm curious whether it's necessary to
> > introduce all these new APIs and operators to meet these requirements. If
> > I understand you correctly, we may even be able to achieve the goal by
> > using two successive windows, first a tumbling window and then a sliding
> > window. And from the experience we had in Blink, adding some enhancements
> > to the original window operator may also be sufficient. Does it make sense
> > to you if we first try to improve the target scenario without adding new
> > APIs and operators? It will definitely make it easier to review and merge.
> >
> > 2. From what we observed after Blink did similar improvements, slice- or
> > pane-based improvement is not a silver bullet for all kinds of situations.
> > In most cases, we only observed a small performance gain, like 20% or 30%.
> > The main reason is that, to get a big gain from this improvement, we
> > normally require the window function to have efficient calculating and
> > merging methods, like SUM or COUNT. If this assumption holds, the basic
> > approach of the window operator will also be performant: it stores a
> > compact intermediate result for each window per key, and it's also very
> > efficient to get the result based on this. The only downside is that we
> > need to do the calculation in multiple windows per key, but based on the
> > assumption, the calculation is efficient and may not be a blocker. Based
> > on this, I think we must be more careful about the changes, especially
> > API-related ones.
> >
> > 3. The last thing on my mind is whether we can share some implementations
> > with Blink's window operator. We introduced a new window operator in Blink
> > SQL, which I think needs to be discussed when we try to adopt Blink
> > improvements. Unless there are strong reasons not to, I think it's better
> > to share some implementations with DataStream's window operator. Maybe we
> > don't need to share the whole operator, but at least some underlying
> > concepts and data structures. It would also be great if you could think
> > about it and see if it's feasible.
> >
> > Last small question: how do you handle sliding windows whose window size
> > can't be divided by the step, such as a 10-second window with a 3-second
> > slide?
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Jan 30, 2019 at 2:12 AM Fabian Hueske <[hidden email]> wrote:
> >
> > > Thank you Rong!
> > > The performance of sliding windows is an issue for many users.
> > > Adding support for a more efficient window is a great effort.
> > >
> > > Thank you,
> > > Fabian
> > >
> > > On Tue, Jan 29, 2019 at 4:37 PM Rong Rong <[hidden email]> wrote:
> > >
> > > > Hi all,
> > > >
> > > > Thanks for the feedback and suggestions on the design doc. I have
> > > > created a parent JIRA [1] to track the related tasks and started the
> > > > implementation process.
> > > > Any further feedback or suggestions are highly appreciated.
> > > >
> > > > Best,
> > > > Rong
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-11276
> > > >
> > > > On Wed, Dec 5, 2018 at 6:17 PM Rong Rong <[hidden email]> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Various discussions on the mailing list & in JIRA tickets [2] have
> > > > > been brought up in the past regarding windowing performance. As we
> > > > > experimented internally with some of our extreme use cases, we
> > > > > found that using a slice-based implementation can optimize Flink's
> > > > > windowing mechanism and provide better performance in most cases.
> > > > >
> > > > > We've put together a preliminary enhancement and performance
> > > > > optimization plan [1] for the current windowing operation in Flink.
> > > > > This is largely inspired by the stream slicing research shared at a
> > > > > recent Flink Forward conference [3] by Philip and Jonas, and by the
> > > > > discussion in the main JIRA ticket [2]. The initial design and POC
> > > > > implementations consider optimizing the performance for the
> > > > > category of overlapping windows as well as allowing chaining of
> > > > > cascading window operators.
> > > > >
> > > > > It would be great to hear feedback and suggestions from the
> > > > > community. Please kindly share your comments and suggestions.
> > > > >
> > > > > Thanks,
> > > > > Rong
> > > > >
> > > > > [1] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing
> > > > > [2] https://issues.apache.org/jira/browse/FLINK-7001
> > > > > [3] https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing
>

Re: [DISCUSS] Improvement to Flink Window Operator with Slicing

Rong Rong
Hi Kurt,

Thanks for the valuable feedback. I think the suggestions you and Jincheng
provided are definitely the best execution plan:

- Starting with the sliding window optimization while exhausting the current
public API: there are some components we can leverage or directly reuse
from Blink's window operator [1] implementation.
- Backward compatibility is definitely an issue, as any state changes will
probably result in a non-trivial state upgrade. I can definitely follow up
with you on this.

At the same time, I think it is also a good idea to summarize all the use
cases that have been discussed so far. This can be very valuable as a
reference for answering the questions "Do the improvements cover
most use cases?" and, when they do not, "Can introducing some new API
meet the requirements?".

I will try to convert the current parent JIRA [2] into one that does not
include public API alterations. As you mentioned, this will be a much more
practical way in terms of execution.

Many thanks for the suggestions and guidance!

--
Rong

[1]
https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java
[2] https://issues.apache.org/jira/browse/FLINK-11454
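
To make the "two successive windows" idea discussed in this thread concrete,
here is a minimal sketch in plain Python. It is not Flink code and not taken
from the design doc or the Blink operator. The function names are hypothetical,
timestamps are assumed to be non-negative integers, the aggregate is a plain
sum, and the window size is assumed to be a multiple of the slide. It compares
assigning every element to every overlapping window against pre-aggregating
into tumbling panes first.

```python
from collections import defaultdict


def naive_sliding_sum(events, size, slide):
    """Add each element to every sliding window that contains it."""
    windows = defaultdict(int)  # window start -> running sum
    updates = 0                 # number of per-element state updates
    for ts, value in events:
        start = ts - (ts % slide)      # latest window start <= ts
        while start > ts - size:       # walk back over all overlapping windows
            windows[start] += value
            updates += 1
            start -= slide
    return dict(windows), updates


def two_stage_sliding_sum(events, size, slide):
    """Stage 1: tumbling panes of length `slide`. Stage 2: merge pane partials."""
    if size % slide != 0:
        raise ValueError("this sketch assumes size is a multiple of slide")
    panes = defaultdict(int)    # pane start -> partial sum
    updates = 0
    for ts, value in events:
        panes[ts - (ts % slide)] += value  # one state update per element
        updates += 1
    windows = defaultdict(int)
    for pane_start, partial in panes.items():
        # Pane [p, p + slide) is part of the windows starting at
        # p, p - slide, ..., p - size + slide.
        start = pane_start
        while start > pane_start - size:
            windows[start] += partial
            start -= slide
    return dict(windows), updates
```

For the events `[(1, 5), (2, 1), (7, 2), (12, 4)]` with a window size of 10 and
a slide of 5, both functions return the same per-window sums, but the naive
version performs 8 per-element state updates while the two-stage version
performs 4. The gap grows with the ratio of size to slide.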


Re: [DISCUSS] Improvement to Flink Window Operator with Slicing

Rong Rong
Hi Devs,

Thank you all for the valuable feedback and comments on the previous
design doc.
We have now created an initial design/work plan based on @Kurt's
suggestion: "improvements with the sliding window scenario without
changing/adding new public APIs".

Please kindly take a look at the initial design document here [1]. Any
comments or suggestions are highly appreciated!

Thanks,
Rong

--

[1]
https://docs.google.com/document/d/1CvjPJl1Fm1PCpsuuZ4Qc-p_iUzUosBePX_rWNUt8lRw/edit#
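
One question raised earlier in this thread is how to handle a sliding window
whose size is not divisible by the slide (e.g. a 10-second window with a
3-second slide). The thread does not record an answer. One common approach,
shown here purely as an assumption and not as what the design document does,
is to use the greatest common divisor of size and slide as the slice length.
The sketch below is plain Python with hypothetical function names, integer
non-negative timestamps, and a sum aggregate.

```python
from collections import defaultdict
from math import gcd


def sliced_sliding_sum(events, size, slide):
    """Keep one partial sum per slice of length gcd(size, slide)."""
    step = gcd(size, slide)     # e.g. gcd(10, 3) == 1
    slices = defaultdict(int)   # slice start -> partial sum
    for ts, value in events:
        slices[ts - (ts % step)] += value  # one state update per element
    windows = defaultdict(int)
    for slice_start, partial in slices.items():
        # Window boundaries are multiples of `step`, so a slice is either
        # fully inside a window or fully outside it.
        start = slice_start - (slice_start % slide)
        while start > slice_start - size:
            windows[start] += partial
            start -= slide
    return dict(windows)


def reference_sliding_sum(events, size, slide):
    """Per-element window assignment, used only to check the sliced version."""
    windows = defaultdict(int)
    for ts, value in events:
        start = ts - (ts % slide)
        while start > ts - size:
            windows[start] += value
            start -= slide
    return dict(windows)
```

With a 10-second window and a 3-second slide the slice length degenerates to
1 second, so each window merges 10 slices. Whether that is still a win
depends on how cheap the merge is, which matches the caveat in this thread
that slicing is not a silver bullet.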

On Thu, Feb 28, 2019 at 2:24 PM Rong Rong <[hidden email]> wrote:

> Hi Kurt,
>
> Thanks for the valuable feedback. I think the suggestions you and Jincheng
> provided are definitely the best execution plan:
>
> - Starting with the sliding window optimization while exhausting the current
> public API: there are some components we can leverage or directly reuse
> from Blink's window operator [1] implementation.
> - Backward compatibility is definitely an issue, as any state changes will
> probably result in a non-trivial state upgrade. I can definitely follow up
> with you on this.
>
> At the same time, I think it is also a good idea to summarize all the use
> cases that have been discussed so far. This can be very valuable as a
> reference for answering the questions "Do the improvements cover
> most use cases?" and, when they do not, "Can introducing some new API
> meet the requirements?".
>
> I will try to convert the current parent JIRA [2] into one that does not
> include public API alterations. As you mentioned, this will be a much more
> practical way in terms of execution.
>
> Many thanks for the suggestions and guidance!
>
> --
> Rong
>
> [1]
> https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java
> [2] https://issues.apache.org/jira/browse/FLINK-11454
>
> On Mon, Feb 25, 2019 at 3:44 AM Kurt Young <[hidden email]> wrote:
>
>> Hi Rong,
>>
>> Thanks for the detailed summarization! It indeed involves lots of problems
>> and unanswered questions which is i think not practical to
>> solve in one shot. From my point of view, the performance issue with the
>> sliding window is the root one and maybe most possible
>> which user will run into. Thus i think the first question should be
>> answered is:
>> "What kind of improvements we can do with the sliding window scenario?"
>>
>> First, we should try to figure out how many improvements can be made
>> without changing or adding new API. This can reuse the POC you did
>> and the work Blink has done; we can share some ideas and maybe some
>> implementation details. Even this one thing already involves a lot of
>> effort. It may introduce some refactoring of the current window
>> operator, and we should keep it compatible with the old version.
>>
>> After this, we can release it in the next version and gather some user
>> feedback. We can then answer the question: "Do the improvements cover
>> most use cases? Are there any critical ones which are impossible to do
>> with the current window operator?". At that time, we can open the
>> discussion on introducing some new API to meet the requirements.
>>
>> Deciding to add new APIs will introduce more work than improving the
>> window operator internally, which you have covered a lot in your
>> proposal. Actually, the approaches you proposed look good to me; taking
>> it step by step is a more practical way.
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Feb 22, 2019 at 2:58 AM Rong Rong <[hidden email]> wrote:
>>
>> > Hi All,
>> >
>> > Thanks for sharing feedback on the window optimization design doc and
>> > on the discussion JIRAs, @Jincheng, @Kurt, @Jark and @Fabian. This is
>> > very valuable feedback and we will try to incorporate it in the next
>> > step.
>> >
>> > Several revisions have been made to the current design doc, and several
>> > POCs have been developed since we initially shared the document. Thus,
>> > some of the following might’ve already been addressed in other places.
>> > However, I would still like to summarize them since these points were
>> > raised in various scenarios.
>> >
>> > 1) Scope of the window optimization using slicing.
>> >
>> > The original scope of the doc was to address the problem of
>> > element-to-window duplication when a sliding window spans a wide range
>> > with narrow slides (with or w/o efficient partial aggregations).
>> > However, as we developed the 2 POCs [1,2] we found that there’s really
>> > no one-solution-fits-all approach to how this can be optimized (the
>> > same observation Kurt and Jark mentioned). Thus the scope was expanded
>> > further:
>> >
>> > 1a). Directly improving WindowOperator([1, 3])
>> >
>> > In the design doc, the PartialWindowFunction was designed as a cue for
>> > WindowOperator to choose how to optimize the sliding window. This did
>> > not work well because how efficiently the window operator processes
>> > messages depends on: 1. the pattern of the window; 2. the pattern of
>> > the incoming traffic; and 3. the complexity of the window process
>> > function.
>> >
>> > The idea of having users specify the complexity of each API (e.g.
>> > accumulate, merge, retract) can help the system choose the path of
>> > optimization; however, at this stage it is hard to utilize this
>> > information.
>> >
>> > 1b). Explicitly allow slicing and merging steps([2]), or more generic
>> > window chaining:
>> >
>> > With the second POC, it is the users' responsibility to create
>> > efficient window operator(s), since users can now control the state and
>> > the process individually. This opens up more possibilities, e.g.
>> > unaligned / multi-timeout session windows and cascade windows.
>> >
>> > However, with this API change, users are given much more flexibility.
>> >
>> > To conclude (1), I think one clear piece of feedback is that this
>> > effort goes beyond just improving the sliding window performance. If we
>> > think expanding the window API is the right way, there should be a
>> > proper API change discussion. As Kurt mentioned, if any of these cases
>> > can be covered with the existing API, we should always exhaust that
>> > first. (Further discussion in (2).)
>> >
>> >
>> > 2). The motivation of changing the public DataStream APIs related to
>> > window-based processing.
>> >
>> > As Jincheng and Kurt mentioned, utilizing the current windowed stream
>> > API can achieve some of the results. For example, using a tumbling
>> > window followed by a sliding window can reduce the amount of
>> > duplication, since each tumbling window result is passed to the sliding
>> > window as one element. However, when we experimented, I observed
>> > several problems:
>> >
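To make the tumble-then-slide idea concrete, here is a minimal sketch in plain Python rather than the Flink API. All names are hypothetical, and it assumes a SUM aggregate, integer timestamps, and a window size that is a multiple of the slide. It compares the number of state writes when every event is copied into each sliding window against pre-aggregating into tumbling panes of length `slide` first.

```python
from collections import defaultdict


def sliding_sums_naive(events, size, slide):
    """Add every event to each sliding window that contains it."""
    windows = defaultdict(int)  # window start -> running sum
    writes = 0
    for ts, value in events:
        start = ts - ts % slide  # latest window start <= ts
        while start > ts - size:  # all windows [start, start + size) containing ts
            windows[start] += value
            writes += 1
            start -= slide
    return dict(windows), writes


def sliding_sums_two_stage(events, size, slide):
    """Pre-aggregate into tumbling panes, then feed each pane to the sliding windows."""
    panes = defaultdict(int)  # pane start -> partial sum
    writes = 0
    for ts, value in events:
        panes[ts - ts % slide] += value  # one write per event
        writes += 1
    windows = defaultdict(int)
    for pane_start, partial in panes.items():
        start = pane_start
        while start > pane_start - size:  # windows that fully cover this pane
            windows[start] += partial  # one write per pane per window
            writes += 1
            start -= slide
    return dict(windows), writes


# Hypothetical input: one event per second for a minute, 60s window, 10s slide.
events = [(t, 1) for t in range(60)]
naive_result, naive_writes = sliding_sums_naive(events, size=60, slide=10)
staged_result, staged_writes = sliding_sums_two_stage(events, size=60, slide=10)
print(naive_writes, staged_writes)
```

Both functions produce the same per-window sums; only the number of state updates differs. The sketch ignores keys, watermarks and late data, which is where the keyBy and state questions below come in.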
>> > 2a). rehashing with extra keyBy
>> >
>> > The limitation requires a separate keyBy following the tumble
>> > result, which creates unnecessary complexity. One workaround is to use
>> > "reinterpretAsKeyedStream" as Fabian mentioned; however, a lot of
>> > investigation is needed to ensure it works well (e.g. how does the
>> > keyed state store work, since the key is not set as a part of the
>> > namespace within WindowOperator?).
>> >
>> > 2b). Handling iterable elements
>> >
>> > Another problem is that if the tumbling window produces no efficient
>> > partial result, the output element (as an iterable) will still be
>> > duplicated into each of the downstream sliding window states.
>> >
>> > However, this is actually a tricky problem by itself; see (2d) for
>> > further discussion of this piece.
>> >
>> > 2c). Additional support for some of the more complex use cases
>> >
>> > As described in (1b), with explicit control of the window state and the
>> > window function, users can control how the windowing is optimized. One
>> > could argue that these types of optimization can also be done using
>> > ProcessFunction or other rich functions.
>> >
>> > However, if we take a look at how the WindowOperator is currently
>> > structured, it also has explicit aggregation + windowing APIs [4]
>> > exposed on the WindowedStream.
>> >
>> > Another point is that this opens up the multi-slide windowing and
>> > multi-timeout windows described in the presentation [5] at FF.
>> >
>> > I would also like to point out that the way the current POC [2] changes
>> > the public API is just a proof of concept that the flexibility opens up
>> > more complex use cases. It is by no means the final design, and I agree
>> > that there could be more-aggressive-than-necessary changes since it’s
>> > meant as a POC.
>> >
>> > 2d). The support for same-window operator (similar to Table/SQL
>> > OVER-aggregate)
>> >
>> > As discussed with Jincheng in a separate thread, supporting a
>> > same-window operator, similar to how OVER-aggregate is achieved in the
>> > Table/SQL API, is definitely useful in some of the window use cases
>> > (2b). For example, a process function without efficient partial
>> > aggregation results on a sliding window can be evaluated very
>> > efficiently, since both the add and the retract operation on an
>> > iterable list are cheap.
>> >
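As an illustration of the add/retract pattern, here is a minimal sketch in plain Python. It is not the Table/SQL OVER-aggregate implementation; the class name is hypothetical, and it assumes in-order timestamps and a window covering the range (ts - size, ts]. Each arriving element is appended to a list-like buffer, and expired elements are retracted from the front, so one buffer serves every window position.

```python
from collections import deque


class RetractingSum:
    """Keeps one buffer per key and updates the result by add and retract."""

    def __init__(self, size):
        self.size = size
        self.buffer = deque()  # (timestamp, value) pairs in arrival order
        self.total = 0

    def add(self, ts, value):
        # Accumulate the new element.
        self.buffer.append((ts, value))
        self.total += value
        # Retract every element that fell out of the range (ts - size, ts].
        while self.buffer and self.buffer[0][0] <= ts - self.size:
            _, expired = self.buffer.popleft()
            self.total -= expired
        return self.total


window = RetractingSum(size=10)
results = [window.add(ts, v) for ts, v in [(1, 5), (4, 2), (11, 1), (20, 4)]]
print(results)
```

Appending and popping at the ends of the buffer are both constant-time, and no element is ever copied into more than one window state.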
>> > There are some fundamental differences between the current window
>> > operator and the same-window operator concept. For example, the trigger
>> > mechanism is entirely different, and it will be a challenge to
>> > incorporate the OVER-aggregate of the Table API since there are many
>> > table-specific components in the bounded/unbounded over aggregates.
>> > However, I see this as an implementation detail (further discussion in
>> > (3a)).
>> >
>> > To conclude (2), I think the open question remains whether to expand
>> > the API if we introduce sliding: if not, whether we can exhaust all the
>> > use cases with the current API; if so, what that extra API will look
>> > like, especially for the same-window operation in the DataStream API.
>> > (Utilizing whatever we already have in Table/SQL is one solution.)
>> >
>> >
>> > 3). Incorporating Existing POCs and Implementations (e.g. Blink’s
>> > WindowOperator & SQL/Table API)
>> >
>> > Some existing implementations already resemble the idea of slicing,
>> > for example Blink’s WindowOperator [3]. My thinking is that once we
>> > agree on the API and the public-facing changes, we should reuse as much
>> > of the current implementations as possible, since Blink has been
>> > running in production for years (please correct me if I am wrong
>> > @Jark/Jincheng/Kurt :-D ).
>> >
>> >
>> > Regarding the implementation details and some existing approaches.
>> >
>> > 3a). Over-aggregate vs. window-operator.
>> >
>> > Over aggregate has already been implemented in the Table/SQL API.
>> > Unlike window operators, over aggregate utilizes the process function
>> > and a general process operator approach.
>> >
>> > Looking further into how the current window operator works compared
>> > with the over aggregate, there are many *common implementations*, for
>> > example the window state (especially the list-based approach); the
>> > triggering mechanism / timer service (especially on rowtime); and the
>> > “evicting policies”.
>> >
>> > I was wondering if it is possible to consider, on an operator level:
>> > 1. supporting the same-window operator concept as a generic case of the
>> > over-aggregate, and 2. creating a more common window operator base that
>> > generalizes the state, trigger and eviction context.
>> >
>> > 3b). The support for partial process function: retracting vs. merge.
>> >
>> > Supporting retracting operations is definitely a plus in some use cases,
>> > for example as Fabian suggested in the JIRA discussion [6]. Currently
>> > this already exists in the Table/SQL API aggregation function.
>> >
>> > 3c). States and Chaining Strategy
>> >
>> > With the chaining strategy [7] in the operators, I was wondering whether
>> > there are performance considerations when implementing the sliding
>> > window as one operator versus multiple operators chained together. One
>> > question I think is worth investigating is what the most efficient way
>> > of storing the slicing information is, as Jark suggested in the JIRA
>> > discussion [8].
>> >
>> >
>> > Although I tried to capture as many discussions here as possible, I can
>> > hardly imagine that we’ve addressed all of these questions. So please
>> > share your comments and feedback, as they are highly appreciated! I can
>> > definitely volunteer to start a design/discussion doc for this effort if
>> > everyone thinks it is suitable to move forward.
>> >
>> >
>> > Thanks,
>> >
>> > Rong
>> >
>> >
>> >
>> > [1]
>> > https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.yevpiesfln1m
>> >
>> > [2]
>> > https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.j0v7ubhwfl0y
>> >
>> > [3]
>> > https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java
>> >
>> > [4]
>> > https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#processwindowfunction
>> >
>> > [5]
>> > https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing
>> >
>> > [6] https://issues.apache.org/jira/browse/FLINK-11454
>> >
>> > [7]
>> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
>> >
>> > [8] https://issues.apache.org/jira/browse/FLINK-7001
>> >
>> > On Tue, Feb 19, 2019 at 1:57 AM Kurt Young <[hidden email]> wrote:
>> >
>> > > Hi Rong,
>> > >
>> > > Thanks for the improvement proposal. This topic aroused my interest
>> > > since we did some similar improvements in Blink.
>> > > After going through your design doc, I would like to share some
>> > > thoughts:
>> > >
>> > > 1. It looks to me that the proposed SliceManager and MergeManager are
>> > > quite generic and can be used in various situations like unaligned
>> > > windows. However, this will introduce some new API and some new stream
>> > > operators, and thus make it a big effort. I suppose the main reason to
>> > > introduce these concepts is that we want to support some complex
>> > > scenarios like unaligned windows, multi-timeout session windows and so
>> > > on. But if we go back to the original motivation of this improvement,
>> > > it is "Highly overlapping window operation" and "Window functions with
>> > > efficient methods calculating and merging partial results", just as
>> > > you mentioned in the section "Target Usage Pattern". I'm curious
>> > > whether it's necessary to introduce all these new APIs and operators
>> > > to meet these requirements. If I understand you correctly, we may even
>> > > be able to achieve the goal by using two successive windows, first a
>> > > tumbling window and then a sliding window. And from the experience we
>> > > had in Blink, adding some enhancements to the original window operator
>> > > may also be sufficient. Does it make sense to you if we first try to
>> > > improve the target scenario without adding new APIs and operators? It
>> > > will definitely make it easier to review and merge.
>> > >
>> > > 2. From what we observed after Blink did similar improvements, slice
>> > > or pane based improvement is not a silver bullet for all kinds of
>> > > situations. In most cases, we only observed a small performance gain
>> > > like 20% or 30%. The main reason is that, to get a big gain from this
>> > > improvement, we normally require the window function to have efficient
>> > > calculating and merging methods, like SUM and COUNT. If this
>> > > assumption holds, the basic approach of the window operator will also
>> > > be performant. It stores a compact intermediate result for each window
>> > > per key, and it's also very efficient to get the result based on this.
>> > > The only downside is that we need to do the calculation in multiple
>> > > windows per key, but based on the assumption, the calculation is
>> > > efficient and may not be a blocker. Based on this, I think we must be
>> > > more careful about the changes, especially API-related ones.
>> > >
>> > > 3. The last thing on my mind is whether we can share some
>> > > implementations with Blink's window operator. We introduced a new
>> > > window operator in Blink SQL, which I think needs to be discussed when
>> > > we try to adopt Blink improvements. Unless there are strong reasons
>> > > not to, I think it's better to share some implementations with
>> > > DataStream's window operator. Maybe we don't need to share the whole
>> > > operator, but at least some underlying concepts and data structures.
>> > > It would also be great if you could think about it and see if it's
>> > > feasible.
>> > >
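To make the "efficient calculating and merging methods" assumption in point 2 concrete, here is a minimal sketch in plain Python. It is not Blink's or Flink's accumulator API, and all names are hypothetical. An AVG aggregate keeps only a sum and a count per slice, and a window result is obtained by merging the slices the window covers.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AvgAccumulator:
    """Compact partial aggregate for AVG: only a sum and a count are stored."""
    total: float = 0.0
    count: int = 0

    def add(self, value):
        return AvgAccumulator(self.total + value, self.count + 1)

    def merge(self, other):
        return AvgAccumulator(self.total + other.total, self.count + other.count)

    def result(self):
        return self.total / self.count if self.count else None


def build_slices(events, slice_len):
    """Fold each event into the accumulator of the single slice it belongs to."""
    slices = {}
    for ts, value in events:
        start = ts - ts % slice_len
        slices[start] = slices.get(start, AvgAccumulator()).add(value)
    return slices


def window_average(slices, window_start, size, slice_len):
    """Merge the slice accumulators covered by [window_start, window_start + size)."""
    acc = AvgAccumulator()
    for start in range(window_start, window_start + size, slice_len):
        acc = acc.merge(slices.get(start, AvgAccumulator()))
    return acc.result()


# Hypothetical events as (timestamp, value) pairs, sliced every 10 time units.
events = [(1, 10.0), (4, 20.0), (12, 30.0), (25, 40.0)]
slices = build_slices(events, slice_len=10)
print(window_average(slices, window_start=0, size=20, slice_len=10))
```

With an accumulator this small, updating one accumulator per window per key (the basic approach) is also cheap, which is consistent with the modest gains described above. The trade-off shifts when the per-element update is expensive or the number of overlapping windows is large.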
>> > > One last small question: how do you handle sliding windows whose
>> > > window size can't be evenly divided by the step, such as a 10-second
>> > > window with a 3-second slide?
>> > >
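One possible answer, sketched here as an assumption rather than a description of what the design doc or the POCs do: use the greatest common divisor of the window size and the slide as the slice length, so that every window boundary falls on a slice boundary. The function names below are hypothetical.

```python
from math import gcd


def slice_length(size, slide):
    """Largest slice length that divides both the window size and the slide."""
    return gcd(size, slide)


def slices_for_window(window_start, size, slide):
    """List the [start, end) slices that make up one sliding window."""
    step = slice_length(size, slide)
    return [(s, s + step) for s in range(window_start, window_start + size, step)]


# The example from the question: 10-second window, 3-second slide.
print(slice_length(10, 3))
print(slices_for_window(3, 10, 3))
```

For a 10-second window with a 3-second slide the slices shrink to 1 second, so each window merges 10 slices. When the size and the slide share a larger divisor (for example 10 and 5), the slices are correspondingly coarser and slicing pays off more.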
>> > > Best,
>> > > Kurt
>> > >
>> > >
>> > > On Wed, Jan 30, 2019 at 2:12 AM Fabian Hueske <[hidden email]> wrote:
>> > >
>> > > > Thank you Rong!
>> > > > The performance of sliding windows is an issue for many users.
>> > > > Adding support for a more efficient window is a great effort.
>> > > >
>> > > > Thank you,
>> > > > Fabian
>> > > >
>> > > > On Tue, Jan 29, 2019 at 4:37 PM Rong Rong <[hidden email]> wrote:
>> > > >
>> > > > > Hi all,
>> > > > >
>> > > > > Thanks for the feedbacks and suggestions to the design doc. I have
>> > > > > created a parent JIRA [1] to track the related tasks and started
>> > > > > the implementation process
>> > > > > Any further feedbacks or suggestions are highly appreciated.
>> > > > >
>> > > > > Best,
>> > > > > Rong
>> > > > >
>> > > > > [1] https://issues.apache.org/jira/browse/FLINK-11276
>> > > > >
>> > > > > On Wed, Dec 5, 2018 at 6:17 PM Rong Rong <[hidden email]> wrote:
>> > > > >
>> > > > > > Hi all,
>> > > > > >
>> > > > > > Various discussion in the mailing list & JIRA tickets [2] had
>> > > > > > been brought up in the past regarding the windowing operation
>> > > > > > performance. As we experiment internally with some of our
>> > > > > > extreme use cases, we found out that using a slice-based
>> > > > > > implementation can optimize Flink's windowing mechanism and
>> > > > > > provide a better performance in most cases.
>> > > > > >
>> > > > > > We've put together a preliminary enhancement and performance
>> > > > > > optimization plan [1] for the current windowing operation in
>> > > > > > Flink. This is largely inspired by stream slicing research
>> > > > > > shared in recent Flink Forward conference [3] by Philip and
>> > > > > > Jonas, and the discussion in the main JIRA ticket [2]. The
>> > > > > > initial design and POC implementations consider optimizing the
>> > > > > > performance for the category of overlapping windows as well as
>> > > > > > allowing chaining of cascade window operators.
>> > > > > >
>> > > > > > It will be great to hear the feedbacks and suggestions from the
>> > > > > > community. Please kindly share your comments and suggestions.
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Rong
>> > > > > >
>> > > > > > [1]
>> > > > > > https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing
>> > > > > > [2] https://issues.apache.org/jira/browse/FLINK-7001
>> > > > > > [3]
>> > > > > > https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing
>