Hi all,
Various discussions on the mailing list and in JIRA tickets [2] have come up in the past regarding the performance of the windowing operation. As we experimented internally with some of our extreme use cases, we found that a slice-based implementation can optimize Flink's windowing mechanism and provide better performance in most cases.

We've put together a preliminary enhancement and performance optimization plan [1] for the current windowing operation in Flink. It is largely inspired by the stream slicing research shared at the recent Flink Forward conference [3] by Philip and Jonas, and by the discussion in the main JIRA ticket [2]. The initial design and POC implementations consider optimizing the performance for the category of overlapping windows, as well as allowing chaining of cascade window operators.

It would be great to hear feedback and suggestions from the community. Please kindly share your comments.

Thanks,
Rong

[1] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing
[2] https://issues.apache.org/jira/browse/FLINK-7001
[3] https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing
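To make the cost of overlapping windows concrete, here is a minimal plain-Python sketch (not Flink code). The function name `assign_sliding_windows` and its parameters are hypothetical; the sketch only illustrates how a sliding window assigner typically maps one element to every window that covers it, which is the duplication a slice-based implementation is meant to avoid.

```python
def assign_sliding_windows(timestamp, size, slide, offset=0):
    """Return the start times of all sliding windows that contain `timestamp`.

    Hypothetical helper for illustration; windows are [start, start + size).
    """
    # Latest window start that is not after the timestamp.
    last_start = timestamp - (timestamp - offset) % slide
    starts = []
    start = last_start
    # Walk backwards over every earlier window that still covers the timestamp.
    while start > timestamp - size:
        starts.append(start)
        start -= slide
    return starts


# A 1-hour window sliding every minute (all times in seconds).
copies = len(assign_sliding_windows(timestamp=5_000, size=3_600, slide=60))
print(copies)  # 60
```

With these numbers a single element is written into 60 window states, whereas a slice-based layout would store it once in the slice that contains it and combine slices when a window fires.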
Hi all,
Thanks for the feedback and suggestions on the design doc. I have created a parent JIRA [1] to track the related tasks and have started the implementation process. Any further feedback or suggestions are highly appreciated.

Best,
Rong

[1] https://issues.apache.org/jira/browse/FLINK-11276

(In reply to Rong Rong's message of Wed, Dec 5, 2018 at 6:17 PM.)
Thank you Rong!
The performance of sliding windows is an issue for many users. Adding support for a more efficient window is a great effort.

Thank you,
Fabian

(In reply to Rong Rong's message of Tue, Jan 29, 2019 at 4:37 PM.)
Hi Rong,
Thanks for the improvement proposal. This topic caught my interest since we did some similar improvements in Blink. After going through your design doc, I would like to share some thoughts:

1. It looks to me like the proposed SliceManager and MergeManager are quite generic and can be used in various situations such as unaligned windows. However, this will introduce some new APIs and some new stream operators, and thus make it a big effort. I suppose the main reason to introduce these concepts is that we want to support complex scenarios like unaligned windows, multi-timeout session windows and so on. But if we go back to the original motivation of this improvement, it is "Highly overlapping window operation" and "Window functions with efficient methods calculating and merging partial results", just as you mentioned in the section "Target Usage Pattern". I'm curious whether it's necessary to introduce all these new APIs and operators to meet these requirements. If I understand you correctly, we may even be able to achieve the goal by using two successive windows: first a tumbling window, and then a sliding window. And from the experience we had in Blink, adding some enhancements to the original window operator may also be sufficient. Does it make sense to you if we first try to improve the target scenario without adding new APIs and operators? It would definitely make it easier to review and merge.

2. From what we observed after Blink made similar improvements, slice- or pane-based improvement is not a silver bullet for all kinds of situations. In most cases, we only observed a small performance gain of 20% or 30%. The main reason is that, to get a big gain from this improvement, we normally require the window function to have efficient calculating and merging methods, like SUM and COUNT. If this assumption holds, the basic approach of the window operator will also be performant. It stores a compact intermediate result for each window per key, and it's also very efficient to get the result based on this. The only downside is that we need to do the calculation in multiple windows per key, but based on the assumption, the calculation is efficient and may not be a blocker. Based on this, I think we must be more careful about the changes, especially API-related ones.

3. The last thing on my mind is whether we can share some implementation with Blink's window operator. We introduced a new window operator in Blink SQL, which I think needs to be discussed when we try to adopt Blink improvements. Without strong reasons against it, I think it's better to share some implementation with DataStream's window operator. Maybe we don't need to share the whole operator, but at least some underlying concepts and data structures. It would also be great if you could think about it and see if it's feasible.

One last small question: how do you handle sliding windows whose window size is not divisible by the slide step, such as a 10-second window that slides every 3 seconds?

Best,
Kurt

(In reply to Fabian Hueske's message of Wed, Jan 30, 2019 at 2:12 AM.)
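The idea of two successive windows (tumbling first, then sliding) and the question about a 10-second window with a 3-second slide can be illustrated together with a small plain-Python simulation. This is not Flink API, and all names are hypothetical. It assumes integer timestamps and a SUM aggregate, and it picks the pane length as gcd(size, slide). That choice is only one possible answer to the non-divisible case (10 s / 3 s gives 1 s panes) and is not necessarily what the design doc or Blink does.

```python
from collections import defaultdict
from math import gcd


def sliding_sums_naive(events, size, slide):
    """Baseline: add every event to each sliding window that contains it."""
    windows = defaultdict(int)
    for ts, value in events:
        start = ts - ts % slide          # latest window start <= ts
        while start > ts - size:         # walk back over all covering windows
            windows[start] += value
            start -= slide
    return dict(windows)


def sliding_sums_via_panes(events, size, slide):
    """Two stages: tumbling panes first, then sliding windows over pane results."""
    pane = gcd(size, slide)              # assumed pane length; divides size and slide
    # Stage 1: tumbling pre-aggregation, one partial sum per pane.
    pane_sums = defaultdict(int)
    for ts, value in events:
        pane_sums[ts - ts % pane] += value
    # Stage 2: each pane partial is added once to every window covering the pane.
    windows = defaultdict(int)
    for pane_start, partial in pane_sums.items():
        start = pane_start - pane_start % slide
        while start > pane_start - size:
            windows[start] += partial
            start -= slide
    return dict(windows)


events = [(0, 1), (1, 2), (4, 3), (11, 4)]   # (timestamp in seconds, value)
result = sliding_sums_via_panes(events, size=10, slide=3)
```

Both functions produce the same per-window sums. The second one touches each window once per pane instead of once per element, so the saving grows with the number of elements that fall into the same pane. With 1-second panes, as in the 10 s / 3 s case, that saving can be small.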
Hi All,
Thanks for sharing feedback on the window optimization design doc and in the discussion JIRAs, @Jincheng, @Kurt, @Jark and @Fabian. This is very valuable feedback and we will try to incorporate it in the next step.

There have been several revisions to the current design doc, and several POCs have been developed since we initially shared the document. Thus, some of the following might already have been addressed in other places. However, I would still like to summarize them, since these points were raised in various scenarios.

1) Scope of the window optimization using slicing.

The original scope of the doc was to address the problem of element-to-window duplication when a sliding window spans a wide range with narrow slides (with or without efficient partial aggregations). However, as we developed the two POCs [1,2], we found that there’s really no one-solution-fits-all approach to how this can be optimized (the same observation Kurt and Jark mentioned). Thus the scope was expanded further:

1a) Directly improving WindowOperator ([1, 3])

In the design doc, the PartialWindowFunction was designed as a cue for WindowOperator to choose how to optimize the sliding window. This did not work well, because how efficiently the window operator processes messages depends on: 1. the pattern of the window; 2. the pattern of the incoming traffic; and 3. the complexity of the window process function.

The idea of having users specify the complexity of each API (e.g. accumulate, merge, retract) can help the system choose the path of optimization; however, at this stage it is hard to utilize this information.

1b) Explicitly allowing slicing and merging steps ([2]), or more generic window chaining

With the second POC, it is the users' responsibility to create efficient window operator(s), since users can now control the state and the process individually. This opens up more possibilities, e.g. unaligned / multi-timeout session windows and cascade windows.

However, with this API change, users are given much more flexibility.

In conclusion to (1), I think one piece of clear feedback is that this effort goes beyond just improving sliding window performance. If we think it is the right way to expand the window API, there should be a proper API change discussion. As Kurt mentioned, if any of these cases can be covered with the existing API, we should always exhaust that first. (Further discussion in (2).)

2) The motivation for changing the public DataStream APIs related to window-based processing.

As Jincheng and Kurt mentioned, utilizing the current windowed stream API can achieve some of the results. For example, using a tumbling window followed by a sliding window can reduce the amount of duplication, since the tumbling window results will be passed to the sliding window as one element. However, when we experimented, I observed several problems:

2a) Rehashing with an extra keyBy

This approach requires a separate keyBy following the tumbling result, which creates unnecessary complexity. One workaround is to use "reinterpretAsKeyedStream", as Fabian mentioned; however, there is much investigation to be done to ensure it works well (e.g. how does the keyed state store work, since the key is not set as part of the namespace within WindowOperator?).

2b) Handling iterable elements

Another problem is that if there are no efficient partial results produced by the tumbling window, the output element (as an iterable) will still be duplicated into each of the downstream sliding window states.

However, this is actually a tricky problem in itself; see (2d) for further discussion.

2c) Additional support for some of the more complex use cases

As described in (1b), with explicit control of the window state and the window function, users can control how the windowing is optimized. One could argue that this type of optimization can also be done using ProcessFunction or other rich functions.

However, if we take a look at how the WindowOperator is currently structured, it also has explicit aggregation + windowing APIs [4] presented for the WindowedStream.

Another point is that this opens up the multi-slide windowing and multi-timeout windows described in the presentation [5] at FF.

I would also like to raise the point that the way the current POC [2] changes the public API is just a proof of concept that the flexibility opens up more complex use cases. It is by no means the final design, and I agree that there could be more-aggressive-than-necessary changes since it’s meant as a POC.

2d) Support for a same-window operator (similar to the Table/SQL OVER aggregate)

As discussed with Jincheng in a separate thread, this means supporting a same-window operator similar to how the OVER aggregate was achieved in the Table/SQL API. It is definitely useful in some of the window use cases (2b). For example, a process function without efficient partial aggregation results on a sliding window can be evaluated very efficiently, since both the add and retract operations on an iterable list are cheap.

There are some fundamental differences between the current window operator and the same-window operator concept; for example, the trigger mechanism is entirely different, and it will be a challenge to incorporate the OVER aggregate of the Table API since there are many table-specific components in the bounded/unbounded over aggregates. However, I see this as an implementation detail (further discussion in (3a)).

In conclusion to (2), I think the discussion remains whether to expand the API if we introduce sliding: if not, whether we can exhaust all the use cases with the current API; if yes, what that extra API will look like, especially the same-window operation in the DataStream API. (Utilizing whatever we already have in Table/SQL is one solution.)

3) Incorporating existing POCs and implementations (e.g. Blink’s WindowOperator & SQL/Table API)

There are already some existing implementations that resemble the idea of slicing, for example Blink’s WindowOperator [3]. My thinking is that once we agree on the API and the public-facing changes, we should reuse as much of the current implementations as possible, since Blink has been running in production for years (please correct me if I am wrong @Jark/Jincheng/Kurt :-D ).

Regarding the implementation details and some existing approaches:

3a) Over-aggregate vs. window operator

The over aggregate has already been implemented in the Table/SQL API. Unlike window operators, the over aggregate uses the process function and a general process operator approach.

Looking further into how the current window operator works compared with the over aggregate, there are many *common implementations*: for example, the window state (especially the list-based approach), the triggering mechanism / timer service (especially on rowtime), and the “evicting policies”.

I was wondering if it is possible to consider, on an operator level: 1. supporting the same-window operator concept as a generic case of the over aggregate, and 2. creating a more common window operator base that generalizes the state, trigger and eviction context.

3b) Support for partial process functions: retracting vs. merging

Supporting retracting operations is definitely a plus in some use cases, for example as Fabian suggested in the JIRA discussion [6]. This already exists in the Table/SQL API aggregation function.

3c) States and chaining strategy

With the chaining strategy [7] in the operators, I was wondering whether there is a performance consideration when implementing the sliding window as one operator versus multiple operators chained together. One question I think is worth investigating is what the most efficient way of storing the slicing information is, as Jark suggested in the JIRA discussion [8].

As I tried to capture as many discussions here as possible, I can hardly imagine that we’ve addressed all of these questions. So please share your comments and feedback, as they are highly appreciated! I can definitely volunteer to start a design/discussion doc for this effort if everyone thinks it is suitable to move forward.

Thanks,
Rong

[1] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.yevpiesfln1m
[2] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.j0v7ubhwfl0y
[3] https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java
[4] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#processwindowfunction
[5] https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing
[6] https://issues.apache.org/jira/browse/FLINK-11454
[7] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
[8] https://issues.apache.org/jira/browse/FLINK-7001

(In reply to Kurt Young's message of Tue, Feb 19, 2019 at 1:57 AM.)
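Point (3b) above contrasts merging and retracting partial results. As a rough illustration only, the two strategies can be sketched in plain Python as follows. The function names are hypothetical, and the sketch assumes that every window covers a fixed number of equal-length slices and that the aggregate is SUM. It is not taken from either POC or from the Table/SQL implementation.

```python
from collections import deque


def window_sums_by_merge(slice_sums, slices_per_window):
    """Recompute each window from scratch by merging all of its slice partials."""
    n = slices_per_window
    return [sum(slice_sums[i:i + n]) for i in range(len(slice_sums) - n + 1)]


def window_sums_by_retract(slice_sums, slices_per_window):
    """Keep one running aggregate: add the newest slice, retract the expired one."""
    results = []
    running = 0
    live = deque()                        # slices currently inside the window
    for partial in slice_sums:
        live.append(partial)
        running += partial                # accumulate the new slice
        if len(live) > slices_per_window:
            running -= live.popleft()     # retract the slice that slid out
        if len(live) == slices_per_window:
            results.append(running)       # a full window is available
    return results


slice_sums = [1, 2, 3, 4, 5]
print(window_sums_by_merge(slice_sums, 3))    # [6, 9, 12]
print(window_sums_by_retract(slice_sums, 3))  # [6, 9, 12]
```

Retraction keeps the work per emitted window constant, while merging grows with the number of slices per window. Retraction only applies to invertible aggregates such as SUM or COUNT; an aggregate like MAX still needs the merge path.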
Hi Rong,
Thanks for the detailed summary! It indeed involves lots of problems and unanswered questions, which I think is not practical to solve in one shot. From my point of view, the performance issue with the sliding window is the root one, and probably the one users are most likely to run into. Thus I think the first question to answer is: "What kind of improvements can we make for the sliding window scenario?"

First, we should try to figure out how many improvements can be made without changing or adding new APIs. This can reuse the POC you did and the work Blink has done; we can share some ideas and maybe some implementation details. This already involves a lot of effort even if we only do this one thing. It may introduce some refactoring of the current window operator, and we should keep it compatible with the old version.

After this, we can release it in the next version and gather some user feedback. We can then further answer the questions: "Do the improvements cover most use cases? Are there any critical ones that are impossible to support with the current window operator?" At that time, we can open the discussion on introducing new APIs to meet the requirements. Adding new APIs will introduce more work than improving the window operator internally, much of which you have covered in your proposal.

Actually, the approaches you proposed look good to me; taking it step by step is a more practical way.

Best,
Kurt

(In reply to Rong Rong's message of Fri, Feb 22, 2019 at 2:58 AM.)
> > > As I tried to capture as many discussions here as possible, I can hardly > imagine that we’ve address all of these questions. So please share your > comment and feedbacks as they are highly appreciated! I can definitely > volunteer will start a design/discussion doc for this effort if everyone > thinks it is suitable to move forward. > > > Thanks, > > Rong > > > > [1] > > https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.yevpiesfln1m > > [2] > > https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.j0v7ubhwfl0y > > [3] > > https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java > > [4] > > https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#processwindowfunction > > [5] > > https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing > > [6] https://issues.apache.org/jira/browse/FLINK-11454 > > [7] > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups > [8] https://issues.apache.org/jira/browse/FLINK-7001 > > On Tue, Feb 19, 2019 at 1:57 AM Kurt Young <[hidden email]> wrote: > > > Hi Rong, > > > > Thanks for the improvement proposal, this topic aroused my interest since > > we did some similar improvements in Blink. > > After going through your design doc, i would like share some thoughts: > > > > 1. It looks to me the proposed SliceManager and MergeManager is quite > > generic and can be used in various situations like unaligned windows. > > However > > this will introduce some new API and some new stream operators, and thus > > make is a big effort to do. 
I suppose the main reason to introduce these > > concepts is > > we want to support some complex scenarios like unaligned windows, multi > > timeout session windows and so on. But if let's go back to the original > > motivation of this > > improvement, it will be "Highly overlapping window operation" and "Window > > functions with efficient methods calculating and merging partial > results", > > just as you mentioned > > in the section "Target Usage Pattern". I'm curious if it's necessary to > > introduce all these new APIs and operators to meet these requirements. > If i > > understand you correctly, > > we may even can achieve the goal by using two successive windows, first > > with tumbling window, and then sliding window. And from the experiences > we > > had in Blink, by adding > > some enhancements to original window operator may also be sufficient. > Does > > it make sense to you if we can first try to improve the target scenario > > without adding new APIs and > > operators? It will definitely make it more easy to review and merged. > > > > 2. From what we observed after Blink did similar improvements, slice or > > pane based improvement is not a silver bullet for all kinds of > situations. > > In most cases, we only > > observed small performance gain like 20% or 30%. The main reason is, if > we > > want get big gain from this improvement, normally we require the window > > function have efficient > > calculating and merging methods, like SUM, COUNT. If this assumption > > stands, the basic approach of window operator will also be performant. It > > stored compact intermedia result for > > each window per key, and it's also very efficient to get result based on > > this. The only downside is we need to do the calculation in multiply > > windows per key, but based on the > > assumption, the calculation is efficient and may not be a blocker. Based > on > > this, i think we must be more careful about the changes, especially API > > related. > > > > 3. 
The last thing in my mind is whether we can share some implementations > > with blink's window operator. We introduced a new window operator in > Blink > > SQL, which i think needs to be > > discussed when we try to adopt blink improvements. Without strong > reasons, > > i think it's better to share some implementations with DataStream's > window > > operator. Maybe we don't need > > to share the whole operator, but at least some underlying concepts and > > data structures. It would also be great if you can think about it and see > > if it's a feasible way. > > > > Last small question: how do you handle sliding windows whose window size > > can't be divided by step, such as 10 seconds window and slide with 3 > > seconds? > > > > Best, > > Kurt > > > > > > On Wed, Jan 30, 2019 at 2:12 AM Fabian Hueske <[hidden email]> wrote: > > > > > Thank you Rong! > > > The performance of sliding windows is an issue for many users. > > > Adding support for a more efficient window is a great effort. > > > > > > Thank you, > > > Fabian > > > > > > Am Di., 29. Jan. 2019 um 16:37 Uhr schrieb Rong Rong < > > [hidden email] > > > >: > > > > > > > Hi all, > > > > > > > > Thanks for the feedbacks and suggestions to the design doc. I have > > > created > > > > a parent JIRA [1] to track the related tasks and started the > > > implementation > > > > process > > > > Any further feedbacks or suggestions are highly appreciated. > > > > > > > > Best, > > > > Rong > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-11276 > > > > > > > > On Wed, Dec 5, 2018 at 6:17 PM Rong Rong <[hidden email]> > wrote: > > > > > > > > > Hi all, > > > > > > > > > > Various discussion in the mailing list & JIRA tickets [2] had been > > > > brought > > > > > up in the past regarding the windowing operation performance. 
As we > > > > > experiment internally with some of our extreme use cases, we found > > out > > > > that > > > > > using a slice-based implementation can optimize Flink's windowing > > > > mechanism > > > > > and provide a better performance in most cases. > > > > > > > > > > We've put together a preliminary enhancement and performance > > > optimization > > > > > plan [1] for the current windowing operation in Flink. This is > > largely > > > > > inspired by stream slicing research shared in recent Flink Forward > > > > > conference [3] by Philip and Jonas, and the discussion in the main > > JIRA > > > > > ticket [2]. The initial design and POC implementations consider > > > > optimizing > > > > > the performance for the category of overlapping windows as well as > > > > allowing > > > > > chaining of cascade window operators. > > > > > > > > > > It will be great to hear the feedbacks and suggestions from the > > > > community. > > > > > Please kindly share your comments and suggestions. > > > > > > > > > > Thanks, > > > > > Rong > > > > > > > > > > [1] > > > > > > > > > > > > > > > https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing > > > > > [2] https://issues.apache.org/jira/browse/FLINK-7001 > > > > > [3] > > > > > > > > > > > > > > > https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > |
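Kurt's closing question in the thread (a 10-second window sliding every 3 seconds, where the size is not a multiple of the slide) can be illustrated with a small sketch. It is not taken from the design doc or from Blink. It assumes one common slicing choice: cut the stream into slices whose length is the greatest common divisor of window size and slide (1 second in this case), pre-aggregate each slice once, then merge slices into windows. The names `slice_length` and `sliding_sums` are hypothetical, timestamps are plain integers, and window starts are assumed to be multiples of the slide.

```python
from collections import defaultdict
from math import gcd


def slice_length(window_size, slide):
    """Largest slice length that divides both the window size and the slide."""
    return gcd(window_size, slide)


def sliding_sums(events, window_size, slide):
    """Sum per sliding window, computed through per-slice partial sums.

    events: iterable of (timestamp, value) pairs with integer timestamps.
    Returns {window_start: sum} for every window [start, start + window_size)
    that contains at least one event.
    """
    step = slice_length(window_size, slide)

    # Phase 1: each element updates exactly one slice (no duplication).
    slices = defaultdict(int)
    for ts, value in events:
        slices[ts - ts % step] += value

    # Phase 2: merge each slice's partial sum into every window covering it.
    windows = defaultdict(int)
    for slice_start, partial in slices.items():
        start = slice_start - slice_start % slide  # latest covering window
        while start > slice_start - window_size:
            windows[start] += partial
            start -= slide
    return dict(windows)


if __name__ == "__main__":
    print(slice_length(10, 3))
    print(sliding_sums([(0, 1), (4, 2), (9, 3)], window_size=10, slide=3))
```

In the 10s/3s case each element updates one slice instead of three or four windows, but every window then has to merge ten 1-second slices. That trade-off is consistent with Kurt's remark that the gain depends on how cheap the merge step is.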
Hi Kurt,
Thanks for the valuable feedback. I think the suggestions you and Jincheng provided are definitely the best execution plan:

- Starting with the sliding window optimization by exhausting the current public API, there are some components we can leverage or directly reuse from Blink's window operator [1] implementation.
- Backward compatibility is definitely an issue, as any state change will probably result in a non-trivial state upgrade. I can definitely follow up with you on this.

At the same time, I think it is also a good idea to summarize all the use cases that have been discussed so far. This can be very valuable as a reference for answering the questions "Do the improvements cover most use cases?" and, where they do not, "Can introducing some new API meet the requirements?".

I will try to convert the current parent JIRA [2] into one that does not include public API alterations. As you mentioned, this will be a much more practical way in terms of execution.

Many thanks for the suggestions and guidance!

--
Rong

[1] https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java
[2] https://issues.apache.org/jira/browse/FLINK-11454
|
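The "retracting vs. merge" point raised in the thread (3b), and the remark in (2d) that add and retract operations can be cheap, can be made concrete with a short sketch. It is an illustration under stated assumptions, not code from the POCs, Blink, or the Table/SQL API: the input is a list of per-slice partial sums, each window spans the last `k` slices, and the aggregate is a SUM, which is invertible. Both function names are hypothetical.

```python
from collections import deque


def window_sums_by_merge(partials, k):
    """Re-merge the last k slice partials for every window: O(k) per window."""
    return [sum(partials[max(0, i - k + 1): i + 1]) for i in range(len(partials))]


def window_sums_by_retract(partials, k):
    """Keep a running total: add the new slice, retract the expired one.

    O(1) per window, but only valid for invertible aggregates such as
    SUM or COUNT (not for MAX or MIN).
    """
    live = deque()  # partials currently inside the window
    total = 0
    results = []
    for partial in partials:
        live.append(partial)
        total += partial             # add the newest slice
        if len(live) > k:
            total -= live.popleft()  # retract the slice that slid out
        results.append(total)
    return results


if __name__ == "__main__":
    data = [1, 2, 3, 4, 5]
    print(window_sums_by_merge(data, 3))
    print(window_sums_by_retract(data, 3))
```

Both functions return the same sums. The retract variant avoids re-reading the `k` slices for each window, which is why it matters most for wide windows with narrow slides.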
Hi Devs,
Thank you all for the valuable feedback and comments on the previous design doc. Based on @Kurt's suggestions, we have now created an initial design/work plan scoped as "improvements with the sliding window scenario without changing/adding new public APIs".

Please kindly take a look at the initial design document here [1]. Any comments or suggestions are highly appreciated!

Thanks,
Rong

--
[1] https://docs.google.com/document/d/1CvjPJl1Fm1PCpsuuZ4Qc-p_iUzUosBePX_rWNUt8lRw/edit#

On Thu, Feb 28, 2019 at 2:24 PM Rong Rong <[hidden email]> wrote: > Hi Kurt, > > Thanks for the valuable feedback. I think the suggestions you and Jincheng > provided are definitely the best execution plan > > - Starting with sliding window optimization by exhausting the current > public API, there are some components we can leverage or directly reuse > from Blink's window operator [1] implementation. > - Backward compatibility is definitely an issue as any state changes will > probably result in a non-trivial state upgrade. I can definitely follow up > with you on this. > > At the same time I think it is also a good idea to summarize all the use > cases that has been discussed so far. This can be very valuable as a > reference: To answer the questions "Does the improvements cover > most use cases?" and when not covered, "whether introduce some new API can > meet the requirements?". > > I will try to convert the current parent JIRA [2] into one that does not > include public API alternation. As you mentioned this will be much more > practical way in terms of execution. > > Many thanks for the suggestions and guidance! > > -- > Rong > > [1] > https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java > [2] https://issues.apache.org/jira/browse/FLINK-11454 > > On Mon, Feb 25, 2019 at 3:44 AM Kurt Young <[hidden email]> wrote: > >> Hi Rong, >> >> Thanks for the detailed summarization!
It indeed involves lots of problems >> and unanswered questions which is i think not practical to >> solve in one shot. From my point of view, the performance issue with the >> sliding window is the root one and maybe most possible >> which user will run into. Thus i think the first question should be >> answered is: >> "What kind of improvements we can do with the sliding window scenario?" >> >> First, we should try to figure out how many improvements can be done >> without changing or adding new API. This can reuse the POC you did >> and the work blink had been done, we can share some ideas and maybe some >> implementation details. And it already involves lots of efforts >> even if we only do this one thing. It may introduce some refactory to >> current window operator, and we should keep it compatible with old >> version. >> >> After this, we can release it in next version and gather some users >> feedbacks. We can further answer the question: "Does the improvements >> cover >> most use cases? Are there any critical ones which is impossible to do with >> current window operator?". At that time, we can open the discussions to >> introduce some new API to meet the requirements. >> >> It will introduce more work than improve window operator internally when >> we >> decide to add new APIs, which you have covered a lot in your proposal. >> Actually, the approaches you proposed looks good to me, take it step by >> step is a more practical way. >> >> Best, >> Kurt >> >> >> On Fri, Feb 22, 2019 at 2:58 AM Rong Rong <[hidden email]> wrote: >> >> > Hi All, >> > >> > Thanks for sharing feedbacks for the window optimization design doc and >> on >> > the discussion JIRAs @Jincheng, @Kurt, @Jark and @Fabian. These are very >> > valuable feedbacks and we will try to incorporate them in the next step. >> > >> > There were several revision done for the current design doc, and several >> > POCs being developed since we initially shared the document. 
Thus, some >> of >> > the following might’ve already been addressed in other places. However, >> I >> > would still like to summarize them since these points were raised up in >> > various scenarios. >> > >> > 1) Scope of the window optimization using slicing. >> > >> > The original scope of the doc was to address the problem of >> > element-to-window duplication when sliding window goes across wide range >> > with narrow slides (with or w/o efficient partial aggregations). >> However, >> > as we develop the 2 POCs [1,2] we found out there’s really no >> > one-solution-fits-all approach to how this can be optimized (same >> > observation as Kurt and Jark mentioned). Thus some further expansion of >> the >> > scope was done: >> > >> > 1a). Directly improving WindowOperator([1, 3]) >> > >> > In the design doc, the PartialWindowFunction was designed as a cue for >> > WindowOperator to choose how to optimize the sliding window. This was >> not >> > working well because: how efficient window operator process messages >> > depends on: 1. the pattern of the window; 2. the pattern of the incoming >> > traffic; and 3. the complexity of the window process function. >> > >> > The idea of having users to specify the complexity of each API (e.g. >> > accumulate, merge, retract) can help the system choose the path of >> > optimization, however as this stage the it is hard to utilize this >> > information. >> > >> > 1b). Explicitly allow slicing and merging steps([2]), or more generic >> > window chaining: >> > >> > With the second POC, it is users' responsibility to create an efficient >> > window operator(s), since now user can control the state and the process >> > individually. This opens up more possibilities: e.g. unaligned / >> > multi-timeout session window, cascade windows. >> > >> > However, with this API change, users are given much more flexibilities. 
>> >
>> > In conclusion to (1), I think one piece of clear feedback is that this effort goes beyond just improving sliding window performance. If we think it is the right way to expand the window API, there should be a proper API change discussion. As Kurt mentioned, if any of these cases can be covered with the existing API, we should always exhaust that first. (Further discussion in (2).)
>> >
>> > 2). The motivation for changing the public DataStream APIs related to window-based processing.
>> >
>> > As Jincheng and Kurt mentioned, utilizing the current windowed stream API can achieve some of the results. For example, using a tumbling window followed by a sliding window can reduce the amount of duplication, since the tumbling window results are passed to the sliding window as one element. However, when we experimented, there were several problems I observed:
>> >
>> > 2a). Rehashing with an extra keyBy
>> >
>> > This approach requires a separate keyBy following the tumbling result, which creates unnecessary complexity. One workaround is to use "reinterpretAsKeyedStream", as Fabian mentioned; however, there is a lot of investigation to be done to ensure it works well (e.g. how does the keyed state store work, since the key was not set as part of the namespace within WindowOperator?).
>> >
>> > 2b). Handling iterable elements
>> >
>> > Another problem is that if the tumbling window produces no efficient partial results, the output element (as an iterable) will still be duplicated into each of the downstream sliding window states.
>> >
>> > However, this is actually a tricky problem by itself; see the further discussion in (2d).
>> >
>> > 2c). Additional support for some of the more complex use cases
>> >
>> > As described in (1b), with explicit control of the window state and the window function, users can control how the windowing is optimized. One could argue that these types of optimization can also be done using ProcessFunction or other rich functions.
>> >
>> > However, if we take a look at how the WindowOperator is currently structured, it also exposes explicit aggregation + windowing APIs [4] for the WindowedStream.
>> >
>> > Another point is that this opens up multi-slide windowing and multi-timeout windows, as described in the presentation [5] at Flink Forward.
>> >
>> > I would also like to point out that the way the current POC [2] changes the public API is just a proof of concept that the flexibility opens up more complex use cases. It is by no means the final design, and I agree that there could be more-aggressive-than-necessary changes, since it’s meant as a POC.
>> >
>> > 2d). The support for a same-window operator (similar to Table/SQL OVER-aggregate)
>> >
>> > As discussed with Jincheng in a separate thread, this means supporting a same-window operator similar to how OVER-aggregate was achieved in the Table/SQL API. It is definitely useful in some of the window use cases (2b). For example, a process function without efficient partial aggregation results on a sliding window can be evaluated very efficiently, since both the add and retract operations on an iterable list are cheap.
>> >
>> > There are some fundamental differences between the current window operator and the same-window operator concept; for example, the trigger mechanism is entirely different, and it will be a challenge to incorporate the OVER-aggregate of the Table API, since there are many table-specific components in the bounded/unbounded over aggregates. However, I see this as an implementation detail (further discussion in (3a)).
>> >
>> > In conclusion to (2), I think the open question is whether to expand the API if we introduce slicing: if no, whether we can exhaust all the use cases with the current API; if yes, what that extra API will look like, especially the same-window operation in the DataStream API. (Utilizing whatever we already have in Table/SQL is one solution.)
>> >
>> > 3). Incorporating existing POCs and implementations (e.g. Blink’s WindowOperator & SQL/Table API)
>> >
>> > There are already some existing implementations that resemble the idea of slicing, for example Blink’s WindowOperator [3]. My thinking is that once we agree on the API and the public-facing changes, we should reuse as much of the current implementations as possible, since Blink has been running in production for years (please correct me if I am wrong @Jark/Jincheng/Kurt :-D ).
>> >
>> > Regarding the implementation details and some existing approaches:
>> >
>> > 3a). Over-aggregate vs. window operator.
>> >
>> > Over aggregate has already been implemented in the Table/SQL API. Unlike window operators, over aggregate utilizes the process function and a general process operator approach.
>> >
>> > Looking further into how the current window operator works compared with the over aggregate, there are many *common implementations*: for example, the window state (especially the list-based approach), the triggering mechanism / timer service (especially on rowtime), and the “evicting policies”.
>> >
>> > I was wondering if it is possible to consider, on an operator level: 1. supporting the same-window operator concept as a generic case of the over-aggregate, and 2. creating a more common window operator base that generalizes the state, trigger and eviction context.
>> >
>> > 3b). The support for partial process functions: retracting vs. merging.
>> >
>> > Supporting retracting operations is definitely a plus in some use cases, for example as Fabian suggested in the JIRA discussion [6]. Currently this already exists in the Table/SQL API aggregation function.
>> >
>> > 3c). States and chaining strategy
>> >
>> > With the chaining strategy [7] in the operators, I was wondering whether there are performance considerations when implementing the sliding window as one operator versus multiple operators chained together. One question I think is worth investigating is the most efficient way of storing the slicing information, as Jark suggested in the JIRA discussion [8].
>> >
>> > As I tried to capture as many discussions here as possible, I can hardly imagine that we’ve addressed all of these questions. So please share your comments and feedback, as they are highly appreciated! I can definitely volunteer to start a design/discussion doc for this effort if everyone thinks it is suitable to move forward.
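The retract-versus-merge question in (3b) can be sketched with a sliding SUM kept over per-slice partial sums. This is a plain-Java illustration under stated assumptions, not Flink or Blink code: `SlidingSum` and its methods are hypothetical names, and slices are assumed to arrive in order, one per slide.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration of the two evaluation strategies; not a Flink API.
final class SlidingSum {
    private final int slicesPerWindow;
    private final Deque<Long> liveSlices = new ArrayDeque<>();
    private long running; // window total maintained incrementally by the retract path

    SlidingSum(int slicesPerWindow) {
        this.slicesPerWindow = slicesPerWindow;
    }

    /** Retract path: accumulate the new slice, retract the expired one, O(1) per slide. */
    long addSliceWithRetract(long sliceSum) {
        liveSlices.addLast(sliceSum);
        running += sliceSum;
        if (liveSlices.size() > slicesPerWindow) {
            running -= liveSlices.removeFirst(); // retract the slice that left the window
        }
        return running;
    }

    /** Merge path: recompute the window by merging every live slice, O(slices) per slide. */
    long mergeLiveSlices() {
        long total = 0;
        for (long partial : liveSlices) {
            total += partial;
        }
        return total;
    }

    public static void main(String[] args) {
        SlidingSum sum = new SlidingSum(3);
        for (long slice : new long[] {1, 2, 3, 4}) {
            sum.addSliceWithRetract(slice);
        }
        System.out.println(sum.mergeLiveSlices()); // prints 9 (slices 2 + 3 + 4)
    }
}
```

Retraction keeps the per-slide cost constant but needs an aggregate with an inverse (SUM, COUNT); merging works for any mergeable aggregate, such as MAX, at a cost proportional to the number of slices per window.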
>> >
>> > Thanks,
>> > Rong
>> >
>> > [1] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.yevpiesfln1m
>> > [2] https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?ts=5c6a613e#heading=h.j0v7ubhwfl0y
>> > [3] https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java
>> > [4] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#processwindowfunction
>> > [5] https://data-artisans.com/flink-forward-berlin/resources/efficient-window-aggregation-with-stream-slicing
>> > [6] https://issues.apache.org/jira/browse/FLINK-11454
>> > [7] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
>> > [8] https://issues.apache.org/jira/browse/FLINK-7001
>> >
>> > On Tue, Feb 19, 2019 at 1:57 AM Kurt Young <[hidden email]> wrote:
>> >
>> > > Hi Rong,
>> > >
>> > > Thanks for the improvement proposal. This topic aroused my interest, since we did some similar improvements in Blink. After going through your design doc, I would like to share some thoughts:
>> > >
>> > > 1. It looks to me like the proposed SliceManager and MergeManager are quite generic and can be used in various situations, like unaligned windows. However, this will introduce some new APIs and some new stream operators, and thus makes it a big effort. I suppose the main reason to introduce these concepts is that we want to support complex scenarios like unaligned windows, multi-timeout session windows and so on. But if we go back to the original motivation of this improvement, it is "Highly overlapping window operation" and "Window functions with efficient methods calculating and merging partial results", just as you mentioned in the section "Target Usage Pattern". I'm curious whether it's necessary to introduce all these new APIs and operators to meet these requirements. If I understand you correctly, we may even be able to achieve the goal by using two successive windows: first a tumbling window, and then a sliding window. And from the experience we had in Blink, adding some enhancements to the original window operator may also be sufficient. Does it make sense to you if we first try to improve the target scenario without adding new APIs and operators? It would definitely make it easier to review and merge.
>> > >
>> > > 2. From what we observed after Blink made similar improvements, slice- or pane-based improvement is not a silver bullet for all kinds of situations. In most cases, we only observed small performance gains, like 20% or 30%. The main reason is that, if we want to get a big gain from this improvement, we normally require the window function to have efficient calculating and merging methods, like SUM and COUNT. If this assumption stands, the basic approach of the window operator will also be performant: it stores a compact intermediate result for each window per key, and it's also very efficient to get the result based on this. The only downside is that we need to do the calculation in multiple windows per key, but based on the assumption, the calculation is efficient and may not be a blocker. Based on this, I think we must be more careful about the changes, especially API-related ones.
>> > >
>> > > 3. The last thing on my mind is whether we can share some implementations with Blink's window operator. We introduced a new window operator in Blink SQL, which I think needs to be discussed when we try to adopt Blink improvements. Without strong reasons, I think it's better to share some implementations with DataStream's window operator. Maybe we don't need to share the whole operator, but at least some underlying concepts and data structures. It would also be great if you could think about it and see if it's a feasible way.
>> > >
>> > > Last small question: how do you handle sliding windows whose window size can't be divided by the step, such as a 10-second window sliding by 3 seconds?
>> > >
>> > > Best,
>> > > Kurt
>> > >
>> > > On Wed, Jan 30, 2019 at 2:12 AM Fabian Hueske <[hidden email]> wrote:
>> > >
>> > > > Thank you Rong!
>> > > > The performance of sliding windows is an issue for many users.
>> > > > Adding support for a more efficient window is a great effort.
>> > > >
>> > > > Thank you,
>> > > > Fabian
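On Kurt's closing question (a 10-second window sliding by 3 seconds): the thread does not record an answer, but one common approach in pane-based aggregation is to cut slices at the greatest common divisor of the window size and the slide. The sketch below shows only that arithmetic; `SliceMath` is a hypothetical helper, not part of Flink or the design doc, and all durations are assumed to be positive milliseconds.

```java
// Hypothetical helper for illustration; assumes size > 0 and slide > 0 (milliseconds).
final class SliceMath {

    /** Greatest common divisor via the Euclidean algorithm. */
    static long gcd(long a, long b) {
        while (b != 0) {
            long t = a % b;
            a = b;
            b = t;
        }
        return a;
    }

    /** Largest slice length that tiles both the window size and the slide exactly. */
    static long sliceSize(long size, long slide) {
        return gcd(size, slide);
    }

    /** Number of slices that must be merged to produce one window result. */
    static long slicesPerWindow(long size, long slide) {
        return size / sliceSize(size, slide);
    }

    /** Number of new slices that complete between two consecutive window firings. */
    static long slicesPerSlide(long size, long slide) {
        return slide / sliceSize(size, slide);
    }

    public static void main(String[] args) {
        long size = 10_000L;
        long slide = 3_000L;
        System.out.println(sliceSize(size, slide));       // prints 1000
        System.out.println(slicesPerWindow(size, slide)); // prints 10
        System.out.println(slicesPerSlide(size, slide));  // prints 3
    }
}
```

With these numbers each window merges ten 1-second slices, so a small common divisor shrinks the benefit of slicing compared with the case where the size is a multiple of the slide.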