+1 for voting. Thanks Jark for driving.
+1 for TVF. It is well grounded in theory and already supported by Calcite. It will greatly enhance window-related operations.

My personal feeling is that, after the TVF, the subsequent operations can look just like traditional batch SQL, as long as the window-related attributes are included in the key.

I am not sure about the CUMULATE window. Yes, it's a common requirement, but is there any more evidence (e.g. from other systems) that the word "CUMULATE" is appropriate?

Best,
Jingsong

On Sat, Oct 10, 2020 at 3:43 PM Jark Wu <[hidden email]> wrote:
> Hi Pengcheng,
>
> IIUC, the "stream operators" you mean are the non-time operators, also called regular operators, such as regular join and regular aggregate.
> But you may have misunderstood me: only the time operators can't be applied after the new window operators, because the time attributes are missing.
> The regular operators can still be applied after the new window operators.
>
> Regarding using window TVFs to re-assign event time and watermarks, I'm not sure about this,
> because assigning watermarks requires defining a watermark strategy, and the window TVF doesn't provide such an ability.
> Polymorphic table functions are table functions that just append additional columns and convert N rows into M rows; they can't touch meta information.
>
> Best,
> Jark
>
> On Sat, 10 Oct 2020 at 15:41, Jark Wu <[hidden email]> wrote:
> Hi Danny,
>
> Thanks for the hint about the named-params syntax. I added examples with named params to the FLIP.
>
> Best,
> Jark
>
> On Sat, 10 Oct 2020 at 15:03, Pengcheng Liu <[hidden email]> wrote:
> Hi, Jark,
>
> I have a different opinion here. I think it's a very common use case to combine window operators with streaming operators (even time operators),
> e.g. for some tables users only care about data within a certain period, but for other tables they may want the whole historical data.
>
> The pipeline may look like this:
> window join -> dimension table join -> stream aggregate -> stream sort
>
> Just as you said, the key clause can be used to distinguish whether an operator should be translated into a window operator or a streaming operator.
>
> Also, as I mentioned before:
> 1) For a time operator after a window aggregation, the auxiliary function used to access the time attribute column can actually be replaced with (window_end - 1).
> We only need the upstream results to contain a time column whose values lie within the window range [window_start, window_end); the downstream time operators can then work on it, driven by the original watermark from the source.
> 2) For a time operator after other window operators, the downstream time operators can access the time column directly from their input.
>
> One more thought: maybe the window TVFs could re-assign timestamps and watermarks. Then, in cases where the watermark cannot be taken from the source directly (it may need some conversion), it could still be assigned dynamically in SQL (using the time column as the watermark column). I think this could save a lot of time spent revising the event-time column in some cases (this is a real demand in our production environment).
>
> I strongly suggest that we support combining window operators with streaming operators, and I think we can achieve this with little work.
>
> Best,
> Pengcheng
>
> On Sat, 10 Oct 2020 at 13:45, Jark Wu <[hidden email]> wrote:
> Hi Benchao,
>
> That's a good question.
>
> IMO, the new windowed operators and the current time operators are two different sets of functions, just like time operators and non-time operators are two different sets of functions.
> I think it's fine if we don't support integrating them, just as time operators can't be applied after a non-windowed aggregate.
> If users want to use time operators throughout the whole pipeline, they can use the grouped window aggregates instead of the window TVFs.
>
> The key idea of the window TVF is that all the operators in the pipeline are based on **windows**.
> In terms of syntax, if the key clause (e.g. GROUP BY, PARTITION BY, JOIN ON, ORDER BY) contains window_start and window_end, the operator can be translated into a windowed operator.
> Thus, in the future we will have windowed CEP, windowed sort, and windowed over aggregate, making it possible to build a fully windowed pipeline.
>
> But I think we can elaborate on the integration in the future if users need it. Actually, I don't fully understand the scenario of integrating window TVFs with time operators at this point.
> For example, take an interval join between an input stream and a window join result: I don't see why it can't be expressed as a nested window join, or why users would have to use an interval join here.
> Maybe we can wait for more input from users after the window TVF is released and elaborate on it again.
>
> Best,
> Jark
>
> On Sat, 10 Oct 2020 at 12:01, 刘 芃成 <[hidden email]> wrote:
> Hi, Benchao,
>
> I think I got your point. Actually, in the current implementation of group window aggregation, the value of the time attributes (e.g. TUMBLE_ROWTIME/TUMBLE_PROCTIME) is calculated as (window_end - 1), so I think we can just use it directly if you need this. But I think these time attributes are mainly meant to be used for cascaded window operations.
> Regarding the example you provided, I think the semantics of the SQL in your example, which does an interval join (e.g. with TUMBLE_ROWTIME) after a window aggregation, are not clear in the current implementation, and I think that's a strong reason why we need the new TVF syntax.
> With the new syntax, users should understand which time column to use and how to generate it when doing an interval join, etc.
>
> Best,
> Pengcheng
>
> From: Benchao Li <[hidden email]>
> Date: Saturday, 10 October 2020, 11:02
> To: pengcheng Liu <[hidden email]>
> Cc: dev <[hidden email]>
> Subject: Re: [DISCUSS] FLIP-145: Support SQL windowing table-valued function
>
> Hi pengcheng,
>
> Thanks for your response.
> I know that the original time attribute column is retained after the TVF; what I'm questioning is how we get the time attribute column after aggregation.
> Your answer did not resolve my doubts about this.
>
> It's OK if we don't plan to integrate the new TVF aggregate with the old "time attribute scenarios" listed in my previous email in this FLIP. However, it would be good to state explicitly that this is left for a future plan.
>
> On Sat, 10 Oct 2020 at 10:45, pengcheng Liu <[hidden email]> wrote:
> Hi Benchao,
>
> In TVFs, the time attributes are just passed through from the parent rels, and the TVFs only add two additional window attributes (i.e. window_start and window_end). Also, I think the time column can be not only a time attribute of type `TimeIndicatorType` but also a regular column of type `Timestamp`.
>
> For cascaded window operations, we can use window_start/window_end of the previous window result directly to indicate operating on the same window, or use a new DESCRIPTOR column to assign new windows in case the time column changes (e.g. in some cases the original timestamp is inaccurate and needs some conversion before it can be used).
>
> You can check the definitions or signatures of these TVFs in the FLIP, e.g.
>
>     SELECT * FROM TABLE(
>       TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
>
> In this example, `bidtime` is the time attribute column, which is passed as the operand of the DESCRIPTOR function.
>
> +1 to start voting.
>
> On Sat, 10 Oct 2020 at 10:08, Benchao Li <[hidden email]> wrote:
> Hi Jark,
>
> 2 & 3 sound good to me.
>
> Regarding the time attribute, I still have some questions. I know it's easy to support cascaded window aggregates using the new TVFs.
> However, there are some other places that need a time attribute:
> - CEP
> - interval join
> - order by
> - over window
> If there is no time attribute column, how do we integrate these old features with the new TVFs?
> E.g.
>
>     StreamA -> new window aggregate -> interval join -> Sink
>                                       /
>     StreamB --------------------------
>
> On Fri, 9 Oct 2020 at 23:51, Jark Wu <[hidden email]> wrote:
> Hi Benchao,
>
> 1) time attribute
> Yes, we don't need time attribute auxiliary functions, because the new window operations are all based on the window_start and window_end columns instead of the time attributes, so we don't need to propagate time attributes.
> A cascaded window aggregate can be expressed by simply grouping by the window_start and window_end of the previous window result.
> I have added a cascaded window aggregate example to the Tumbling Window section of the FLIP.
> If you want to define a proctime window aggregate, the time column in the TVF should be a proctime attribute field (or the PROCTIME() function).
>
> 2) batch support
> Yes. The proposed syntax/API is unified for batch and streaming. Batch support is in the plan, but we may not have enough time to get it into 1.12.
>
> 3) support `grouping sets`
> This is not included in the FLIP, but I think it would be great if we could support `grouping sets`.
> The existing window implementation doesn't support this because we convert the LogicalAggregate into a WindowAggregate at the beginning, so the expand-grouping-sets rule can't be applied in this situation.
> Fortunately, with the new window implementation, the conversion to WindowAggregate happens at the end, so I think the expand rule can be applied and this feature will be supported naturally.
> Therefore, IMO, we don't need to include this feature in this FLIP, to avoid making the FLIP too large.
> It can be a follow-up issue (maybe just adding tests and docs) after the FLIP.
>
> Best,
> Jark
>
> On Fri, 9 Oct 2020 at 19:09, 刘 芃成 <[hidden email]> wrote:
> Hi Benchao,
>
> Welcome to the discussion. Yes, this new syntax makes SQL clearer and simpler.
> For your first question, the `window_start` and `window_end` columns are added automatically, so we don't need auxiliary group functions to infer or access the window properties.
>
> As for `grouping sets` on TVFs, I think it would be interesting to support it, since we already support `grouping sets` on streaming aggregates in the Blink planner. But I'm not sure whether it will be included in this FLIP.
>
> cc @Jark Wu
>
> Best,
> Pengcheng
>
> On Fri, 9 Oct 2020 at 17:25, Benchao Li <[hidden email]> wrote:
> Thanks Jark for bringing up this discussion; I like this FLIP very much.
>
> Especially the cumulate window: it's much like the current TUMBLE window + Fast Emit (an undocumented experimental feature), but it's more powerful.
>
> It will also make the SQL semantics more standard, especially for the HOPPING window.
>
> Regarding the time attribute:
> It seems that we don't need a specific function like `TUMBLE_ROWTIME` / `TUMBLE_PROCTIME` to infer the time attribute. Then are the `window_start` and `window_end` columns automatically time attribute columns?
> - If not, what will be the time attribute of the result relation of these TVFs, especially after the window aggregation?
> - If yes, then how do we handle proctime?
>
> Regarding batch operators:
> It's great to hear that we can reuse the batch operators in continuous batch mode, as you mentioned in the FLIP.
> The current window aggregate can also be used in batch mode with rowtime. Do you plan to support these TVFs for batch mode in this FLIP? Since Table/SQL is a unified API, it would be great if we could keep the features complete in both streaming and batch mode.
>
> There is one more question; I don't know whether it should be considered in this FLIP.
> Does the new window support `grouping sets`? (It's not supported in the old window implementation.)
>
> On Fri, 9 Oct 2020 at 16:14, Jark Wu <[hidden email]> wrote:
> Hi all,
>
> I know we have a lot of discussion and development going on right now, but it would be great if we could get FLIP-145 into a votable state.
> If there are no objections, I would like to start voting in the next few days.
>
> Best,
> Jark
>
> On Thu, 1 Oct 2020 at 14:29, Jark Wu <[hidden email]> wrote:
> Hi everyone,
>
> I have added a Performance Optimization section that describes how to improve performance in the short term and the long term, and sketches the future performance potential of the new window API.
> Introducing the window API is just the first step; we will continuously improve its performance to make it powerful and useful.
>
> Best,
> Jark
>
> On Thu, 1 Oct 2020 at 14:28, Jark Wu <[hidden email]> wrote:
> Hi Pengcheng,
>
> Yes, the window TVF is part of the FLIP. You're welcome to contribute and join the discussion.
> Regarding SESSION window aggregation, users can use the existing grouped session window function.
>
> Best,
> Jark
>
> On Sun, 27 Sep 2020 at 21:24, liupengcheng <[hidden email]> wrote:
> Hi Jark,
>
> Thanks for the reply. Yes, I think it's a good feature; it can improve the NRT scenarios you mentioned in the FLIP. I also think it can greatly improve streaming SQL: it supports richer window operations in Flink SQL and brings great convenience to users (currently Flink only supports group windows).
>
> Regarding the SESSION window, I think it's especially useful for user behavior analysis (e.g. counting user visits on a news website or social platform), but I agree that we can keep it out of the FLIP for now so that it makes it into 1.12.
>
> Recently I've done some work on the stream planner with the TVFs, and I'm willing to contribute to this part. Is it in the plan of this FLIP?
>
> Best,
> PengchengLiu
>
> On Sat, 26 Sep 2020 at 23:09, Jark Wu <[hidden email]> wrote:
> Hi pengcheng,
>
> It's great to see that you also need window join.
> You are right, the windowing TVF is a powerful feature that can support more operations in the future.
> I think of it as the date/time "partition" selection in batch SQL jobs. With this new syntax, I think it is possible to migrate traditional batch SQL jobs to Flink SQL by changing a few lines.
>
> Regarding the SESSION window, we kept it out of the FLIP on purpose, because we want to keep the FLIP small enough to make it into 1.12, and a SESSION TVF is rarely useful (e.g. session window join?).
>
> Best,
> Jark
>
> On Fri, 25 Sep 2020 at 22:59, liupengcheng <[hidden email]> wrote:
> Hi, Jark,
>
> I'm very interested in this feature, and I've also been working on it recently.
> I just had a glance at the FLIP. It's good, but I found that there is no plan to add SESSION windows.
> Also, I think there are more things we can do based on this new syntax, for example:
> - window sort support
> - window union/intersect/minus support
> - improved dimension table join
> We can have a deeper discussion on this new feature later.
> I've also recently opened a JIRA issue related to this feature:
> https://issues.apache.org/jira/browse/FLINK-18830
>
> Best!
> PengchengLiu
>
> On Fri, 25 Sep 2020 at 22:30, Jark Wu <[hidden email]> wrote:
> Hi everyone,
>
> I want to start a FLIP about supporting windowing table-valued functions (TVF).
> The main purpose of this FLIP is to improve the near real-time (NRT) experience of Flink.
>
> FLIP-145: https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function
>
> We want to introduce the TUMBLE, HOP, and CUMULATE windowing TVFs; CUMULATE is a new kind of window.
> With the windowing TVFs, we can support richer operations on windows, including window join, window TopN, and so on.
> This makes things simple: we only need to assign windows at the beginning of the query and then apply operations after that, like traditional batch SQL.
> We hope it can help reduce the learning curve of windows, improve NRT for Flink, and attract more batch users.
>
> A simple code snippet for a 10-minute tumbling window aggregate:
>
>     SELECT window_start, window_end, SUM(price)
>     FROM TABLE(
>       TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
>     GROUP BY window_start, window_end;
>
> I'm looking forward to your feedback.
>
> Best,
> Jark
>
> --
> Best,
> Benchao Li

--
Best,
Jingsong Lee
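Jingsong's point that operations after the TVF can look like ordinary batch SQL "as long as the window-related attributes are included in the key" can be illustrated with a small sketch. It assumes the `Bid` table from the thread (event-time attribute `bidtime`, column `price`) plus a hypothetical `supplier_id` column, and it puts `window_start` and `window_end` into both the GROUP BY key and the PARTITION BY key of a Top-N query. Treat it as an illustration of the idea, not as the FLIP's own example; whether a given planner version turns it into windowed operators is not settled in this thread.

```sql
-- Sketch only: assumes Bid(bidtime TIMESTAMP(3) with a watermark, price, supplier_id).
-- supplier_id is a hypothetical column used for illustration.
SELECT *
FROM (
  SELECT *,
         -- window_start/window_end in the PARTITION BY key keep the ranking per window
         ROW_NUMBER() OVER (
           PARTITION BY window_start, window_end
           ORDER BY total_price DESC) AS rownum
  FROM (
    -- Step 1: assign 10-minute tumbling windows and aggregate per supplier;
    -- the window columns are part of the grouping key
    SELECT window_start, window_end, supplier_id, SUM(price) AS total_price
    FROM TABLE(
      TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end, supplier_id
  )
)
-- Step 2: keep the top 3 suppliers of each window
WHERE rownum <= 3;
```

Apart from the window columns in the keys, this has the same shape as a per-period Top-N query in batch SQL, which is the migration path the thread describes.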
Hi Jingsong,
That's a good question. I did have searched a lot and didn't find any system that provides such an out-of-box function. I guess the reason is that in the traditional batch systems, this feature is supported by the over window and they don't need to invent a new function/syntax for this. For streaming systems, we are the first one to propose this new window. However, I think CUMULATE is a good name. Because almost all the databases call such scenarios as "cumulative window", e.g. Snowflake[1], SQL Server [2], Postgres [3]. Thus we choose "cumulative" as the base name, but use the verb form "cumulate" because other window function names are also verbs, e.g. tumble, hop. I hope this can address your concern. Best, Jark [1]: https://docs.snowflake.com/en/sql-reference/functions-analytic.html#cumulative-window-frame-examples [2]: https://docs.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql?view=sql-server-ver15#c-producing-a-moving-average-and-cumulative-total [3]: https://popsql.com/learn-sql/postgresql/how-to-calculate-cumulative-sum-running-total-in-postgresql On Sat, 10 Oct 2020 at 17:26, Jingsong Li <[hidden email]> wrote: > +1 for voting. Thanks Jark for driving. > > +1 for TVF, It has been put forward by theory and supported by calcite. It > will greatly enhance the window related operations. > > My personal feeling is that after TVF, the following operations can be > similar to the traditional batch SQL, as long as the window related > attributes are included in the key. > > I am not sure about the CUMULATE window, yes, It's a common requirement, Is > there any more evidence (other systems) to prove this word ("CUMULATE") is > appropriate. > > Best, > Jingsong > > On Sat, Oct 10, 2020 at 3:43 PM Jark Wu <[hidden email]> wrote: > > > Hi Pengcheng, > > > > IIUC, the "stream operators" you mean is the non-time operators or called > > regular operators, such as regular join, regular aggregate. 
> > But you may misunderstand me, only the time operators can't be applied > > after the new window operators, because of missing time attributes. > > The regular operators can still be applied after the new window > operators. > > > > Regarding using window TVFs to re-assign event-time and watermarks, I'm > not > > sure about this. > > Because assigning watermark requires to define the watermark strategy, > > however, the window TVF doesn't provide such ability. > > Polymorphic table functions are table functions which just append > > additional columns and convert N rows into M rows, it can't touch meta > > information. > > > > Best, > > Jark > > > > On Sat, 10 Oct 2020 at 15:41, Jark Wu <[hidden email]> wrote: > > > > > Hi Danny, > > > > > > Thanks for the hint about named params syntax, I added examples with > > named > > > params in the FLIP. > > > > > > Best, > > > Jark > > > > > > > > > On Sat, 10 Oct 2020 at 15:03, Pengcheng Liu < > [hidden email] > > > > > > wrote: > > > > > >> Hi, Jark, > > >> > > >> I've got some different opinions there, I think it's a very common > > use > > >> case to use > > >> window operators in combination with streaming operators(even those > > >> time operators). > > >> (e.g. for some tables, users only care data within a period, but > for > > >> other tables, they may > > >> want the whole historical data). > > >> The pipeline may looks like this: > > >> window join -> dimension table join -> stream aggregate -> stream > > sort > > >> > > >> Just as what you said, the key clause can be used to distinguish > > >> whether a operator should > > >> be translated to a window operator or a streaming operator. > > >> > > >> Also, as I've mentioned before, 1) for time operator after window > > >> aggregation, the auxiliary function > > >> which is used to access time attribute column can be actually > > replaced > > >> with (window_end -1). 
> > >> Actually, we only just need to make the results of the upstream > > >> contains a time column whose > > >> range is within (window_start, window_end), and thus the downstream > > >> time operators can work on it > > >> (driving by the original watermark in the source). 2) for time > > >> operator after other window operators, > > >> the downstream time operators can access the time column directly > > from > > >> it's input. > > >> > > >> One more thoughts there, maybe the window TVFs can re-assign > > >> timestamps and watermarks, so > > >> that in some case when the watermark can not be retrieved from > source > > >> directly(may needs some > > >> conversions), the watermark can still be assigned dynamically in > the > > >> SQL(use the time column as > > >> the watermark column) and thus make it work. I think this can save > > >> much time to revise the event > > >> time column in some cases(this is a real demand in our production > > >> environment). > > >> > > >> I strongly suggest that we should support the combination usage of > > >> window operators and > > >> streaming operators. And I think we can achieve this with little > > work. > > >> > > >> Best, > > >> Pengcheng > > >> > > >> > > >> Jark Wu <[hidden email]> 于2020年10月10日周六 下午1:45写道: > > >> > > >>> Hi Benchao, > > >>> > > >>> That's a good question. > > >>> > > >>> IMO, the new windowed operators and the current time operators are > two > > >>> different sets of functions, > > >>> just like time operators and non-time operators are two different > sets > > of > > >>> functions. > > >>> I think it's fine if we don't support integrating them, just like > time > > >>> operators can't be applied on non-windowed aggregate. > > >>> If users want to use time operators in the whole pipeline, then > he/she > > >>> can > > >>> use the grouped window aggregates instead of the window TVFs. 
> > >>> > > >>> The key idea of window TVF is that all the operators in the pipeline > > are > > >>> based on the **windows**. > > >>> In terms of syntax, if the key clause (e.g. group by, partitioned by, > > >>> join > > >>> on, order by) contains window_start and window_end, > > >>> it can be translated into windowed operators. > > >>> Thus, we will have windowed CEP, windowed sort, windowed over > aggregate > > >>> in > > >>> the future to make it possible to build a windowed pipeline. > > >>> > > >>> But I think we can elaborate the integration more in the future if > > users > > >>> need it. Actually, I don't fully understand the scenario of > integrating > > >>> window TVF and time operators at this point. > > >>> For example, interval join an input stream and a window join result. > I > > >>> don't see why it can't be expressed by nested window join and why > users > > >>> have to use interval join here. > > >>> Maybe we can wait for more inputs from users when the window TVF is > > >>> released and we can elaborate it again. > > >>> > > >>> Best, > > >>> Jark > > >>> > > >>> On Sat, 10 Oct 2020 at 12:01, 刘 芃成 <[hidden email]> > > wrote: > > >>> > > >>> > Hi, Benchao, > > >>> > I think I got your point, actually, in current > implementation > > >>> for > > >>> > group window aggregation, the value of time attributes(e.g. > > >>> > TUMBLE_ROWTIME/TUMBLE_PROCTIME) is calculated as (window_end – 1), > > so I > > >>> > think we can just use it directly if you need this. But I think > this > > >>> time > > >>> > attributes is mainly suggested to use in case of cascaded window > > >>> operations. > > >>> > Regarding the example you provided, I think the semantics of the > SQL > > in > > >>> > your example which doing interval join(e.g. with TUMBLE_ROWTIME) > > after > > >>> > window aggregation is not clear in the current implementation, and > I > > >>> think > > >>> > that’s a strong reason why we need the new TVFs syntax. 
> > >>> > With the new syntax, users should understand which time > column > > to > > >>> > use and how to generate it when doing interval join and etc. > > >>> > > > >>> > Best, > > >>> > Pengcheng > > >>> > > > >>> > 发件人: Benchao Li <[hidden email]> > > >>> > 日期: 2020年10月10日 星期六 上午11:02 > > >>> > 收件人: pengcheng Liu <[hidden email]> > > >>> > 抄送: dev <[hidden email]> > > >>> > 主题: Re: [DISCUSS] FLIP-145: Support SQL windowing table-valued > > function > > >>> > > > >>> > Hi pengcheng, > > >>> > > > >>> > Thanks for your response. > > >>> > I knew that the original time attribute column will be retained > after > > >>> the > > >>> > TVF, > > >>> > what I'm questioning is how do we get the time attribute column > after > > >>> > Aggregation. > > >>> > Your answer did not remove my doubts about this. > > >>> > > > >>> > It's ok if we did not plan to integrate new TVF aggregate with old > > >>> "time > > >>> > attribute scenarios" > > >>> > listed in my previous email in this FLIP. However it's good to > > >>> elaborate > > >>> > leave it to the future plan. > > >>> > > > >>> > pengcheng Liu <[hidden email]<mailto: > > >>> > [hidden email]>> 于2020年10月10日周六 上午10:45写道: > > >>> > Hi,Benchao, > > >>> > In TVFs, the time attributes is just passed through from parent > > >>> rels, > > >>> > and the TVFs just add two > > >>> > additional window attributes(i.e. window_start & window_end). > > >>> Also, I > > >>> > think the time columns can be not only a time attribute > > >>> > with type of `TimeIndicatorType` but also a regular column with > > >>> type > > >>> > of `Timestamp`. > > >>> > > > >>> > For cascaded window operations, we can use > > window_start/window_end > > >>> of > > >>> > the previous window result directly to > > >>> > indicate operating on the same window, or use new DESCRIPTOR > > >>> column > > >>> > to assign new windows, in case of the change of > > >>> > the time column(e.g. 
in some case, the original timestamp is > > >>> > inaccurate and need some conversion to be used). > > >>> > > > >>> > You can check the definition or signature of these TVFs in the > > >>> FLIP. > > >>> > e.g. > > >>> > SELECT * FROM TABLE( > > >>> > TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) > > >>> > In the example, the `bidtime` is the time attribute column, > which > > >>> is > > >>> > the first operand of the DESCRIPTOR function. > > >>> > > > >>> > +1 start voting. > > >>> > > > >>> > Benchao Li <[hidden email]<mailto:[hidden email]>> > > >>> > 于2020年10月10日周六 上午10:08写道: > > >>> > Hi Jark, > > >>> > > > >>> > 2 & 3 sounds good to me. > > >>> > > > >>> > Regarding time attribute, > > >>> > I still have some questions, I knew it's easy to support cascaded > > >>> window > > >>> > aggregate using new TVFs. > > >>> > However there are some other places where need time attribute: > > >>> > - CEP > > >>> > - interval join > > >>> > - order by > > >>> > - over window > > >>> > If there is no time attribute column, how do we integrate these old > > >>> > features with the new TVFs. > > >>> > E.g. > > >>> > StreamA -> new window aggregate -> interval join -> Sink > > >>> > / > > >>> > StreamB ----------------------------------- > > >>> > > > >>> > > > >>> > Jark Wu <[hidden email]<mailto:[hidden email]>> 于2020年10月9日周五 > > >>> > 下午11:51写道: > > >>> > Hi Benchao, > > >>> > > > >>> > 1) time attribute > > >>> > Yes. We don't need time attribute auxiliary function. Because the > new > > >>> > window operations are all based on the > > >>> > window_start and window_end columns instead of on the time > > >>> attributes. So > > >>> > we don't need to propagate time attributes. > > >>> > Cascaded window aggregate can be expressed by simply GROUP BY the > > >>> > window_start and window_end of the previous window result. > > >>> > I have added a cascaded window aggregate example in the Tumbling > > Window > > >>> > section in the FLIP. 
> > >>> > If you want to define a proctime window aggregate, the time column in the TVF should be a proctime attribute field (or the PROCTIME() function).
> > >>> >
> > >>> > 2) batch support
> > >>> > Yes. The proposed syntax/API is unified for batch and streaming. Batch support is in the plan, but we may not have enough time to make it into 1.12.
> > >>> >
> > >>> > 3) support `grouping sets`
> > >>> > This is not included in the FLIP, but I think it would be great if we can support `grouping sets`.
> > >>> > The existing window impl doesn't support this because we convert the LogicalAggregate into a WindowAggregate at the beginning, so the expand-grouping-sets rule can't be applied in this situation.
> > >>> > Fortunately, with the new window impl, the conversion to WindowAggregate will happen at the end, so I think the expand rule can be applied and this feature will be supported naturally.
> > >>> > Therefore, IMO, we don't need to include this feature in this FLIP; this keeps the FLIP from becoming too large.
> > >>> > This can be a follow-up issue (maybe just adding tests and docs) after the FLIP.
> > >>> >
> > >>> > Best,
> > >>> > Jark
> > >>> >
> > >>> > On Fri, 9 Oct 2020 at 19:09, 刘 芃成 <[hidden email]> wrote:
> > >>> >
> > >>> > > Hi, Benchao,
> > >>> > > Welcome to the discussion. Yes, this new syntax can make SQL clearer and simpler.
> > >>> > > For your first question, the `window_start` and `window_end` columns will be added automatically, so we don't need auxiliary group functions to infer or access the window properties.
> > >>> > >
> > >>> > > For `grouping sets` on TVFs, I think it would be interesting if we can support it, as we already support `grouping sets` on streaming aggregates in the Blink planner.
> > >>> > > But I'm not sure whether it will be included in this FLIP.
> > >>> > >
> > >>> > > cc @Jark Wu
> > >>> > >
> > >>> > > Best,
> > >>> > > Pengcheng
> > >>> > >
> > >>> > > On 2020/10/9, 17:25, "Benchao Li" <[hidden email]> wrote:
> > >>> > >
> > >>> > > Thanks Jark for bringing up this discussion. I like this FLIP very much.
> > >>> > >
> > >>> > > Especially the cumulate window: it's much like the current TUMBLE window + Fast Emit (which is an undocumented experimental feature), but it's more powerful.
> > >>> > >
> > >>> > > And this will make the SQL semantics more standard, especially for the HOPPING window.
> > >>> > >
> > >>> > > Regarding the time attribute, it seems that we don't need a specific function like `TUMBLE_ROWTIME` / `TUMBLE_PROCTIME` to infer the time attribute. Then are the `window_start` and `window_end` columns automatically time attribute columns?
> > >>> > > - If not, what will be the time attribute of the result relation of these TVFs? Especially after the window aggregation.
> > >>> > > - If yes, then how do we handle proctime?
> > >>> > >
> > >>> > > Regarding batch operators, it's great to hear that we can reuse the batch operators in continuous batch mode, as you mentioned in the FLIP. The current window aggregate can also be used in batch mode with rowtime. Do you plan to support these TVFs for batch mode in this FLIP? Since Table/SQL is a unified API, it would be great if we can keep the features complete in both streaming and batch mode.
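To make Benchao's comparison concrete, the following Python sketch lists which windows a single row is assigned to under HOP and CUMULATE. The assumptions are ours, not taken from Flink's code: integer timestamps, windows aligned to 0 with no offset, `size` a multiple of `slide`/`step`, and the hypothetical helper names `hop_windows` and `cumulate_windows`. The CUMULATE windows follow the semantics proposed in FLIP-145: they share the tumbling start and their end grows by `step` up to `size`, which is why the result looks like a tumbling window that emits early at every step.

```python
def hop_windows(ts, slide, size):
    """HOP: every window [start, start + size) whose start is a multiple of
    `slide` and which contains ts."""
    windows = []
    start = ts - ts % slide  # latest window start that is <= ts
    while start + size > ts:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def cumulate_windows(ts, step, size):
    """CUMULATE: windows share the tumbling start of ts and their end grows
    by `step` until it reaches start + size; ts belongs to those whose end
    is still after ts."""
    start = ts - ts % size
    return [(start, start + k * step)
            for k in range(1, size // step + 1)
            if start + k * step > ts]


# A row at minute 7 with 10-minute windows.
print(hop_windows(7, slide=5, size=10))      # [(0, 10), (5, 15)]
print(cumulate_windows(7, step=2, size=10))  # [(0, 8), (0, 10)]
```

With `step == size` the CUMULATE helper degenerates to a single tumbling window, which matches the "TUMBLE + Fast Emit" intuition.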
> > >>> > >
> > >>> > > There is one more question; I don't know whether it should be considered in this FLIP. Does the new window support `grouping sets`? (It's not supported in the old window impl.)
> > >>> > >
> > >>> > > On Fri, 9 Oct 2020 at 16:14, Jark Wu <[hidden email]> wrote:
> > >>> > >
> > >>> > > > Hi all,
> > >>> > > >
> > >>> > > > I know there is a lot of discussion and development going on right now, but it would be great if we can get FLIP-145 into a votable state.
> > >>> > > > If there are no objections, I would like to start voting in the next few days.
> > >>> > > >
> > >>> > > > Best,
> > >>> > > > Jark
> > >>> > > >
> > >>> > > > On Thu, 1 Oct 2020 at 14:29, Jark Wu <[hidden email]> wrote:
> > >>> > > >
> > >>> > > > > Hi everyone,
> > >>> > > > >
> > >>> > > > > I have added a section on Performance Optimization to describe how to improve the performance in the short term and long term, and to sketch the future performance potential under the new window API.
> > >>> > > > > Introducing the window API is just the first step; we will continuously improve the performance to make it powerful and useful.
> > >>> > > > >
> > >>> > > > > Best,
> > >>> > > > > Jark
> > >>> > > > >
> > >>> > > > > On Thu, 1 Oct 2020 at 14:28, Jark Wu <[hidden email]> wrote:
> > >>> > > > >
> > >>> > > > >> Hi Pengcheng,
> > >>> > > > >>
> > >>> > > > >> Yes, the window TVF is part of the FLIP. You are welcome to contribute and join the discussion.
> > >>> > > > >> Regarding the SESSION window aggregation, users can use the existing grouped session window function.
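Benchao's `grouping sets` question, together with Jark's explanation that the expand rule can be applied once the conversion to WindowAggregate happens at the end, can be pictured with a small Python model. It is a conceptual sketch of an expand-then-aggregate rewrite, not Calcite's or Flink's actual rule: rows are assumed to be already windowed, `None` stands in for SQL NULL, the set index stands in for a grouping id, and the function names are hypothetical. The key point is that `window_start`/`window_end` stay in every aggregation key, so each grouping set is still computed per window.

```python
from collections import defaultdict


def expand(rows, grouping_sets, keys):
    """Emit one copy of each row per grouping set: keys outside the set are
    nulled (None) and the copy is tagged with the set's index as a stand-in
    for a grouping id. Window columns and the measure are always kept."""
    for row in rows:
        for gid, gset in enumerate(grouping_sets):
            copy = {k: (row[k] if k in gset else None) for k in keys}
            copy.update(gid=gid, window_start=row["window_start"],
                        window_end=row["window_end"], price=row["price"])
            yield copy


def windowed_grouping_sets_sum(rows, grouping_sets, keys):
    """SUM(price) ... GROUP BY window_start, window_end, GROUPING SETS (...),
    computed as expand + one ordinary grouped aggregation."""
    sums = defaultdict(int)
    for r in expand(rows, grouping_sets, keys):
        group_key = (r["window_start"], r["window_end"], r["gid"]) + tuple(r[k] for k in keys)
        sums[group_key] += r["price"]
    return dict(sums)


# Rows as they might look after a 10-minute TUMBLE TVF.
rows = [
    {"window_start": 0, "window_end": 10, "supplier": "a", "item": "x", "price": 3},
    {"window_start": 0, "window_end": 10, "supplier": "a", "item": "y", "price": 4},
    {"window_start": 0, "window_end": 10, "supplier": "b", "item": "x", "price": 5},
]
result = windowed_grouping_sets_sum(
    rows, grouping_sets=[("supplier",), ("item",), ()], keys=("supplier", "item"))
print(result)
```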
> > >>> > > > >>
> > >>> > > > >> Best,
> > >>> > > > >> Jark
> > >>> > > > >>
> > >>> > > > >> On Sun, 27 Sep 2020 at 21:24, liupengcheng <[hidden email]> wrote:
> > >>> > > > >>
> > >>> > > > >>> Hi Jark,
> > >>> > > > >>> Thanks for the reply. Yes, I think it's a good feature; it can improve the NRT scenarios, as you mentioned in the FLIP. Also, I think it can greatly improve streaming SQL: it can support richer window operations in Flink SQL and bring great convenience to users (currently we only support group windows in Flink).
> > >>> > > > >>>
> > >>> > > > >>> Regarding the SESSION window, I think it's especially useful for user behavior analysis (e.g. counting user visits on a news website or social platform), but I agree that we can keep it out of the FLIP for now to make it into 1.12.
> > >>> > > > >>>
> > >>> > > > >>> Recently, I've done some work on the stream planner with the TVFs, and I'm willing to contribute to this part. Is it in the plan of this FLIP?
> > >>> > > > >>>
> > >>> > > > >>> Best,
> > >>> > > > >>> PengchengLiu
> > >>> > > > >>>
> > >>> > > > >>> On 2020/9/26, 23:09, "Jark Wu" <[hidden email]> wrote:
> > >>> > > > >>>
> > >>> > > > >>> Hi pengcheng,
> > >>> > > > >>>
> > >>> > > > >>> It's great to see that you also need window join.
> > >>> > > > >>> You are right, the windowing TVF is a powerful feature which can support more operations in the future.
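The window join mentioned here can be sketched in the same style, following the idea in this thread that an operator whose key contains `window_start` and `window_end` is a windowed operator. This Python model assumes integer timestamps, windows aligned to 0 and hypothetical helper names; it is not Flink's join implementation. Two rows with the same `user` only match when they fall into the same window.

```python
from collections import defaultdict


def tumble(rows, time_col, size):
    """Append window_start / window_end (aligned to 0, no offset)."""
    out = []
    for row in rows:
        start = row[time_col] - row[time_col] % size
        out.append({**row, "window_start": start, "window_end": start + size})
    return out


def window_join(left, right, key):
    """Inner join on (window_start, window_end, key): rows with equal keys
    only match when both sides fall into the same window."""
    index = defaultdict(list)
    for rrow in right:
        index[(rrow["window_start"], rrow["window_end"], rrow[key])].append(rrow)
    joined = []
    for lrow in left:
        for rrow in index.get((lrow["window_start"], lrow["window_end"], lrow[key]), []):
            joined.append((lrow["window_start"], lrow[key], lrow["id"], rrow["id"]))
    return joined


orders = tumble([{"user": "u1", "id": "o1", "ts": 2},
                 {"user": "u1", "id": "o2", "ts": 12}], "ts", 10)
clicks = tumble([{"user": "u1", "id": "c1", "ts": 8},
                 {"user": "u1", "id": "c2", "ts": 15},
                 {"user": "u2", "id": "c3", "ts": 3}], "ts", 10)
print(window_join(orders, clicks, "user"))
# [(0, 'u1', 'o1', 'c1'), (10, 'u1', 'o2', 'c2')]
```

Note that `o1` and `c2` share a user but are not joined, because they land in different 10-minute windows.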
> > >>> > > > >>> I think of it as the date-time "partition" selection in batch SQL jobs. With this new syntax, I think it is possible to migrate traditional batch SQL jobs to Flink SQL by changing a few lines.
> > >>> > > > >>>
> > >>> > > > >>> Regarding the SESSION window, it is kept out of the FLIP on purpose, because we want to keep the FLIP small to make it into 1.12, and a SESSION TVF is rarely useful (e.g. session window join?).
> > >>> > > > >>>
> > >>> > > > >>> Best,
> > >>> > > > >>> Jark
> > >>> > > > >>>
> > >>> > > > >>> On Fri, 25 Sep 2020 at 22:59, liupengcheng <[hidden email]> wrote:
> > >>> > > > >>>
> > >>> > > > >>> > Hi, Jark,
> > >>> > > > >>> > I'm very interested in this feature, and I've also been working on it recently.
> > >>> > > > >>> > I just had a glance at the FLIP. It's good, but I found that there is no plan to add SESSION windows.
> > >>> > > > >>> > Also, I think there are more things we can do based on this new syntax. For example:
> > >>> > > > >>> > - window sort support
> > >>> > > > >>> > - window union/intersect/minus support
> > >>> > > > >>> > - improved dimension table join
> > >>> > > > >>> > We can have a deeper discussion on this new feature later.
> > >>> > > > >>> > I've also recently opened a JIRA issue related to this feature:
> > >>> > > > >>> > https://issues.apache.org/jira/browse/FLINK-18830
> > >>> > > > >>> >
> > >>> > > > >>> > Best!
> > >>> > > > >>> > PengchengLiu
> > >>> > > > >>> >
> > >>> > > > >>> > On 2020/9/25, 22:30, "Jark Wu" <[hidden email]> wrote:
> > >>> > > > >>> >
> > >>> > > > >>> > Hi everyone,
> > >>> > > > >>> >
> > >>> > > > >>> > I want to start a FLIP about supporting windowing table-valued functions (TVF).
> > >>> > > > >>> > The main purpose of this FLIP is to improve the near real-time (NRT) experience of Flink.
> > >>> > > > >>> >
> > >>> > > > >>> > FLIP-145: https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function
> > >>> > > > >>> >
> > >>> > > > >>> > We want to introduce the TUMBLE, HOP and CUMULATE windowing TVFs; CUMULATE is a new kind of window.
> > >>> > > > >>> > With the windowing TVFs, we can support richer operations on windows, including window join, window TopN and so on.
> > >>> > > > >>> > This makes things simple: we only need to assign windows at the beginning of the query, and then apply operations after that, as in traditional batch SQL.
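The "assign windows first, then apply ordinary batch-style operations" idea can be illustrated with a window TopN. This is a hypothetical Python model, assuming integer timestamps and windows aligned to 0; `window_top_n` mimics `ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) <= n` applied to the TUMBLE output, and does not reproduce any Flink API.

```python
from collections import defaultdict


def tumble(rows, time_col, size):
    """Append window_start / window_end (aligned to 0, no offset)."""
    out = []
    for row in rows:
        start = row[time_col] - row[time_col] % size
        out.append({**row, "window_start": start, "window_end": start + size})
    return out


def window_top_n(rows, n, order_col):
    """Keep the n largest rows per window, like
    ROW_NUMBER() OVER (PARTITION BY window_start, window_end
                       ORDER BY order_col DESC) <= n."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[(row["window_start"], row["window_end"])].append(row)
    result = []
    for window in sorted(partitions):
        ranked = sorted(partitions[window], key=lambda r: r[order_col], reverse=True)
        for rownum, row in enumerate(ranked[:n], start=1):
            result.append({**row, "rownum": rownum})
    return result


bids = [{"item": item, "price": price, "bidtime": t}
        for item, price, t in [("A", 4, 1), ("B", 9, 2), ("C", 6, 3),
                               ("D", 7, 11), ("E", 1, 12)]]
top2 = window_top_n(tumble(bids, "bidtime", 10), n=2, order_col="price")
print([(r["window_start"], r["item"], r["rownum"]) for r in top2])
# [(0, 'B', 1), (0, 'C', 2), (10, 'D', 1), (10, 'E', 2)]
```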
> > >>> > > > >>> > We hope it can help reduce the learning curve of windows, improve NRT for Flink, and attract more batch users.
> > >>> > > > >>> >
> > >>> > > > >>> > A simple code snippet for a 10-minute tumbling window aggregate:
> > >>> > > > >>> >
> > >>> > > > >>> >   SELECT window_start, window_end, SUM(price)
> > >>> > > > >>> >   FROM TABLE(
> > >>> > > > >>> >     TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
> > >>> > > > >>> >   GROUP BY window_start, window_end;
> > >>> > > > >>> >
> > >>> > > > >>> > I'm looking forward to your feedback.
> > >>> > > > >>> >
> > >>> > > > >>> > Best,
> > >>> > > > >>> > Jark
> > >>> > >
> > >>> > > --
> > >>> > >
> > >>> > > Best,
> > >>> > > Benchao Li
> > >>> >
> > >>> > --
> > >>> >
> > >>> > Best,
> > >>> > Benchao Li
> > >>> >
> > >>> > --
> > >>> >
> > >>> > Best,
> > >>> > Benchao Li

--
Best, Jingsong Lee
Hi everyone,
Thanks everyone for this healthy discussion. I think we have addressed all the concerns, so I will proceed with a vote. If you have any new objections, feel free to let me know.

Best,
Jark

On Sat, 10 Oct 2020 at 17:54, Jark Wu <[hidden email]> wrote:

> Hi Jingsong,
>
> That's a good question. I did search a lot and didn't find any system that provides such an out-of-the-box function.
> I guess the reason is that in traditional batch systems this feature is supported by the over window, so they don't need to invent a new function/syntax for it.
> For streaming systems, we are the first to propose this new window.
>
> However, I think CUMULATE is a good name, because almost all databases call such scenarios a "cumulative window", e.g. Snowflake [1], SQL Server [2], Postgres [3].
> Thus we chose "cumulative" as the base name, but use the verb form "cumulate" because the other window function names are also verbs, e.g. tumble, hop.
>
> I hope this addresses your concern.
>
> Best,
> Jark
>
> [1]: https://docs.snowflake.com/en/sql-reference/functions-analytic.html#cumulative-window-frame-examples
> [2]: https://docs.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql?view=sql-server-ver15#c-producing-a-moving-average-and-cumulative-total
> [3]: https://popsql.com/learn-sql/postgresql/how-to-calculate-cumulative-sum-running-total-in-postgresql
>
> On Sat, 10 Oct 2020 at 17:26, Jingsong Li <[hidden email]> wrote:
>
>> +1 for voting. Thanks Jark for driving this.
>>
>> +1 for TVF. It has been proposed in theory and is supported by Calcite. It will greatly enhance window-related operations.
>>
>> My personal feeling is that after the TVF, the following operations can be similar to traditional batch SQL, as long as the window-related attributes are included in the key.
>>
>> I am not sure about the CUMULATE window. Yes, it's a common requirement, but is there any more evidence (from other systems) that the word "CUMULATE" is appropriate?
>>
>> Best,
>> Jingsong
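Jark's point that batch systems express cumulative results with an over window can be checked with a small Python model. Under stated assumptions (integer timestamps, windows aligned to 0, `size` a multiple of `step`, hypothetical function names), summing per CUMULATE window gives the same numbers as a running total that resets at every tumbling boundary and is sampled at each step end. Windows that contain no rows are skipped in both versions, on the assumption that a window without input produces no output row.

```python
from collections import defaultdict


def cumulate_sums(events, step, size):
    """SUM(amount) per CUMULATE window: an event (ts, amount) is added to every
    window that starts at its tumbling start and ends after ts."""
    sums = defaultdict(int)
    for ts, amount in events:
        start = ts - ts % size
        for k in range(1, size // step + 1):
            end = start + k * step
            if end > ts:
                sums[(start, end)] += amount
    return dict(sums)


def running_totals(events, step, size):
    """The 'over window' view: partition events by tumbling period, then take
    the running total of everything before each step boundary."""
    periods = defaultdict(list)
    for ts, amount in events:
        periods[ts - ts % size].append((ts, amount))
    totals = {}
    for start, rows in periods.items():
        for k in range(1, size // step + 1):
            end = start + k * step
            covered = [amount for ts, amount in rows if ts < end]
            if covered:  # assume a window with no rows emits nothing
                totals[(start, end)] = sum(covered)
    return totals


events = [(1, 5), (5, 3), (12, 4)]  # (minute, amount)
assert cumulate_sums(events, step=2, size=10) == running_totals(events, step=2, size=10)
print(sorted(cumulate_sums(events, step=2, size=10).items()))
# [((0, 2), 5), ((0, 4), 5), ((0, 6), 8), ((0, 8), 8), ((0, 10), 8),
#  ((10, 14), 4), ((10, 16), 4), ((10, 18), 4), ((10, 20), 4)]
```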
>> > >>> > > > >>> > >> > >>> > > > >>> > A simple code snippet for 10 minutes >> tumbling >> > >>> window >> > >>> > > > aggregate: >> > >>> > > > >>> > >> > >>> > > > >>> > SELECT window_start, window_end, SUM(price) >> > >>> > > > >>> > FROM TABLE( >> > >>> > > > >>> > TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), >> > >>> INTERVAL >> > >>> > > '10' >> > >>> > > > >>> MINUTES)) >> > >>> > > > >>> > GROUP BY window_start, window_end; >> > >>> > > > >>> > >> > >>> > > > >>> > I'm looking forward to your feedback. >> > >>> > > > >>> > >> > >>> > > > >>> > Best, >> > >>> > > > >>> > Jark >> > >>> > > > >>> > >> > >>> > > > >>> > >> > >>> > > > >>> > >> > >>> > > > >>> >> > >>> > > > >>> >> > >>> > > > >>> >> > >>> > > > >> > >>> > > >> > >>> > > >> > >>> > > -- >> > >>> > > >> > >>> > > Best, >> > >>> > > Benchao Li >> > >>> > > >> > >>> > >> > >>> > >> > >>> > -- >> > >>> > >> > >>> > Best, >> > >>> > Benchao Li >> > >>> > >> > >>> > >> > >>> > -- >> > >>> > >> > >>> > Best, >> > >>> > Benchao Li >> > >>> > >> > >>> >> > >> >> > >> >> >> -- >> Best, Jingsong Lee >> > |
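The tumbling-window snippet quoted above can be read as a two-step computation: TUMBLE appends window_start and window_end to every Bid row, and the GROUP BY then sums prices per window. The Python below is a minimal sketch of that reading, not Flink's implementation or API. It assumes epoch-aligned, half-open windows [window_start, window_end), a finite in-memory input, and no watermarks or late data. The helpers `tumble` and `sum_by_window` and the sample `bids` rows are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Assumption: windows are aligned to the Unix epoch (no offset).
EPOCH = datetime(1970, 1, 1)


def tumble(rows, time_col, size):
    """Hypothetical model of a TUMBLE TVF: copy each row and append
    window_start/window_end of the single half-open window
    [window_start, window_end) that contains row[time_col]."""
    for row in rows:
        ts = row[time_col]
        start = EPOCH + ((ts - EPOCH) // size) * size
        yield {**row, "window_start": start, "window_end": start + size}


def sum_by_window(windowed_rows, value_col):
    """Model of SELECT window_start, window_end, SUM(value_col)
    ... GROUP BY window_start, window_end over a finite input."""
    totals = defaultdict(int)
    for row in windowed_rows:
        totals[(row["window_start"], row["window_end"])] += row[value_col]
    return sorted(totals.items())


# Hypothetical sample rows for the Bid table.
bids = [
    {"bidtime": datetime(2020, 4, 15, 8, 5), "price": 4},
    {"bidtime": datetime(2020, 4, 15, 8, 7), "price": 2},
    {"bidtime": datetime(2020, 4, 15, 8, 9), "price": 5},
    {"bidtime": datetime(2020, 4, 15, 8, 11), "price": 3},
    {"bidtime": datetime(2020, 4, 15, 8, 13), "price": 1},
    {"bidtime": datetime(2020, 4, 15, 8, 17), "price": 6},
]

for (start, end), total in sum_by_window(
    tumble(bids, "bidtime", timedelta(minutes=10)), "price"
):
    print(start.time(), end.time(), total)
# 08:00:00 08:10:00 11
# 08:10:00 08:20:00 10
```

Because the window columns are ordinary columns, the aggregation is a plain GROUP BY, which is the "like traditional batch SQL" property the FLIP aims for.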
Hi everyone,
Timo just raised a good point in the vote thread. I copied the feedback here:

> Timo:
> 1) I think we should not offer 2 different kinds of syntax that do the same thing. We should deprecate the old syntax.
> 2) We should have session windows in the new syntax as well to give users a complete migration path.
> 3) We should investigate if we can remove the additional `FROM TABLE(...)` syntax. As far as I see it in other examples from Oracle 18c, the additional `TABLE()` syntax is not necessary anymore for polymorphic table functions.

Here are my comments:

1) I'm not sure about this. If we drop the old syntax, this will break lots of existing SQL jobs. Upgrading SQL jobs is not as easy as upgrading Table API jobs, so we should be as cautious as possible here. Besides, if we want to deprecate the old syntax, we must provide equal functionality first, and the new syntax doesn't support propagating time attributes. A possible solution is to generate one more column, "window_time", in the window TVFs. The value of "window_time" would always be "window_end - 1" and would have the time attribute type. Users could then propagate the time attribute by adding "window_time" to the "group by" and "join on" clauses together with "window_start" and "window_end".

2) I will add the session window syntax to the FLIP later.

3) I like the simplified syntax "FROM tumble(input, rowtime, interval '1' minute)". However, the polymorphic table function syntax introduced in SQL standard 2016 [1] requires the TABLE() and DESCRIPTOR() syntax (see Chapter 8, "Invocation"). Therefore, I think it's safe to support the standard syntax first, and then explore whether we can extend the syntax to make the TABLE() and DESCRIPTOR() keywords optional. Note that the Calcite parser currently doesn't support the simplified syntax, and this definitely needs to be discussed in the Calcite community. Actually, there has been a discussion about this [2], and Julian said:

> Standard SQL doesn’t allow functions in the FROM clause. I think it’s because tables and functions are in different namespaces (and therefore there could be a table and a function with the same name). So you need to use the TABLE keyword to indicate that you are using a function as a table.

Best,
Jark

[1]: https://standards.iso.org/ittf/PubliclyAvailableStandards/c069776_ISO_IEC_TR_19075-7_2017.zip
[2]: https://lists.apache.org/x/thread.html/4a91632b1c780ef9d67311f90fce626582faae7d30a134a768c3d324@%3Cdev.calcite.apache.org%3E

On Sat, 10 Oct 2020 at 17:59, Jark Wu <[hidden email]> wrote:

> Hi everyone,
>
> Thanks everyone for this healthy discussion. I think we have addressed all the concerns, so I will continue with a vote.
> If you have any new objections, feel free to let me know.
>
> Best,
> Jark
>
> On Sat, 10 Oct 2020 at 17:54, Jark Wu <[hidden email]> wrote:
>
> > Hi Jingsong,
> >
> > That's a good question. I did search a lot and didn't find any system that provides such an out-of-the-box function.
> > I guess the reason is that in traditional batch systems, this feature is supported by the over window, so they don't need to invent a new function/syntax for it.
> > For streaming systems, we are the first to propose this new window.
> >
> > However, I think CUMULATE is a good name, because almost all databases call such scenarios a "cumulative window", e.g. Snowflake [1], SQL Server [2], Postgres [3].
> > Thus we chose "cumulative" as the base name, but use the verb form "cumulate" because the other window function names are also verbs, e.g. tumble, hop.
> >
> > I hope this can address your concern.
> >
> > Best,
> > Jark
> >
> > [1]: https://docs.snowflake.com/en/sql-reference/functions-analytic.html#cumulative-window-frame-examples
> > [2]: https://docs.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql?view=sql-server-ver15#c-producing-a-moving-average-and-cumulative-total
> > [3]: https://popsql.com/learn-sql/postgresql/how-to-calculate-cumulative-sum-running-total-in-postgresql
Some more thoughts.
I think the standard syntax is what we must provide, but simplifying the PTF (polymorphic table function) syntax is a really interesting topic, because it could make the new syntax easier for users to pick up. I will share what I have found. Feel free to join the discussion.

Oracle 18c introduced polymorphic table functions in 2018, after SQL:2016 was released. Oracle 18c's conformance claim already refers to SQL:2016, but it does not mention polymorphic table functions (feature B200) at all [2], and Oracle does not seem to follow the standard syntax [1]. Does Oracle also consider the standard syntax too verbose, and therefore introduce a simplified one? Could we extend the standard a bit in the future to make the TABLE() keyword optional?

A PTF example from the Oracle 18c PTF documentation [3][4]:

> SELECT * FROM skip_col(scott.emp, COLUMNS(comm, hiredate, mgr))

Best,
Jark

[1]: https://modern-sql.com/blog/2018-11/whats-new-in-oracle-database-18c#ptf
[2]: https://docs.oracle.com/en/database/oracle/oracle-database/18/sqlrf/Oracle-Support-for-Optional-Features-of-SQLFoundation2011.html#GUID-3BA98AEC-FAAD-4F21-A6AD-F696B5D36D56
[3]: https://oracle-base.com/articles/18c/polymorphic-table-functions-18c
[4]: https://docs.oracle.com/en/database/oracle/oracle-database/18/lnpls/plsql-optimization-and-tuning.html#GUID-695FBA1A-89EA-45B4-9C81-CA99F6C794A5

On Tue, 13 Oct 2020 at 18:42, Jark Wu <[hidden email]> wrote: > Hi everyone, > > Timo just raised a good point in the vote thread. I copied the feedback > here: > > > Timo: > 1) I think we should not offer 2 different kinds of syntax that do the > same thing. We should deprecate the old syntax. > 2) We should have session windows in the new syntax as well to give users > a complete migration path. > 3) We should investigate if we can remove the additional `FROM > TABLE(...)` syntax.
As far as I see it in other examples from Oracle > 18c, the additional `TABLE()` syntax is not necessary anymore for > polymorphic table functions. > > > Here are my comments: > 1) I'm not sure about this. If we are going to drop the old syntax, this > will break lots of existing SQL jobs. > Upgrading SQL jobs is not as easy as Table API jobs, We should be as > cautious as possible for this. > > Besides, if we want to deprecate old syntax, we must provide equal > functionality first, > and the new syntax doesn't support propagate time attributes. A possible > solution can be to > generate one more column "window_time" for window TVFs. The value of > "window_time" would > always be "window_end - 1" and has the time attribute type. Users can > propagate the time attribute by > adding "window_time" to the "group by" and "join on" clauses with the > "window_start", "window_end" together. > > 2) I will add session window syntax to the FLIP later. > > 3) I like the simplified syntax "FROM tumble(input, rowtime, interval '1' > minute)". > However, the polymorphic table function syntax introduced in SQL standard > 2016 [1] requires > the TABLE() and DESCRIPTOR() syntax (see Chapter-8 “Invocation”). > Therefore, I think it's safe to support the standard syntax first, and can > explore whether we can > extend the syntax to make TABLE() and DESCRIPTOR() keywords optional. > Note that, Calcite parser currently doesn't support the simplified syntax, > and this definitely needs > to be discussed in the Calcite community. Actually, there has been a > discussion about this [2], and Julian said: > > > Standard SQL doesn’t allow functions in the FROM clause. I think it’s > because tables and functions are in > different namespaces (and therefore there could be a table and a function > with the same name). > So you need to use the TABLE keyword to indicate that you are using a > function as a table. 
> > Best, > Jark > > [1]: > https://standards.iso.org/ittf/PubliclyAvailableStandards/c069776_ISO_IEC_TR_19075-7_2017.zip > [2]: > https://lists.apache.org/x/thread.html/4a91632b1c780ef9d67311f90fce626582faae7d30a134a768c3d324@%3Cdev.calcite.apache.org%3E
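A windowing TVF is a polymorphic table function that passes every input column through, appends window columns, and may turn N input rows into M output rows. The following is a minimal plain-Python model of that behaviour, offered as an illustrative sketch rather than Flink's implementation. Assumptions: timestamps are plain integers (for example epoch milliseconds); `tumble`, `hop`, `cumulate` and `sum_by_window` are hypothetical helper names; CUMULATE windows share a start and grow by a step until they reach the maximum size, with the size assumed to be a multiple of the step; and `window_time` models the proposed `window_end - 1` column.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, Tuple

Row = Dict[str, int]


def _assign(row: Row, start: int, end: int) -> Row:
    # Keep every input column and append the window columns.
    # window_time models the proposed "window_end - 1" column (an assumption here).
    out = dict(row)
    out.update(window_start=start, window_end=end, window_time=end - 1)
    return out


def tumble(rows: Iterable[Row], timecol: str, size: int) -> Iterator[Row]:
    # Each row falls into exactly one window: N rows in, N rows out.
    for row in rows:
        start = row[timecol] - row[timecol] % size
        yield _assign(row, start, start + size)


def hop(rows: Iterable[Row], timecol: str, slide: int, size: int) -> Iterator[Row]:
    # Windows [start, start + size) begin every `slide`; a row is copied into
    # each window that contains it, so N rows in can become M rows out.
    for row in rows:
        ts = row[timecol]
        starts = []
        start = ts - ts % slide
        while start > ts - size:
            starts.append(start)
            start -= slide
        for start in reversed(starts):
            yield _assign(row, start, start + size)


def cumulate(rows: Iterable[Row], timecol: str, step: int, size: int) -> Iterator[Row]:
    # Windows share a start and grow by `step` up to `size` (size is assumed
    # to be a multiple of step); a row is copied into every window ending after it.
    for row in rows:
        ts = row[timecol]
        start = ts - ts % size
        for end in range(start + step, start + size + 1, step):
            if ts < end:
                yield _assign(row, start, end)


def sum_by_window(rows: Iterable[Row], col: str) -> Dict[Tuple[int, int], int]:
    # Models: SELECT window_start, window_end, SUM(col) ... GROUP BY window_start, window_end
    totals: Dict[Tuple[int, int], int] = defaultdict(int)
    for row in rows:
        totals[(row["window_start"], row["window_end"])] += row[col]
    return dict(totals)


if __name__ == "__main__":
    bids = [{"bidtime": 3, "price": 2}, {"bidtime": 7, "price": 4}, {"bidtime": 12, "price": 1}]
    print(sum_by_window(tumble(bids, "bidtime", 10), "price"))
    # {(0, 10): 6, (10, 20): 1}
    print(sum_by_window(cumulate(bids, "bidtime", 5, 10), "price"))
    # {(0, 5): 2, (0, 10): 6, (10, 15): 1, (10, 20): 1}
```

The tumbling call mirrors the tumbling-window aggregate from the FLIP announcement, scaled down to windows of 10 time units. The CUMULATE call shows why it resembles a tumbling window with early firing: the 10-unit window [0, 10) is also reported at its intermediate step [0, 5).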
Hi Jark,
1a) Deprecation: I totally agree that dropping the old syntax is not very easy in SQL. That's why I am also extremely cautious about adding new syntax in this FLIP. However, I would already deprecate the old syntax and only use the new one in all examples, docs, slides, etc. This hopefully gives us a chance to drop the old syntax at some point in the future. As far as I know, Calcite will drop support for the old syntax soon, which means that we have maybe 2-3 Flink releases until we need to either drop it as well or maintain custom code.

1b) Time attributes: I hadn't noticed this limitation. We should definitely address this issue in the FLIP. Every time operation should be able to express the new rowtime. For MATCH_RECOGNIZE we use `MATCH_ROWTIME`; we could make this behavior similar and just use a helper UDF, but I would prefer an approach similar to window_start and window_end.

3) Simplified syntax: Yes, after looking deeper into this topic, I see that Oracle is apparently not standard-compliant here. So let's postpone this change. But I would be in favor of this syntax because the additional `TABLE()` keyword confuses users and even SQL experts.

4) Semantics: I read the summary of the SQL 2016 standard [1] again and I'm wondering if the keying semantics in the FLIP are correct using GROUP BY. The paper illustrates the following example:

SELECT E.*, D.*
FROM TABLE(
  UDJoin (
    T1 => TABLE (Emp) AS E
      PARTITION BY Deptno,
    T2 => TABLE (Dept) AS D
      PARTITION BY Deptno
      ORDER BY Tstamp
  )
)

where the first PTF parameter is declared with `WITH SET SEMANTICS`.

The paper further states:

"WITH SET SEMANTICS is specified when the outcome of the function depends on how the data is partitioned. A table should be given set semantics if all rows of a partition should be processed on the same virtual processor."

Isn't this exactly what we need for windows as well?
Shouldn't we use the following syntax then:

SELECT *
FROM TABLE(
  Tumble (
    data =>
      TABLE (InputTable)
      PARTITION BY userId
      ORDER BY timestamp
  )
)

In the end, all windows are just PTFs, so maybe we should rather think about how we support PTFs in the near future, because they would open an entirely new set of use cases to SQL. The examples in chapter 12 of the 2016 standard, ranging from `CSVreader` to `UDjoin`, are impressive.

Regards,
Timo

[1] https://www.researchgate.net/profile/Fred_Zemke/publication/329593276_The_new_and_improved_SQL2016_standard/links/5c17eb50a6fdcc494ffc5999/The-new-and-improved-SQL2016-standard.pdf

On 13.10.20 12:42, Jark Wu wrote:
> Hi everyone,
>
> Timo just raised a good point in the vote thread. I copied the feedback here:
>
>> Timo:
> 1) I think we should not offer 2 different kinds of syntax that do the
> same thing. We should deprecate the old syntax.
> 2) We should have session windows in the new syntax as well to give users
> a complete migration path.
> 3) We should investigate if we can remove the additional `FROM
> TABLE(...)` syntax. As far as I see it in other examples from Oracle
> 18c, the additional `TABLE()` syntax is not necessary anymore for
> polymorphic table functions.
>
>
> Here are my comments:
> 1) I'm not sure about this. If we are going to drop the old syntax, this
> will break lots of existing SQL jobs.
> Upgrading SQL jobs is not as easy as upgrading Table API jobs; we should be
> as cautious as possible here.
>
> Besides, if we want to deprecate the old syntax, we must provide equal
> functionality first, and the new syntax doesn't support propagating time
> attributes. A possible solution could be to generate one more column,
> "window_time", for window TVFs. The value of "window_time" would always be
> "window_end - 1" and would have the time attribute type. Users can
> propagate the time attribute by adding "window_time" to the "group by" and
> "join on" clauses together with "window_start" and "window_end".
>
> 2) I will add session window syntax to the FLIP later.
>
> 3) I like the simplified syntax "FROM tumble(input, rowtime, interval '1' minute)".
> However, the polymorphic table function syntax introduced in SQL standard 2016 [1]
> requires the TABLE() and DESCRIPTOR() syntax (see Chapter 8, “Invocation”).
> Therefore, I think it's safe to support the standard syntax first, and then
> explore whether we can extend the syntax to make the TABLE() and DESCRIPTOR()
> keywords optional.
> Note that the Calcite parser currently doesn't support the simplified syntax,
> and this definitely needs to be discussed in the Calcite community. Actually,
> there has been a discussion about this [2], and Julian said:
>
>> Standard SQL doesn’t allow functions in the FROM clause. I think it’s
>> because tables and functions are in different namespaces (and therefore
>> there could be a table and a function with the same name).
>> So you need to use the TABLE keyword to indicate that you are using a
>> function as a table.
>
> Best,
> Jark
>
> [1]: https://standards.iso.org/ittf/PubliclyAvailableStandards/c069776_ISO_IEC_TR_19075-7_2017.zip
> [2]: https://lists.apache.org/x/thread.html/4a91632b1c780ef9d67311f90fce626582faae7d30a134a768c3d324@%3Cdev.calcite.apache.org%3E
>
> On Sat, 10 Oct 2020 at 17:59, Jark Wu <[hidden email]> wrote:
>
>> Hi everyone,
>>
>> Thanks everyone for this healthy discussion. I think we have addressed all
>> the concerns. I would like to continue with a vote.
>> If you have any new objections, feel free to let me know.
>>
>> Best,
>> Jark
>>
>> On Sat, 10 Oct 2020 at 17:54, Jark Wu <[hidden email]> wrote:
>>
>>> Hi Jingsong,
>>>
>>> That's a good question. I have searched a lot and didn't find any
>>> system that provides such an out-of-the-box function.
>>> I guess the reason is that in traditional batch systems, this feature
>>> is supported by the over window, so they don't need to invent a
>>> new function/syntax for this.
>>> For streaming systems, we are the first one to propose this new window.
>>>
>>> However, I think CUMULATE is a good name, because almost all
>>> databases call such scenarios a "cumulative window", e.g. Snowflake [1],
>>> SQL Server [2], Postgres [3].
>>> Thus we chose "cumulative" as the base name, but use the verb form
>>> "cumulate" because the other window function names are also verbs, e.g.
>>> tumble, hop.
>>>
>>> I hope this can address your concern.
>>>
>>> Best,
>>> Jark
>>>
>>> [1]: https://docs.snowflake.com/en/sql-reference/functions-analytic.html#cumulative-window-frame-examples
>>> [2]: https://docs.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql?view=sql-server-ver15#c-producing-a-moving-average-and-cumulative-total
>>> [3]: https://popsql.com/learn-sql/postgresql/how-to-calculate-cumulative-sum-running-total-in-postgresql
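Timo's point 4 above turns on the difference between row and set semantics. A minimal sketch of why a session window behaves like a set-semantic PTF (and so would take a PARTITION BY on its table argument): unlike TUMBLE or HOP, a row's session depends on the other rows of its partition. The function names, the gap rule (a new session starts when consecutive rows of a partition are more than `gap` apart), and the `window_time` field set to `window_end - 1`, following the proposal quoted above, are illustrative assumptions rather than Flink's code.

```python
from itertools import groupby
from operator import itemgetter


def session(rows, partition_col, timecol, gap):
    """Hypothetical SESSION model with set semantics.

    Windows are computed per partition (the PARTITION BY column), because a
    row's window depends on its neighbours in time. Assumed rule: a new
    session starts when consecutive rows of a partition are more than `gap`
    apart, and a session ends `gap` after its last row.
    """
    out = []
    ordered = sorted(rows, key=itemgetter(partition_col, timecol))
    for _, group in groupby(ordered, key=itemgetter(partition_col)):
        part = list(group)  # all rows of one partition, in time order
        sessions = [[part[0]]]
        for prev, cur in zip(part, part[1:]):
            if cur[timecol] - prev[timecol] > gap:
                sessions.append([cur])  # gap exceeded: start a new session
            else:
                sessions[-1].append(cur)
        for members in sessions:
            start = members[0][timecol]
            end = members[-1][timecol] + gap
            for row in members:
                out.append({**row, "window_start": start, "window_end": end,
                            # proposed window_time column: window_end - 1
                            "window_time": end - 1})
    return out


def windows_by_row(out, partition_col, timecol):
    """Map (partition key, timestamp) -> (window_start, window_end)."""
    return {(r[partition_col], r[timecol]): (r["window_start"], r["window_end"])
            for r in out}


bids = [
    {"bidder": "a", "bidtime": 1},
    {"bidder": "b", "bidtime": 2},
    {"bidder": "a", "bidtime": 4},
    {"bidder": "a", "bidtime": 20},
]
print(windows_by_row(session(bids, "bidder", "bidtime", gap=5), "bidder", "bidtime"))
# {('a', 1): (1, 9), ('a', 4): (1, 9), ('a', 20): (20, 25), ('b', 2): (2, 7)}
```

On its own, bidder "a"'s bid at time 1 gets the window (1, 6); adding the bid at time 4 stretches it to (1, 9), while bidder "b" is unaffected. A function that looks at one row at a time cannot produce that, which is the argument for treating SESSION as a set-semantic PTF.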
Hi Timo,
1a) Deprecation: I'm fine with deprecating the old syntax and only using the new one in all examples, docs, slides, etc. I will update the FLIP later. We can drop the old syntax at some point in the future, but that may need another discussion.

1b) Time attributes: Are you fine with the "window_time" column approach?

4) Semantics: The SQL standard defines two kinds of semantics for PTF input tables: set and row. They are explained in the paper (Chapter 5.1.2):
- Row semantics means that the result of the PTF is decided on a row-by-row basis. A table should be given row semantics if the PTF does not care how rows are assigned to virtual processors.
- Set semantics means that the outcome of the function depends on how the data is partitioned. A table should be given set semantics if all rows of a partition should be processed on the same virtual processor. A table with set semantics requires a PARTITION BY clause.

In the paper's examples, UDjoin has two input tables with set semantics, and Pivot has an input table with row semantics.

In our case, tumbling, sliding, and cumulative windows are all row-semantic PTFs, because the result (window_start, window_end) is decided on a row-by-row basis: we don't need to partition data before assigning these windows, and they act just like pure UDTFs. Therefore, we don't need the PARTITION BY clause for them.

However, I think the session window is a set-semantic PTF, because all data of a user should be processed together (stateful processing). So the syntax of the session window should be:

    SELECT * FROM SESSION(
      data => TABLE Bids PARTITION BY bidder,
      timecol => DESCRIPTOR(bidtime),
      gap => INTERVAL '5' MINUTES);

However, the session window syntax in Calcite is:

    SELECT * FROM SESSION(
      data => TABLE Bids,
      timecol => DESCRIPTOR(bidtime),
      keycols => DESCRIPTOR(bidder),
      gap => INTERVAL '5' MINUTES);

This is not correct, and there is already a discussion about it in Calcite [1]. What do you think?

5) Future: I'm also very excited about the potential of PTFs.
As far as I can see, in the future we could support the following features in SQL in a standard PTF way:
- advanced operations supported in the Table API (FLIP-29), e.g. drop_columns, user-defined table aggregates
- user-defined join operators
- a shortcut TopN function
- re-assigning watermarks?
- re-partitioning data, similar to Hive's DISTRIBUTED BY syntax
- ...

Best,
Jark

[1]: https://issues.apache.org/jira/browse/CALCITE-3780?focusedCommentId=17123472&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17123472
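Points 1b and 4 of the reply above can be made concrete with a small simulation. The following is a minimal Python sketch, not Flink's implementation. The sample rows, the function names (`tumble`, `session`, `window_time`, `assign_then_group`, `partition_then_assign`), integer-millisecond timestamps, and the rule that a distance of at least `gap` between consecutive events starts a new session are all assumptions for illustration. The sketch shows that a tumbling window is computed from a single row (row semantics), so assigning windows before or after partitioning by key yields the same groups, whereas a session window depends on the other rows of the same key (set semantics). It also shows why the window_time column, proposed earlier in this thread as window_end - 1, stays inside its own window when the result is re-windowed, while window_end itself would fall into the next window.

```python
from collections import defaultdict

# Hypothetical sample rows (user, event_time_ms); timestamps are assumed
# to be integer milliseconds. This is an illustration, not Flink code.
EVENTS = [("alice", 1_000), ("bob", 2_000), ("alice", 4_000),
          ("alice", 13_000), ("bob", 61_000), ("alice", 62_000)]


def tumble(ts, size):
    """Row semantics: the window depends only on this row's timestamp."""
    start = ts - ts % size
    return start, start + size


def window_time(window_end):
    """Proposed window_time column: the last millisecond of the window."""
    return window_end - 1


def assign_then_group(events, size):
    """Assign a window to every row, then group by (key, start, end)."""
    groups = defaultdict(list)
    for user, ts in events:
        start, end = tumble(ts, size)
        groups[(user, start, end)].append(ts)
    return dict(groups)


def partition_then_assign(events, size):
    """Partition by key first (keyBy-style), then assign windows."""
    partitions = defaultdict(list)
    for user, ts in events:
        partitions[user].append(ts)
    groups = defaultdict(list)
    for user, stamps in partitions.items():
        for ts in stamps:
            start, end = tumble(ts, size)
            groups[(user, start, end)].append(ts)
    return dict(groups)


def session(events, gap):
    """Set semantics: a row's window depends on the other rows of its key."""
    partitions = defaultdict(list)
    for user, ts in events:
        partitions[user].append(ts)
    windows = []
    for user, stamps in partitions.items():
        stamps.sort()  # all rows of one key are needed together, in time order
        start = prev = stamps[0]
        for ts in stamps[1:]:
            # Assumed rule: a distance of at least `gap` starts a new session.
            if ts - prev >= gap:
                windows.append((user, start, prev + gap))
                start = ts
            prev = ts
        windows.append((user, start, prev + gap))
    return windows


# Tumbling windows: both orders give the same groups, so partitioning first
# is not required for row semantics.
assert assign_then_group(EVENTS, 10_000) == partition_then_assign(EVENTS, 10_000)

# Session windows: adding one alice row at 8 s merges two of alice's sessions.
print(session(EVENTS, 6_000)[:2])                      # [('alice', 1000, 10000), ('alice', 13000, 19000)]
print(session(EVENTS + [("alice", 8_000)], 6_000)[0])  # ('alice', 1000, 19000)

# window_time maps a result back into its own window; window_end does not.
print(tumble(window_time(10_000), 10_000), tumble(10_000, 10_000))  # (0, 10000) (10000, 20000)
```

The design choice mirrors the argument: `tumble` never looks at other rows, which is exactly what lets it behave like a pure UDTF, while `session` has to collect and sort a whole partition before it can decide any single row's window.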
Hi Jark,
I think we should discuss the SET vs. ROW semantics again. The FLIP itself is not wrong, but I find it a bit verbose to define a window via TUMBLE + grouping by window_start and window_end. This is the Beam approach and exposes how Flink window assigners work internally, but do we want it in Flink SQL? PTFs allow us to define our own operators in SQL. We are not forced by the standard to do it as stated in the `One SQL to Rule Them All` paper. We can align the SQL windows more closely with our regular DataStream API windows, where you keyBy first and then apply a window operator. Similarly, we could unify all our windows by requiring a PARTITION BY and maybe even an ORDER BY clause. Personally, I find this easier to explain to users than telling them why a session window has SET semantic input tables while tumble/sliding windows have ROW semantic input tables. The ORDER BY would even allow us to declare the time attribute explicitly, similar to MATCH_RECOGNIZE:

Approach A)

    SELECT * FROM TABLE(
      Tumble (
        data => TABLE InputTable PARTITION BY userId ORDER BY timestamp,
        gap => INTERVAL '5' MINUTES
      )
    );

    SELECT * FROM TABLE(
      Session (
        data => TABLE InputTable PARTITION BY userId ORDER BY timestamp,
        gap => INTERVAL '5' MINUTES
      )
    );

VS.

Approach B)

    SELECT * FROM TABLE(
      Tumble (
        data => TABLE InputTable,
        timecol => DESCRIPTOR(timestamp),
        gap => INTERVAL '5' MINUTES
      )
    )
    GROUP BY userId, window_start, window_end;

    SELECT * FROM TABLE(
      Session (
        data => TABLE InputTable PARTITION BY userId ORDER BY timestamp,  -- or timecol => DESCRIPTOR(timestamp)?
        gap => INTERVAL '5' MINUTES
      )
    );

What do others think?

Regards,
Timo

On 14.10.20 05:26, Jark Wu wrote:

Hi Timo,

1a) Deprecation: I'm fine with deprecating the old syntax and only using the new one in all examples, docs, slides etc. I will update the FLIP later. We can drop the old syntax at some point in the future, but that may need another discussion.

1b) Time attributes: Are you fine with the "window_time" column approach?
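Timo's two approaches differ in order:

- Approach A partitions by userId before windowing.
- Approach B assigns windows first and groups by userId, window_start and window_end afterwards.

For a row-semantic window such as tumble, both should yield the same groups, because a row's window depends only on its own timestamp. The plain-Python sketch below checks that claim on made-up data. It models semantics only and assumes integer-minute timestamps and windows aligned to 0. The helper names are hypothetical.

```python
from collections import defaultdict


def tumble_start(ts, size):
    # Row semantics: the window of a row depends only on that row's timestamp.
    return ts - ts % size


def approach_a(rows, size):
    """PARTITION BY userId first, then assign tumbling windows per partition."""
    partitions = defaultdict(list)
    for user, ts in rows:
        partitions[user].append(ts)
    groups = defaultdict(list)
    for user, stamps in partitions.items():
        for ts in stamps:
            start = tumble_start(ts, size)
            groups[(user, start, start + size)].append(ts)
    return {k: sorted(v) for k, v in groups.items()}


def approach_b(rows, size):
    """Assign tumbling windows globally, then GROUP BY userId, window_start, window_end."""
    groups = defaultdict(list)
    for user, ts in rows:
        start = tumble_start(ts, size)
        groups[(user, start, start + size)].append(ts)
    return {k: sorted(v) for k, v in groups.items()}


rows = [("alice", 1), ("bob", 3), ("alice", 4), ("alice", 6), ("bob", 12)]
print(approach_a(rows, 5) == approach_b(rows, 5))  # True
```

The same trick does not work for session windows. A global assignment followed by GROUP BY userId would not reproduce per-user sessions, because session boundaries depend on the other rows of the same user.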
4) Semantics: The SQL standard proposes two semantics: SET and ROW. They are explained in the paper (Chapter 5.1.2):

- Row semantics means that the result of the PTF is decided on a row-by-row basis. A table should be given row semantics if the PTF does not care how rows are assigned to virtual processors.
- Set semantics means that the outcome of the function depends on how the data is partitioned. A table should be given set semantics if all rows of a partition should be processed on the same virtual processor.

A per-set table requires a PARTITIONED BY clause. In the paper's examples, UDjoin has two input tables with set semantics, and Pivot has an input table with row semantics.

In our case, tumbling, sliding, and cumulative windows are all row-semantic PTFs, because the result (window_start, window_end) is decided on a row-by-row basis. We don't need to partition data before assigning these windows; they just act like pure UDTFs. Therefore, we don't need the PARTITIONED BY clause for them.

However, I think the session window is a set-semantic PTF, because all data of a user should be processed together (stateful processing). So the syntax of the session window should be:

    SELECT *
    FROM SESSION(
      data => TABLE Bids PARTITIONED BY bidder,
      timecol => DESCRIPTOR(bidtime),
      gap => INTERVAL '5' MINUTES);

However, the session window syntax in Calcite is:

    SELECT *
    FROM SESSION(
      data => TABLE Bids,
      timecol => DESCRIPTOR(bidtime),
      keycols => DESCRIPTOR(bidder),
      gap => INTERVAL '5' MINUTES);

This is not correct, and there is already a discussion in Calcite [1].

What do you think?

5) Future: I'm also very excited about the potential of PTFs. As I see it, in the future we can support the following features in SQL in a standard PTF way:
- advanced operations supported in the Table API (FLIP-29), e.g. drop_columns, user-defined table aggregates
- user-defined join operators
- a shortcut TopN function
- re-assigning watermarks?
- re-partitioning data, similar to Hive's DISTRIBUTED BY syntax
- ...

Best,
Jark

[1]: https://issues.apache.org/jira/browse/CALCITE-3780?focusedCommentId=17123472&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17123472

On Tue, 13 Oct 2020 at 22:24, Timo Walther <[hidden email]> wrote:

Hi Jark,

1a) Deprecation: I totally agree that dropping old syntax is not very easy in SQL. That's why I am also extremely cautious about adding new syntax in this FLIP. However, I would already deprecate the old syntax and only use the new one in all examples, docs, slides etc. This hopefully gives us a chance to drop the old syntax at some point in the future. As far as I know, Calcite will drop support for the old syntax soon, which means that we have maybe 2-3 Flink releases until we need to either drop it as well or maintain custom code.

1b) Time attributes: I hadn't noticed this limitation. We should definitely address this issue in the FLIP. Every time operation should be able to express the new rowtime. For MATCH_RECOGNIZE we use `MATCH_ROWTIME`; we could make this behavior similar and just use a helper UDF, but I would prefer an approach similar to window_start and window_end.

3) Simplified syntax: Yes, after looking deeper into this topic, I see that Oracle does not seem to be standard compliant here. So let's postpone this change. But I would be in favor of this syntax because the additional `TABLE()` keyword confuses users and even SQL experts.

4) Semantics: I read the summary of the SQL 2016 standard [1] again, and I'm wondering if the keying semantics in the FLIP are correct using GROUP BY. The paper illustrates the following example:

    SELECT E.*, D.*
    FROM TABLE(
      UDJoin (
        T1 => TABLE (Emp) AS E
          PARTITION BY Deptno,
        T2 => TABLE (Dept) AS D
          PARTITION BY Deptno
          ORDER BY Tstamp
      )
    )

where the first PTF parameter is declared with `WITH SET SEMANTICS`.

The paper further states:

"WITH SET SEMANTICS is specified when the outcome of the function depends on how the data is partitioned. A table should be given set semantics if all rows of a partition should be processed on the same virtual processor."

Isn't this exactly what we need for windows as well? Shouldn't we use the following syntax then:

    SELECT *
    FROM TABLE(
      Tumble (
        data =>
          TABLE (InputTable)
          PARTITION BY userId
          ORDER BY timestamp
      )
    )

In the end, all windows are just PTFs. Maybe we should rather think about how we support PTFs in the near future, because they would open an entirely new set of use cases to SQL. The examples in chapter 12 of the 2016 standard, ranging from `CSVreader` to `UDjoin`, are impressive.

Regards,
Timo

[1] https://www.researchgate.net/profile/Fred_Zemke/publication/329593276_The_new_and_improved_SQL2016_standard/links/5c17eb50a6fdcc494ffc5999/The-new-and-improved-SQL2016-standard.pdf

On 13.10.20 12:42, Jark Wu wrote:

Hi everyone,

Timo just raised a good point in the vote thread. I copied the feedback here:

> Timo:
> 1) I think we should not offer 2 different kinds of syntax that do the same thing. We should deprecate the old syntax.
> 2) We should have session windows in the new syntax as well to give users a complete migration path.
> 3) We should investigate if we can remove the additional `FROM TABLE(...)` syntax. As far as I see it in other examples from Oracle 18c, the additional `TABLE()` syntax is not necessary anymore for polymorphic table functions.
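Session windows illustrate the set-semantics case discussed above. A row's session depends on the other rows of the same key, so each partition has to be seen as a whole. The `session` helper and the click data are hypothetical. The plain-Python sketch assumes:

- integer-minute timestamps;
- per-key partitioning;
- a closing rule chosen for this sketch only: a session ends when the gap between consecutive rows of the same key is strictly greater than `gap`. Engines may treat the boundary differently.

```python
from collections import defaultdict


def session(rows, gap):
    """Assign session windows per key.

    Set semantics: a row's window depends on the other rows of the same key,
    so every partition is collected and processed as a whole.
    """
    partitions = defaultdict(list)
    for key, ts in rows:
        partitions[key].append(ts)
    windows = []
    for key, stamps in sorted(partitions.items()):
        stamps.sort()
        start = last = stamps[0]
        for ts in stamps[1:]:
            if ts - last > gap:  # inactivity longer than gap closes the session
                windows.append((key, start, last + gap))
                start = ts
            last = ts
        windows.append((key, start, last + gap))
    return windows


clicks = [("alice", 0), ("alice", 3), ("alice", 11), ("bob", 1)]
print(session(clicks, 5))
# [('alice', 0, 8), ('alice', 11, 16), ('bob', 1, 6)]

# One extra alice row bridges the gap and merges her two sessions.
print(session(clicks + [("alice", 7)], 5))
# [('alice', 0, 16), ('bob', 1, 6)]
```

Adding a single row for alice changes the windows of rows that were already assigned. A purely row-by-row (row-semantic) function cannot express that.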
Here are my comments:

1) I'm not sure about this. If we are going to drop the old syntax, this will break lots of existing SQL jobs. Upgrading SQL jobs is not as easy as upgrading Table API jobs, so we should be as cautious as possible here.

Besides, if we want to deprecate the old syntax, we must provide equal functionality first, and the new syntax doesn't support propagating time attributes. A possible solution could be to generate one more column, "window_time", for window TVFs. The value of "window_time" would always be "window_end - 1", and it would have the time attribute type. Users can propagate the time attribute by adding "window_time" to the "group by" and "join on" clauses together with "window_start" and "window_end".

2) I will add session window syntax to the FLIP later.

3) I like the simplified syntax "FROM tumble(input, rowtime, interval '1' minute)". However, the polymorphic table function syntax introduced in the SQL:2016 standard [1] requires the TABLE() and DESCRIPTOR() syntax (see Chapter 8, "Invocation"). Therefore, I think it's safer to support the standard syntax first, and then explore whether we can extend the syntax to make the TABLE() and DESCRIPTOR() keywords optional. Note that the Calcite parser currently doesn't support the simplified syntax, and this definitely needs to be discussed in the Calcite community. Actually, there has been a discussion about this [2], and Julian said:

> Standard SQL doesn't allow functions in the FROM clause. I think it's because tables and functions are in different namespaces (and therefore there could be a table and a function with the same name). So you need to use the TABLE keyword to indicate that you are using a function as a table.
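Jark's proposal gives each window TVF an extra `window_time` column equal to `window_end - 1`. The sketch below checks two properties that make this useful:

- `window_time` always lies inside `[window_start, window_end)`.
- Re-windowing 1-minute results on `window_time` into 5-minute tumbling windows places each result in its enclosing window.

It assumes millisecond timestamps (so the `- 1` is one millisecond) and tumbling windows aligned to 0. The helper names and data are hypothetical.

```python
MINUTE = 60_000  # one minute in milliseconds


def tumble_bounds(ts, size):
    """Tumbling window [start, end) containing ts, aligned to 0."""
    start = ts - ts % size
    return start, start + size


def window_time(window_end):
    """Rule proposed in the thread: window_time = window_end - 1 (1 ms here)."""
    return window_end - 1


# Hypothetical 1-minute window results, identified by their window_end.
for end in [1 * MINUTE, 2 * MINUTE, 5 * MINUTE, 6 * MINUTE]:
    start = end - MINUTE
    wt = window_time(end)
    assert start <= wt < end  # window_time stays inside its own window
    outer = tumble_bounds(wt, 5 * MINUTE)  # re-window on window_time
    assert outer[0] <= start and end <= outer[1]  # nests in the 5-minute window
    print(f"[{start // MINUTE}, {end // MINUTE}) min -> {tuple(v // MINUTE for v in outer)}")
```

Re-windowing on `window_end` itself would move the [4, 5) minute result into the [5, 10) window. That is one way to see why the proposal subtracts one unit.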
Best,
Jark

[1]: https://standards.iso.org/ittf/PubliclyAvailableStandards/c069776_ISO_IEC_TR_19075-7_2017.zip
[2]: https://lists.apache.org/x/thread.html/4a91632b1c780ef9d67311f90fce626582faae7d30a134a768c3d324@%3Cdev.calcite.apache.org%3E

On Sat, 10 Oct 2020 at 17:59, Jark Wu <[hidden email]> wrote:

Hi everyone,

Thanks everyone for this healthy discussion. I think we have addressed all the concerns, so I will proceed with a vote. If you have any new objections, feel free to let me know.

Best,
Jark

On Sat, 10 Oct 2020 at 17:54, Jark Wu <[hidden email]> wrote:

Hi Jingsong,

That's a good question. I searched a lot and didn't find any system that provides such an out-of-the-box function. I guess the reason is that in traditional batch systems this feature is supported by the over window, so they don't need to invent a new function/syntax for it. Among streaming systems, we are the first to propose this new window.

However, I think CUMULATE is a good name, because almost all databases call such scenarios a "cumulative window", e.g. Snowflake [1], SQL Server [2], Postgres [3]. Thus we chose "cumulative" as the base name, but use the verb form "cumulate" because the other window function names are also verbs, e.g. tumble, hop.

I hope this addresses your concern.
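A cumulative window can be sketched as follows. Within each period of a maximum size, a row belongs to every window that:

- starts at the beginning of the period, and
- ends at one of the step boundaries after the row.

This plain-Python model assumes integer timestamps (hours here), windows aligned to 0, and a size that is a multiple of the step. The `step`/`size` parameter names are an assumption of this sketch, not a quote of the FLIP.

```python
def cumulate(ts, step, size):
    """Return every (window_start, window_end) a row with timestamp ts belongs to.

    Assumes size is a positive multiple of step and windows are aligned to 0.
    """
    if step <= 0 or size % step != 0:
        raise ValueError("size must be a positive multiple of step")
    start = ts - ts % size  # beginning of the max-size period
    return [
        (start, end)
        for end in range(start + step, start + size + 1, step)
        if end > ts  # the window [start, end) must still contain ts
    ]


# Daily (24 h) cumulative window that fires every 6 h; ts in hours.
print(cumulate(13, 6, 24))  # [(0, 18), (0, 24)]
print(cumulate(29, 6, 24))  # [(24, 30), (24, 36), (24, 42), (24, 48)]
```

Summing per (window_start, window_end) over these assignments gives a running total for the day, emitted at every step. As noted above, batch systems typically express this with an over window.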
Best,
Jark

[1]: https://docs.snowflake.com/en/sql-reference/functions-analytic.html#cumulative-window-frame-examples
[2]: https://docs.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql?view=sql-server-ver15#c-producing-a-moving-average-and-cumulative-total
[3]: https://popsql.com/learn-sql/postgresql/how-to-calculate-cumulative-sum-running-total-in-postgresql

On Sat, 10 Oct 2020 at 17:26, Jingsong Li <[hidden email]> wrote:

+1 for voting. Thanks Jark for driving.

+1 for TVF. It is grounded in theory and supported by Calcite. It will greatly enhance window-related operations.

My personal feeling is that with TVFs, the subsequent operations can be similar to traditional batch SQL, as long as the window-related attributes are included in the key.

I am not sure about the CUMULATE window. Yes, it's a common requirement, but is there any more evidence (from other systems) that this word ("CUMULATE") is appropriate?

Best,
Jingsong

On Sat, Oct 10, 2020 at 3:43 PM Jark Wu <[hidden email]> wrote:

Hi Pengcheng,

IIUC, by "stream operators" you mean the non-time operators, also called regular operators, such as regular join and regular aggregate. But you may have misunderstood me: only the time operators can't be applied after the new window operators, because the time attributes are missing. The regular operators can still be applied after the new window operators.

Regarding using window TVFs to re-assign event time and watermarks, I'm not sure about this. Assigning watermarks requires defining a watermark strategy, and the window TVF doesn't provide such an ability. Polymorphic table functions are table functions that just append additional columns and convert N rows into M rows; they can't touch meta information.

Best,
Jark

On Sat, 10 Oct 2020 at 15:41, Jark Wu <[hidden email]> wrote:

Hi Danny,

Thanks for the hint about the named params syntax. I added examples with named params to the FLIP.

Best,
Jark

On Sat, 10 Oct 2020 at 15:03, Pengcheng Liu <[hidden email]> wrote:

Hi, Jark,

I have a different opinion here. I think it's a very common use case to use window operators in combination with streaming operators (even time operators). For example, for some tables users only care about data within a period, but for other tables they may want all historical data. The pipeline may look like this:

    window join -> dimension table join -> stream aggregate -> stream sort

Just as you said, the key clause can be used to distinguish whether an operator should be translated to a window operator or a streaming operator.

Also, as I've mentioned before:
1) For a time operator after a window aggregation, the auxiliary function used to access the time attribute column can actually be replaced with (window_end - 1). We only need the upstream results to contain a time column whose range is within (window_start, window_end), so that the downstream time operators can work on it (driven by the original watermark from the source).
2) For a time operator after other window operators, the downstream time operators can access the time column directly from their input.

One more thought: maybe the window TVFs could re-assign timestamps and watermarks. Then, when the watermark cannot be retrieved from the source directly (it may need some conversion), it can still be assigned dynamically in SQL (using the time column as the watermark column). I think this could save a lot of time spent revising the event-time column in some cases (this is a real demand in our production environment).

I strongly suggest that we support combining window operators and streaming operators, and I think we can achieve this with little work.

Best,
Pengcheng

On Sat, 10 Oct 2020 at 13:45, Jark Wu <[hidden email]> wrote:

Hi Benchao,

That's a good question.

IMO, the new windowed operators and the current time operators are two different sets of functions, just like time operators and non-time operators are two different sets of functions. I think it's fine if we don't support integrating them, just as time operators can't be applied on a non-windowed aggregate. If users want to use time operators throughout the pipeline, they can use the grouped window aggregates instead of the window TVFs.

The key idea of window TVFs is that all the operators in the pipeline are based on the **windows**. In terms of syntax, if the key clause (e.g. group by, partitioned by, join on, order by) contains window_start and window_end, it can be translated into windowed operators. Thus, we will have windowed CEP, windowed sort, and windowed over aggregates in the future, making it possible to build a windowed pipeline.

But I think we can elaborate on the integration further in the future if users need it. Actually, I don't fully understand the scenario of integrating window TVFs and time operators at this point. Take, for example, an interval join between an input stream and a window join result. I don't see why it can't be expressed by a nested window join, or why users have to use an interval join here. Maybe we can wait for more input from users after the window TVF is released and elaborate on it again.

Best,
Jark

On Sat, 10 Oct 2020 at 12:01, 刘 芃成 <[hidden email]> wrote:

Hi, Benchao,

I think I got your point. In the current implementation of group window aggregation, the value of the time attribute (e.g. TUMBLE_ROWTIME/TUMBLE_PROCTIME) is calculated as (window_end - 1), so I think we can just use it directly if you need it. But I think this time attribute is mainly intended for cascaded window operations. Regarding your example, which does an interval join (e.g. with TUMBLE_ROWTIME) after a window aggregation, I think its semantics are not clear in the current implementation. That's a strong reason why we need the new TVF syntax.
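Jark's rule of thumb earlier in this thread is that an operator whose key clause contains window_start and window_end can be translated into a windowed operator. A window join is the simplest case. In the plain-Python sketch below, both inputs are windowed first. A pair of rows matches only if it shares the join key and the same (window_start, window_end). The sketch assumes integer-minute timestamps, tumbling windows aligned to 0, and an inner equi-join. The `orders`/`shipments` data and helper names are hypothetical.

```python
from collections import defaultdict


def tumble(rows, size):
    """rows are (key, ts, value); append the tumbling window bounds."""
    return [(k, ts, v, ts - ts % size, ts - ts % size + size) for k, ts, v in rows]


def window_join(left, right, size):
    """Inner join on key AND window: the window columns are part of the join key."""
    index = defaultdict(list)
    for k, _ts, v, ws, we in tumble(right, size):
        index[(k, ws, we)].append(v)
    return [
        (k, ws, we, lv, rv)
        for k, _ts, lv, ws, we in tumble(left, size)
        for rv in index[(k, ws, we)]
    ]


orders = [("item1", 2, "o1"), ("item2", 8, "o2"), ("item1", 12, "o3")]
shipments = [("item1", 5, "s1"), ("item1", 15, "s2"), ("item2", 11, "s3")]
print(window_join(orders, shipments, 10))
# [('item1', 0, 10, 'o1', 's1'), ('item1', 10, 20, 'o3', 's2')]
```

o2 (minute 8) and s3 (minute 11) do not match, although they are only three minutes apart, because they fall into different windows. An interval join with a wide enough bound would pair them, which shows how the two kinds of operators differ.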
With the new syntax, users should understand which time column to use and how to generate it when doing an interval join, etc.

Best,
Pengcheng

From: Benchao Li <[hidden email]>
Date: Saturday, 10 October 2020, 11:02 AM
To: pengcheng Liu <[hidden email]>
Cc: dev <[hidden email]>
Subject: Re: [DISCUSS] FLIP-145: Support SQL windowing table-valued function

Hi pengcheng,

Thanks for your response. I know that the original time attribute column will be retained after the TVF. What I'm questioning is how we get the time attribute column after the aggregation, and your answer did not resolve my doubts about this.

It's OK if this FLIP does not plan to integrate the new TVF aggregate with the old "time attribute scenarios" listed in my previous email. However, it would be good to state that explicitly and leave it to future work.

On Sat, 10 Oct 2020 at 10:45, pengcheng Liu <[hidden email]> wrote:

Hi, Benchao,

In TVFs, the time attributes are just passed through from the parent rels, and the TVFs just add two additional window attributes (i.e. window_start & window_end). Also, I think the time column can be not only a time attribute of type `TimeIndicatorType` but also a regular column of type `Timestamp`.

For cascaded window operations, we have two options:
- use the window_start/window_end of the previous window result directly, to indicate operating on the same window;
- use a new DESCRIPTOR column to assign new windows, in case the time column changes (e.g. when the original timestamp is inaccurate and needs some conversion before it can be used).

You can check the definition or signature of these TVFs in the FLIP, e.g.

    SELECT * FROM TABLE(
      TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))

In the example, `bidtime` is the time attribute column, which is the first operand of the DESCRIPTOR function.

+1 to start voting.

On Sat, 10 Oct 2020 at 10:08, Benchao Li <[hidden email]> wrote:

Hi Jark,

2 & 3 sound good to me.

Regarding the time attribute, I still have some questions. I know it's easy to support cascaded window aggregates using the new TVFs. However, there are some other places that need a time attribute:
- CEP
- interval join
- order by
- over window
If there is no time attribute column, how do we integrate these old features with the new TVFs? E.g.

    StreamA -> new window aggregate -> interval join -> Sink
                                       /
    StreamB ---------------------------

On Fri, 9 Oct 2020 at 23:51, Jark Wu <[hidden email]> wrote:

Hi Benchao,

1) Time attribute
Yes. We don't need a time attribute auxiliary function. The new window operations are all based on the window_start and window_end columns instead of the time attributes, so we don't need to propagate time attributes. A cascaded window aggregate can be expressed simply by grouping by the window_start and window_end of the previous window result. I have added a cascaded window aggregate example in the Tumbling Window section of the FLIP. If you want to define a proctime window aggregate, the time column in the TVF should be a proctime attribute field (or the PROCTIME() function).

2) Batch support
Yes. The proposed syntax/API is unified for batch and streaming. Batch support is planned, but we may not have enough time to make it into 1.12.

3) Support for `grouping sets`
This is not included in the FLIP, but I think it would be great if we could support `grouping sets`. The existing window implementation converts the LogicalAggregate into a WindowAggregate at the beginning, so the expand-grouping-sets rule can't be applied, and grouping sets are unsupported. Fortunately, with the new window implementation, the conversion to WindowAggregate will happen at the end, so I think the expand rule can be applied and support this feature naturally. Therefore, IMO, we don't need to include this feature in this FLIP, which keeps the FLIP from becoming too large. This can be a follow-up issue (maybe just adding tests and docs) after the FLIP.
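As Jark describes, a cascaded window aggregate needs no time attribute. The second stage simply groups by the window_start and window_end carried over from the first stage. The plain-Python sketch below assumes the first stage has already produced per-user sums for 10-minute windows (hypothetical values). It then computes, per window, the largest per-user total. The helper name is hypothetical.

```python
from collections import defaultdict

# Stage 1 output (hypothetical): GROUP BY user, window_start, window_end -> SUM(price)
per_user = [
    ("alice", 0, 10, 7),
    ("bob", 0, 10, 3),
    ("alice", 10, 20, 2),
    ("carol", 10, 20, 9),
]


def cascade_max(rows):
    """Stage 2: GROUP BY window_start, window_end over the stage-1 result, MAX(total).

    The window columns are carried through, so no time attribute is needed.
    """
    best = defaultdict(lambda: float("-inf"))
    for _user, ws, we, total in rows:
        best[(ws, we)] = max(best[(ws, we)], total)
    return dict(sorted(best.items()))


print(cascade_max(per_user))  # {(0, 10): 7, (10, 20): 9}
```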
>>>>>>>>>>> >>>>>>>>>>> Best, >>>>>>>>>>> Jark >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Fri, 9 Oct 2020 at 19:09, 刘 芃成 <[hidden email] >>>>>>> <mailto: >>>>>>>>>>> [hidden email]>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi,Benchao, >>>>>>>>>>>> Welcome to join the discussion, yes, this new syntax >>>>>> can >>>>>>>>>> make SQL >>>>>>>>>>>> more clear and simpler. >>>>>>>>>>>> For your first question, the `window_start` and >>>>>>> `window_end` >>>>>>>>>>>> columns will be added automatically, >>>>>>>>>>>> so we don't need to use auxiliary group functions to >>>>>> infer >>>>>>> or >>>>>>>>>>>> access the window properties. >>>>>>>>>>>> >>>>>>>>>>>> For the `grouping sets` on TVFs, I think it's >>>>>> interesting >>>>>>> if >>>>>>>>>> we >>>>>>>>>>>> can support it, as we already supported `grouping sets` >>>>>>>>>>>> on streaming aggregates in blink planner. But I'm not >>>>>> sure >>>>>>>>>> if it >>>>>>>>>>>> will be included into this FLIP. >>>>>>>>>>>> >>>>>>>>>>>> cc @Jark Wu >>>>>>>>>>>> >>>>>>>>>>>> Best, >>>>>>>>>>>> Pengcheng >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> 在 2020/10/9 下午5:25,“Benchao Li”<[hidden email]<mailto: >>>>>>>>>>> [hidden email]>> 写入: >>>>>>>>>>>> >>>>>>>>>>>> Thanks Jark for bringing this discussion, I like this FLIP >>>>>> very >>>>>>>>>> much. >>>>>>>>>>>> >>>>>>>>>>>> Especially the cumulate window, it's much like the current >>>>>>> TUMBLE >>>>>>>>>>>> window + >>>>>>>>>>>> Fast Emit (which is an undocumented experimental feature), >>>>>>>>>> however, >>>>>>>>>>>> it's >>>>>>>>>>>> more powerful. >>>>>>>>>>>> >>>>>>>>>>>> And This will make the SQL semantic more standard, >>>>>> especially >>>>>>>>>> for the >>>>>>>>>>>> HOPPING window. >>>>>>>>>>>> >>>>>>>>>>>> Regarding time attribute, >>>>>>>>>>>> It seems that we don't need a specific function to infer >>>>>> the >>>>>>> time >>>>>>>>>>>> attribute >>>>>>>>>>>> like >>>>>>>>>>>> `TUMBLE_ROWTIME` / `TUMBLE_PROCTIME`. 
Then are >>>>>> `window_start` >>>>>>> and >>>>>>>>>>>> `window_end` >>>>>>>>>>>> column a time attribute column automatically? >>>>>>>>>>>> - If not, what will be the time attribute of the result >>>>>>> relation >>>>>>>>>> of >>>>>>>>>>>> these >>>>>>>>>>>> TVFs? >>>>>>>>>>>> Especially after the window aggregation. >>>>>>>>>>>> - If yes, then how do we handle proctime? >>>>>>>>>>>> >>>>>>>>>>>> Regarding batch operators, >>>>>>>>>>>> It's great to hear that we can reuse the batch operators in >>>>>>>>>>> continuous >>>>>>>>>>>> batch mode >>>>>>>>>>>> as you mentioned in the FLIP. >>>>>>>>>>>> Current window aggregate could also be used in batch mode >>>>>> with >>>>>>>>>>>> rowtime. Do >>>>>>>>>>>> you plan >>>>>>>>>>>> to support these TVFs for batch mode in this FLIP? Hence >>>>>> the >>>>>>>>>>> Table/SQL >>>>>>>>>>>> is a >>>>>>>>>>>> unified >>>>>>>>>>>> API, it's great if we can keep the features complete both >>>>>> in >>>>>>>>>>> streaming >>>>>>>>>>>> and >>>>>>>>>>>> batch mode. >>>>>>>>>>>> >>>>>>>>>>>> There is one more question, I don't know whether it should >>>>>> be >>>>>>>>>>>> considered in >>>>>>>>>>>> this FLIP. >>>>>>>>>>>> Does the new window support `grouping sets`? (It's not >>>>>>> supported >>>>>>>>>> in >>>>>>>>>>> old >>>>>>>>>>>> window impl). >>>>>>>>>>>> >>>>>>>>>>>> Jark Wu <[hidden email]<mailto:[hidden email]>> >>>>>>>>>> 于2020年10月9日周五 >>>>>>>>>>> 下午4:14写道: >>>>>>>>>>>> >>>>>>>>>>>> > Hi all, >>>>>>>>>>>> > >>>>>>>>>>>> > I know we have a lot of discussion and development on >>>>>> going >>>>>>>>>> right >>>>>>>>>>>> now but >>>>>>>>>>>> > it would be great if we can get FLIP-145 into a votable >>>>>>> state. >>>>>>>>>>>> > If there are no objections, I would like to start voting >>>>>> in >>>>>>> the >>>>>>>>>>> next >>>>>>>>>>>> days. 
>>>>>>>>>>>> > >>>>>>>>>>>> > Best, >>>>>>>>>>>> > Jark >>>>>>>>>>>> > >>>>>>>>>>>> > On Thu, 1 Oct 2020 at 14:29, Jark Wu <[hidden email] >>>>>>> <mailto: >>>>>>>>>>> [hidden email]>> wrote: >>>>>>>>>>>> > >>>>>>>>>>>> > > Hi everyone, >>>>>>>>>>>> > > >>>>>>>>>>>> > > I have added a section for Performance Optimization to >>>>>>>>>> describe >>>>>>>>>>>> how to >>>>>>>>>>>> > > improve the performance in the short-term and long-term >>>>>>>>>>>> > > and sketch the future performance potential under the >>>>>> new >>>>>>>>>> window >>>>>>>>>>>> API. >>>>>>>>>>>> > > Introducing the window API is just the first step, we >>>>>> will >>>>>>>>>>>> > > continuously improve the performance to make it >>>>>> powerful >>>>>>> and >>>>>>>>>>>> useful. >>>>>>>>>>>> > > >>>>>>>>>>>> > > Best, >>>>>>>>>>>> > > Jark >>>>>>>>>>>> > > >>>>>>>>>>>> > > On Thu, 1 Oct 2020 at 14:28, Jark Wu <[hidden email] >>>>>>>>>> <mailto: >>>>>>>>>>> [hidden email]>> wrote: >>>>>>>>>>>> > > >>>>>>>>>>>> > >> Hi Pengcheng, >>>>>>>>>>>> > >> >>>>>>>>>>>> > >> Yes, the window TVF is part of the FLIP. Welcome to >>>>>>>>>> contribute >>>>>>>>>>>> and join >>>>>>>>>>>> > >> the discussion. >>>>>>>>>>>> > >> Regarding the SESSION window aggregation, users can >>>>>> use >>>>>>> the >>>>>>>>>>>> existing >>>>>>>>>>>> > >> grouped session window function. >>>>>>>>>>>> > >> >>>>>>>>>>>> > >> Best, >>>>>>>>>>>> > >> Jark >>>>>>>>>>>> > >> >>>>>>>>>>>> > >> On Sun, 27 Sep 2020 at 21:24, liupengcheng < >>>>>>>>>>>> [hidden email]<mailto:[hidden email] >>>>>>> >>>>>>>>>>>> > > >>>>>>>>>>>> > >> wrote: >>>>>>>>>>>> > >> >>>>>>>>>>>> > >>> Hi Jark, >>>>>>>>>>>> > >>> Thanks for reply, yes, I think it's a good >>>>>>>>>> feature, it >>>>>>>>>>>> can >>>>>>>>>>>> > >>> improve the NRT scenarios >>>>>>>>>>>> > >>> as you mentioned in the FLIP. 
Also, I think >>>>>> it >>>>>>> can >>>>>>>>>>>> improve the >>>>>>>>>>>> > >>> streaming SQL greatly, >>>>>>>>>>>> > >>> it can support richer window operations in >>>>>> flink >>>>>>>>>> SQL >>>>>>>>>>> and >>>>>>>>>>>> bring >>>>>>>>>>>> > >>> great convenience to users. >>>>>>>>>>>> > >>> (we are now only supported group window in >>>>>>> flink). >>>>>>>>>>>> > >>> >>>>>>>>>>>> > >>> Regarding the SESSION window, I think it's >>>>>>>>>> especially >>>>>>>>>>>> useful >>>>>>>>>>>> > for >>>>>>>>>>>> > >>> user behavior analysis(e.g. >>>>>>>>>>>> > >>> counting user visits on a news website or >>>>>> social >>>>>>>>>>>> platform), but >>>>>>>>>>>> > >>> I agree that we can keep it >>>>>>>>>>>> > >>> out of the FLIP now to catch up 1.12. >>>>>>>>>>>> > >>> >>>>>>>>>>>> > >>> Recently, I've done some work on the stream >>>>>>> planner >>>>>>>>>>> with >>>>>>>>>>>> the >>>>>>>>>>>> > >>> TVFs, and I'm willing to contribute >>>>>>>>>>>> > >>> to this part. Is it in the plan of this FLIP? >>>>>>>>>>>> > >>> >>>>>>>>>>>> > >>> Best, >>>>>>>>>>>> > >>> PengchengLiu >>>>>>>>>>>> > >>> >>>>>>>>>>>> > >>> >>>>>>>>>>>> > >>> 在 2020/9/26 下午11:09,“Jark Wu”<[hidden email] >>>>>> <mailto: >>>>>>>>>>> [hidden email]>> 写入: >>>>>>>>>>>> > >>> >>>>>>>>>>>> > >>> Hi pengcheng, >>>>>>>>>>>> > >>> >>>>>>>>>>>> > >>> That's great to see you also have the need of >>>>>> window >>>>>>>>>> join. >>>>>>>>>>>> > >>> You are right, the windowing TVF is a powerful >>>>>>> feature >>>>>>>>>>> which >>>>>>>>>>>> can >>>>>>>>>>>> > >>> support >>>>>>>>>>>> > >>> more operations in the future. >>>>>>>>>>>> > >>> I think it as of the date time "partition" >>>>>> selection >>>>>>> in >>>>>>>>>>>> batch SQL >>>>>>>>>>>> > >>> jobs, >>>>>>>>>>>> > >>> with this new syntax, I think it is possible >>>>>>>>>>>> > >>> to migrate traditional batch SQL jobs to Flink >>>>>> SQL >>>>>>> by >>>>>>>>>>>> changing a >>>>>>>>>>>> > >>> few lines. 
>>>>>>>>>>>> > >>> >>>>>>>>>>>> > >>> Regarding the SESSION window, this is on purpose >>>>>> to >>>>>>>>>> keep it >>>>>>>>>>>> out of >>>>>>>>>>>> > >>> the >>>>>>>>>>>> > >>> FLIP, because we want to keep the >>>>>>>>>>>> > >>> FLIP small to catch up 1.12 and SESSION TVF is >>>>>> rarely >>>>>>>>>>> useful >>>>>>>>>>>> (e.g. >>>>>>>>>>>> > >>> session >>>>>>>>>>>> > >>> window join?). >>>>>>>>>>>> > >>> >>>>>>>>>>>> > >>> Best, >>>>>>>>>>>> > >>> Jark >>>>>>>>>>>> > >>> >>>>>>>>>>>> > >>> On Fri, 25 Sep 2020 at 22:59, liupengcheng < >>>>>>>>>>>> > >>> [hidden email]<mailto: >>>>>>>>>> [hidden email] >>>>>>>>>>>>> >>>>>>>>>>>> > >>> wrote: >>>>>>>>>>>> > >>> >>>>>>>>>>>> > >>> > Hi, Jark, >>>>>>>>>>>> > >>> > I'm very interested in this feature, >>>>>> and >>>>>>> I'm >>>>>>>>>> also >>>>>>>>>>>> working >>>>>>>>>>>> > >>> on this >>>>>>>>>>>> > >>> > recently. >>>>>>>>>>>> > >>> > I just have a glance at the FLIP, it's >>>>>>> good, >>>>>>>>>> but >>>>>>>>>>> I >>>>>>>>>>>> found >>>>>>>>>>>> > >>> that >>>>>>>>>>>> > >>> > there is no plan to add SESSION windows. >>>>>>>>>>>> > >>> > Also, I think there can be more things >>>>>> we >>>>>>>>>> can do >>>>>>>>>>>> based on >>>>>>>>>>>> > >>> this new >>>>>>>>>>>> > >>> > syntax. For example, >>>>>>>>>>>> > >>> > - window sort support >>>>>>>>>>>> > >>> > - window union/intersect/minus support >>>>>>>>>>>> > >>> > - Improve dimension table join >>>>>>>>>>>> > >>> > We can have more deep discussion on >>>>>> this >>>>>>> new >>>>>>>>>>>> feature >>>>>>>>>>>> > later >>>>>>>>>>>> > >>> . >>>>>>>>>>>> > >>> > I've also opened an jira that is >>>>>> related to >>>>>>>>>> this >>>>>>>>>>>> feature >>>>>>>>>>>> > >>> recently: >>>>>>>>>>>> > >>> > >>>>>> https://issues.apache.org/jira/browse/FLINK-18830 >>>>>>>>>>>> > >>> > >>>>>>>>>>>> > >>> > Best! 
PengchengLiu

On 2020/9/25 at 10:30 PM, "Jark Wu" <[hidden email]> wrote:

Hi everyone,

I want to start a FLIP about supporting windowing table-valued functions (TVF). The main purpose of this FLIP is to improve the near real-time (NRT) experience of Flink.

FLIP-145: https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function

We want to introduce TUMBLE, HOP, and CUMULATE windowing TVFs; CUMULATE is a new kind of window. With the windowing TVFs, we can support richer operations on windows, including window join, window TopN and so on. This makes things simple: we only need to assign windows at the beginning of the query, and then apply operations after that like traditional batch SQL. We hope it can help to reduce the learning curve of windows, improve NRT for Flink, and attract more batch users.
A simple code snippet for a 10-minute tumbling window aggregate:

SELECT window_start, window_end, SUM(price)
FROM TABLE(
  TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;

I'm looking forward to your feedback.

Best,
Jark

--
Best,
Benchao Li

--
Best,
Benchao Li

--
Best,
Benchao Li

--
Best, Jingsong Lee
|
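To make the snippet above concrete, here is a minimal Python sketch of what the query computes. It is an in-memory model under stated assumptions, not Flink's implementation: `tumble` and `sum_per_window` are hypothetical helpers, windows are aligned to the Unix epoch, and the Bid rows are made-up sample data. Note that `tumble` looks at one row at a time, which is the "behaves like a map function" property discussed later in this thread.

```python
from datetime import datetime, timedelta

# Semantics-only model of the 10-minute TUMBLE + GROUP BY query; not Flink code.
EPOCH = datetime(1970, 1, 1)


def tumble(rows, timecol, size):
    """Append window_start/window_end to each row independently (row semantics)."""
    out = []
    for row in rows:
        ts = row[timecol]
        # Align the window start to a multiple of `size` since the epoch.
        start = ts - (ts - EPOCH) % size
        out.append({**row, "window_start": start, "window_end": start + size})
    return out


def sum_per_window(windowed_rows, valcol):
    """Equivalent of GROUP BY window_start, window_end with SUM(valcol)."""
    totals = {}
    for row in windowed_rows:
        key = (row["window_start"], row["window_end"])
        totals[key] = totals.get(key, 0) + row[valcol]
    return totals


# Hypothetical Bid rows (bidtime, price); values are made up for illustration.
BIDS = [
    {"bidtime": datetime(2020, 4, 15, 8, 5), "price": 4.0},
    {"bidtime": datetime(2020, 4, 15, 8, 7), "price": 2.0},
    {"bidtime": datetime(2020, 4, 15, 8, 9), "price": 5.0},
    {"bidtime": datetime(2020, 4, 15, 8, 11), "price": 3.0},
    {"bidtime": datetime(2020, 4, 15, 8, 13), "price": 1.0},
    {"bidtime": datetime(2020, 4, 15, 8, 17), "price": 6.0},
]

result = sum_per_window(tumble(BIDS, "bidtime", timedelta(minutes=10)), "price")
for (start, end), total in sorted(result.items()):
    print(start.time(), end.time(), total)
```

Grouping on (window_start, window_end) is what turns the per-row assignment into a per-window aggregate; an extra key such as a seller id can simply be added to that tuple.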
Hi Timo,
I still believe a tumbling window assigner doesn't need to be partitioned first; it just behaves like a map function. The window_start and window_end are calculated from the timestamp of the input row and don't depend on how the data is partitioned.

I'm very confused about which PARTITION BY fields users should specify, and what the "userId" is used for in the TUMBLE PTF. What's the relationship between the PARTITION BY fields and the following GROUP BY or JOIN ON fields? Should they be the same? For example, the following code is confusing:

SELECT itemId, window_start, window_end, count(distinct userId)
FROM TABLE(
  Tumble(
    data => TABLE InputTable
      PARTITION BY userId
      ORDER BY timestamp,
    size => INTERVAL '5' MINUTES
  )
)
GROUP BY itemId, window_start, window_end;

Besides, a windowed table should be able to be grouped or joined on different fields. For example, I guess the following code can't be expressed if we force a PARTITION BY clause; this is also the example [1] I gave in the FLIP.

CREATE VIEW tumbled_view AS
SELECT *
FROM TABLE(
  Tumble(
    data => TABLE orders,
    timecol => DESCRIPTOR(timestamp),
    size => INTERVAL '5' MINUTES
  )
);

INSERT INTO sellers_5min
SELECT seller_id, window_start, window_end, COUNT(DISTINCT user_id) AS slr_buyer_cnt
FROM tumbled_view
GROUP BY seller_id, window_start, window_end;

INSERT INTO items_5min
SELECT item_id, seller_id, window_start, window_end, SUM(price) AS sales, COUNT(DISTINCT user_id) AS item_buyer_cnt
FROM tumbled_view
GROUP BY item_id, seller_id, window_start, window_end;

Best,
Jark

[1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function#FLIP145:SupportSQLwindowingtablevaluedfunction-Example

On Thu, 15 Oct 2020 at 17:16, Timo Walther <[hidden email]> wrote:

Hi Jark,

I think we should discuss the SET vs ROW semantics again. The FLIP itself is not wrong, but I find it a bit verbose to define a window via TUMBLE + grouping by window_start and window_end.
This is the Beam approach and exposes how Flink window assigners work internally, but do we want it in Flink SQL?

PTFs allow us to define our own operators in SQL. We are not forced by the standard to do it, as stated in the `One SQL to Rule it all` paper. We can align the SQL windows more towards our regular DataStream API windows, where you keyBy first and then apply a window operator. Similarly, we could unify all our windows by requiring a PARTITION BY and maybe even an ORDER BY clause.

Personally, I find this easier to explain to users than telling them why a session window has SET semantic input tables while tumble/sliding have ROW semantic input tables.

The ORDER BY would even allow us to declare the time attribute explicitly, similar to MATCH_RECOGNIZE:

Approach A)

SELECT *
FROM TABLE(
  Tumble(
    data =>
      TABLE InputTable
      PARTITION BY userId
      ORDER BY timestamp,
    size => INTERVAL '5' MINUTES
  )
);

SELECT *
FROM TABLE(
  Session(
    data =>
      TABLE InputTable
      PARTITION BY userId
      ORDER BY timestamp,
    gap => INTERVAL '5' MINUTES
  )
);

VS.

Approach B)

SELECT *
FROM TABLE(
  Tumble(
    data => TABLE InputTable,
    timecol => DESCRIPTOR(timestamp),
    size => INTERVAL '5' MINUTES
  )
)
GROUP BY userId, window_start, window_end;

SELECT *
FROM TABLE(
  Session(
    data =>
      TABLE InputTable
      PARTITION BY userId
      ORDER BY timestamp,  -- or timecol => DESCRIPTOR(timestamp) ?
    gap => INTERVAL '5' MINUTES
  )
)

What do others think?

Regards,
Timo

On 14.10.20 05:26, Jark Wu wrote:

Hi Timo,

1a) Deprecation: I'm fine with deprecating the old syntax and only using the new one in all examples, docs, slides etc. I will update the FLIP later. We can drop the old syntax at some point in the future, but that may need another discussion.

1b) Time attributes: Are you fine with the "window_time" column approach?
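Timo's session examples partition by userId, and that is exactly where session windows differ from tumbling ones: a row's session cannot be computed from the row alone. A minimal Python sketch, assuming integer minutes for time and a hypothetical `session_windows` helper (not a Flink or Calcite API), makes that dependency visible.

```python
from collections import defaultdict


def session_windows(rows, keycol, timecol, gap):
    """Gap-based sessions per key: {key: [(session_start, session_end), ...]}.

    A row joins the current session of its key if it arrives less than `gap`
    after the previous row of the same key; session_end = last row time + gap.
    """
    times_by_key = defaultdict(list)
    for row in rows:
        times_by_key[row[keycol]].append(row[timecol])

    sessions = {}
    for key, times in times_by_key.items():
        times.sort()  # needs all rows of the partition at once (set semantics)
        spans = []
        start = prev = times[0]
        for t in times[1:]:
            if t - prev >= gap:  # gap reached: close the current session
                spans.append((start, prev + gap))
                start = t
            prev = t
        spans.append((start, prev + gap))
        sessions[key] = spans
    return sessions


# Hypothetical click rows; time is in minutes.
CLICKS = [
    {"userId": "u1", "ts": 0},
    {"userId": "u2", "ts": 1},
    {"userId": "u1", "ts": 3},
    {"userId": "u1", "ts": 20},
]
print(session_windows(CLICKS, "userId", "ts", gap=5))
```

Adding a single u1 click at minute 16 changes the session of the click at minute 20 from (20, 25) to (16, 25). The output for a row depends on the other rows of the same key, which is why a session window behaves like a set-semantic PTF while a tumbling window does not.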
4) Semantics: The SQL standard proposes two semantics: SET and ROW. They are explained in the paper (Chapter 5.1.2):

- Row semantics means that the result of the PTF is decided on a row-by-row basis. A table should be given row semantics if the PTF does not care how rows are assigned to virtual processors.
- Set semantics means that the outcome of the function depends on how the data is partitioned. A table should be given set semantics if all rows of a partition should be processed on the same virtual processor.

A table with set semantics requires a PARTITION BY clause. In the paper's examples, UDjoin has two input tables with set semantics, and Pivot has an input table with row semantics.

In our case, tumbling, sliding, and cumulative windows are all row-semantic PTFs, because the result (window_start, window_end) is decided on a row-by-row basis; we don't need to partition data before assigning these windows, they just act like pure UDTFs. Therefore, we don't need the PARTITION BY clause for them.

However, I think the session window is a set-semantic PTF, because all data of a user should be processed together (stateful processing). So the syntax of the session window should be:

SELECT *
FROM SESSION(
  data => TABLE Bids PARTITION BY bidder,
  timecol => DESCRIPTOR(bidtime),
  gap => INTERVAL '5' MINUTES);

However, the session window syntax in Calcite is:

SELECT *
FROM SESSION(
  data => TABLE Bids,
  timecol => DESCRIPTOR(bidtime),
  keycols => DESCRIPTOR(bidder),
  gap => INTERVAL '5' MINUTES);

This is not correct, and there is already a discussion in Calcite [1].

What do you think?

5) Future: I'm also very excited about the potential of PTFs. As I see it, in the future we can support the following features in SQL in a standard PTF way:
- advanced operations supported in the Table API (FLIP-29), e.g. drop_columns, user-defined table aggregates
- user-defined join operators
- a shortcut TopN function
- re-assigning watermarks?
- re-partitioning data, similar to the Hive DISTRIBUTED BY syntax
- ...

Best,
Jark

[1]: https://issues.apache.org/jira/browse/CALCITE-3780?focusedCommentId=17123472&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17123472

On Tue, 13 Oct 2020 at 22:24, Timo Walther <[hidden email]> wrote:

Hi Jark,

1a) Deprecation: I totally agree that dropping old syntax is not very easy in SQL. That's why I am also extremely cautious about adding new syntax in this FLIP. However, I would already deprecate the old syntax and only use the new one in all examples, docs, slides etc. This hopefully gives us a chance to drop the old syntax at some point in the future. As far as I know, Calcite will drop support for the old syntax soon, which means that we have maybe 2-3 Flink releases until we will need to either drop it as well or maintain custom code.

1b) Time attributes: I haven't noticed this limitation. We should definitely address this issue in the FLIP. Every time operation should be able to express the new rowtime. For MATCH_RECOGNIZE we use `MATCH_ROWTIME`; we could make this behavior similar and just use a helper UDF, but I would prefer an approach similar to window_start and window_end.

3) Simplified syntax: Yes, after looking deeper into this topic, I see that Oracle does not seem to be standard compliant here. So let's postpone this change. But I would be in favor of this syntax, because the additional `TABLE()` keyword confuses users and even SQL experts.
4) Semantics: I read the summary of the SQL 2016 standard [1] again, and I'm wondering whether the keying semantics in the FLIP are correct using GROUP BY. The paper illustrates the following example:

SELECT E.*, D.*
FROM TABLE(
  UDJoin(
    T1 => TABLE (Emp) AS E
      PARTITION BY Deptno,
    T2 => TABLE (Dept) AS D
      PARTITION BY Deptno
      ORDER BY Tstamp
  )
)

where the first PTF parameter is declared with `WITH SET SEMANTICS`.

The paper further states:

"WITH SET SEMANTICS is specified when the outcome of the function depends on how the data is partitioned. A table should be given set semantics if all rows of a partition should be processed on the same virtual processor."

Isn't this exactly what we need for windows as well? Shouldn't we use the following syntax then:

SELECT *
FROM TABLE(
  Tumble(
    data =>
      TABLE (InputTable)
      PARTITION BY userId
      ORDER BY timestamp
  )
)

In the end, all windows are just PTFs; maybe we should rather think about how we support PTFs in the near future, because they would open an entirely new set of use cases to SQL. The examples in chapter 12 of the 2016 standard, ranging from `CSVreader` to `UDjoin`, are impressive.

Regards,
Timo

[1] https://www.researchgate.net/profile/Fred_Zemke/publication/329593276_The_new_and_improved_SQL2016_standard/links/5c17eb50a6fdcc494ffc5999/The-new-and-improved-SQL2016-standard.pdf

On 13.10.20 12:42, Jark Wu wrote:

Hi everyone,

Timo just raised a good point in the vote thread. I copied the feedback here:

Timo:
1) I think we should not offer 2 different kinds of syntax that do the same thing. We should deprecate the old syntax.
2) We should have session windows in the new syntax as well to give users a complete migration path.
3) We should investigate whether we can remove the additional `FROM TABLE(...)` syntax. As far as I can see in other examples from Oracle 18c, the additional `TABLE()` syntax is no longer necessary for polymorphic table functions.

Here are my comments:

1) I'm not sure about this. If we are going to drop the old syntax, this will break lots of existing SQL jobs. Upgrading SQL jobs is not as easy as upgrading Table API jobs; we should be as cautious as possible here.

Besides, if we want to deprecate the old syntax, we must provide equal functionality first, and the new syntax doesn't support propagating time attributes. A possible solution is to generate one more column, "window_time", for window TVFs. The value of "window_time" would always be "window_end - 1" and it would have the time attribute type. Users can propagate the time attribute by adding "window_time" to the "group by" and "join on" clauses together with "window_start" and "window_end".

2) I will add the session window syntax to the FLIP later.

3) I like the simplified syntax "FROM tumble(input, rowtime, interval '1' minute)". However, the polymorphic table function syntax introduced in the SQL:2016 standard [1] requires the TABLE() and DESCRIPTOR() syntax (see Chapter 8 "Invocation"). Therefore, I think it's safe to support the standard syntax first, and then explore whether we can extend the syntax to make the TABLE() and DESCRIPTOR() keywords optional. Note that the Calcite parser currently doesn't support the simplified syntax, and this definitely needs to be discussed in the Calcite community.
Actually, there has been a discussion about this [2], and Julian said:

"Standard SQL doesn't allow functions in the FROM clause. I think it's because tables and functions are in different namespaces (and therefore there could be a table and a function with the same name). So you need to use the TABLE keyword to indicate that you are using a function as a table."

Best,
Jark

[1]: https://standards.iso.org/ittf/PubliclyAvailableStandards/c069776_ISO_IEC_TR_19075-7_2017.zip
[2]: https://lists.apache.org/x/thread.html/4a91632b1c780ef9d67311f90fce626582faae7d30a134a768c3d324@%3Cdev.calcite.apache.org%3E

On Sat, 10 Oct 2020 at 17:59, Jark Wu <[hidden email]> wrote:

Hi everyone,

Thanks everyone for this healthy discussion. I think we have addressed all the concerns, so I will continue with a vote. If you have any new objections, feel free to let me know.

Best,
Jark

On Sat, 10 Oct 2020 at 17:54, Jark Wu <[hidden email]> wrote:

Hi Jingsong,

That's a good question. I did search a lot and didn't find any system that provides such an out-of-the-box function. I guess the reason is that in traditional batch systems, this feature is supported by the over window, so they don't need to invent a new function or syntax for it. For streaming systems, we are the first to propose this new window.

However, I think CUMULATE is a good name, because almost all databases call such scenarios a "cumulative window", e.g. Snowflake [1], SQL Server [2], Postgres [3]. Thus we chose "cumulative" as the base name, but use the verb form "cumulate" because the other window function names are also verbs, e.g. tumble, hop.
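For readers weighing the name, a cumulative window can be pictured as a family of windows that share the start of a fixed period and grow by a fixed step until they reach the period's size, so each window reports a running total. The Python sketch below assumes `step` and `size` parameters, integer minutes, and hypothetical helper names; it models the semantics only and is not Flink's CUMULATE implementation.

```python
def cumulate_windows(ts, step, size):
    """All (window_start, window_end) pairs that a row at time `ts` belongs to.

    Windows share the start of the size-aligned period and grow by `step`
    until they reach `size`.
    """
    if size % step != 0:
        raise ValueError("size must be a multiple of step")
    start = ts - ts % size
    return [
        (start, start + k * step)
        for k in range(1, size // step + 1)
        if ts < start + k * step
    ]


def cumulative_sums(events, step, size):
    """SUM(value) per cumulative window, i.e. a running total within each period."""
    totals = {}
    for ts, value in events:
        for window in cumulate_windows(ts, step, size):
            totals[window] = totals.get(window, 0) + value
    return dict(sorted(totals.items()))


# Hypothetical (time in minutes, amount) events with step = 2 and size = 6.
EVENTS = [(1, 10), (3, 5), (5, 2), (7, 4)]
print(cumulative_sums(EVENTS, step=2, size=6))
```

Within each period the sums never decrease (10, 15, 17 for the first period), which is the running-total shape that the Snowflake, SQL Server, and Postgres references express with OVER windows.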
I hope this can address your concern.

Best,
Jark

[1]: https://docs.snowflake.com/en/sql-reference/functions-analytic.html#cumulative-window-frame-examples
[2]: https://docs.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql?view=sql-server-ver15#c-producing-a-moving-average-and-cumulative-total
[3]: https://popsql.com/learn-sql/postgresql/how-to-calculate-cumulative-sum-running-total-in-postgresql

On Sat, 10 Oct 2020 at 17:26, Jingsong Li <[hidden email]> wrote:

+1 for voting. Thanks Jark for driving.

+1 for TVF. It has been put forward in theory and is supported by Calcite. It will greatly enhance window-related operations.

My personal feeling is that after the TVF, the following operations can be similar to traditional batch SQL, as long as the window-related attributes are included in the key.

I am not sure about the CUMULATE window. Yes, it's a common requirement, but is there any more evidence (other systems) to prove this word ("CUMULATE") is appropriate?

Best,
Jingsong

On Sat, Oct 10, 2020 at 3:43 PM Jark Wu <[hidden email]> wrote:

Hi Pengcheng,

IIUC, the "stream operators" you mean are the non-time operators, also called regular operators, such as regular join and regular aggregate. But you may have misunderstood me: only the time operators can't be applied after the new window operators, because of missing time attributes. The regular operators can still be applied after the new window operators.
Regarding using window TVFs to re-assign event time and watermarks, I'm not sure about this, because assigning watermarks requires defining a watermark strategy, and the window TVF doesn't provide such an ability. Polymorphic table functions are table functions which just append additional columns and convert N rows into M rows; they can't touch meta information.

Best,
Jark

On Sat, 10 Oct 2020 at 15:41, Jark Wu <[hidden email]> wrote:

Hi Danny,

Thanks for the hint about the named params syntax; I added examples with named params in the FLIP.

Best,
Jark

On Sat, 10 Oct 2020 at 15:03, Pengcheng Liu <[hidden email]> wrote:

Hi, Jark,

I've got some different opinions there. I think it's a very common use case to use window operators in combination with streaming operators (even time operators). For example, for some tables users only care about data within a period, but for other tables they may want the whole historical data. The pipeline may look like this:

window join -> dimension table join -> stream aggregate -> stream sort

Just as you said, the key clause can be used to distinguish whether an operator should be translated to a window operator or a streaming operator.
Also, as I've mentioned before: 1) for time operators after window aggregation, the auxiliary function which is used to access the time attribute column can actually be replaced with (window_end - 1). We just need to make the results of the upstream contain a time column whose range is within (window_start, window_end), and then the downstream time operators can work on it (driven by the original watermark from the source). 2) For time operators after other window operators, the downstream time operators can access the time column directly from their input.

One more thought: maybe the window TVFs can re-assign timestamps and watermarks, so that in cases where the watermark cannot be retrieved from the source directly (it may need some conversion), the watermark can still be assigned dynamically in the SQL (using the time column as the watermark column) and thus make it work. I think this can save much time revising the event time column in some cases (this is a real demand in our production environment).

I strongly suggest that we support the combined usage of window operators and streaming operators, and I think we can achieve this with little work.

Best,
Pengcheng

On Sat, Oct 10, 2020 at 1:45 PM, Jark Wu <[hidden email]> wrote:

Hi Benchao,

That's a good question.
IMO, the new windowed operators and the current time operators are two different sets of functions, just like time operators and non-time operators are two different sets of functions. I think it's fine if we don't support integrating them, just like time operators can't be applied on a non-windowed aggregate. If users want to use time operators in the whole pipeline, they can use the grouped window aggregates instead of the window TVFs.

The key idea of the window TVF is that all the operators in the pipeline are based on the **windows**. In terms of syntax, if the key clause (e.g. group by, partition by, join on, order by) contains window_start and window_end, it can be translated into windowed operators. Thus, we will have windowed CEP, windowed sort, and windowed over aggregate in the future to make it possible to build a windowed pipeline.

But I think we can elaborate on the integration more in the future if users need it. Actually, I don't fully understand the scenario of integrating window TVFs and time operators at this point. For example, take an interval join between an input stream and a window join result: I don't see why it can't be expressed by a nested window join and why users have to use an interval join here. Maybe we can wait for more input from users once the window TVF is released and elaborate on it again.
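The rule Jark describes, choosing a windowed operator only when the key clause contains both window columns, can be illustrated with a toy check. This is a hypothetical Python sketch operating on plain column-name lists; a real planner works on relational plans, and its actual rule may differ.

```python
WINDOW_COLUMNS = {"window_start", "window_end"}


def choose_operator(kind, key_columns):
    """Pick a windowed or a regular operator from the key clause alone.

    `kind` is e.g. "aggregate", "join" or "sort"; `key_columns` are the columns
    of its GROUP BY / PARTITION BY / JOIN ON / ORDER BY clause.
    """
    if WINDOW_COLUMNS <= set(key_columns):
        return "windowed " + kind
    return "regular " + kind


print(choose_operator("aggregate", ["seller_id", "window_start", "window_end"]))
print(choose_operator("aggregate", ["seller_id", "window_start"]))
print(choose_operator("join", ["user_id"]))
```

Requiring both columns, rather than either one, is a deliberate choice in this sketch: a key with only window_start would not identify a single window for sliding or cumulative windows, where several windows can share a start.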
Best,
Jark

On Sat, 10 Oct 2020 at 12:01, 刘 芃成 <[hidden email]> wrote:

Hi, Benchao,

I think I got your point. In the current implementation of group window aggregation, the value of the time attributes (e.g. TUMBLE_ROWTIME/TUMBLE_PROCTIME) is calculated as (window_end - 1), so I think we can just use it directly if you need this. But I think these time attributes are mainly intended for cascaded window operations. Regarding the example you provided, I think the semantics of the SQL in your example, which does an interval join (e.g. with TUMBLE_ROWTIME) after a window aggregation, are not clear in the current implementation, and I think that's a strong reason why we need the new TVF syntax. With the new syntax, users should understand which time column to use and how to generate it when doing interval joins etc.

Best,
Pengcheng

From: Benchao Li <[hidden email]>
Date: Saturday, October 10, 2020 at 11:02 AM
To: pengcheng Liu <[hidden email]>
Cc: dev <[hidden email]>
Subject: Re: [DISCUSS] FLIP-145: Support SQL windowing table-valued function

Hi pengcheng,

Thanks for your response. I knew that the original time attribute column will be retained after the TVF; what I'm questioning is how we get the time attribute column after aggregation. Your answer did not remove my doubts about this.
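Pengcheng's "(window_end - 1)" value, like the "window_time" column proposed elsewhere in this thread, works because it still falls inside the window. A small Python sketch, assuming millisecond precision for the "- 1" and using hypothetical helpers: re-windowing that value with the same size returns the same window, which is what a cascaded window aggregate needs, whereas window_end itself already belongs to the next window.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
ONE_MS = timedelta(milliseconds=1)  # assumed unit of the "- 1"


def tumble_start(ts, size):
    """Start of the tumbling window of length `size` that contains `ts`."""
    return ts - (ts - EPOCH) % size


def window_time(window_end):
    """Time attribute of a window result, modelled as window_end - 1 ms."""
    return window_end - ONE_MS


size = timedelta(minutes=10)
start = datetime(2020, 10, 10, 8, 0)
end = start + size

wt = window_time(end)
print(wt)                       # one millisecond before window_end
print(tumble_start(wt, size))   # maps back to the same window start
print(tumble_start(end, size))  # window_end itself starts the next window
```

The same reasoning explains why a downstream time operator can keep using the source watermark: every emitted row carries a timestamp inside the window it summarizes.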
It's OK if we don't plan in this FLIP to integrate the new TVF aggregate with the old "time attribute scenarios" listed in my previous email. However, it would be good to elaborate on this and leave it to the future plan.

On Sat, Oct 10, 2020 at 10:45 AM, pengcheng Liu <[hidden email]> wrote:

Hi, Benchao,

In TVFs, the time attributes are just passed through from the parent rels, and the TVFs just add two additional window attributes (i.e. window_start & window_end). Also, I think the time column can be not only a time attribute of type `TimeIndicatorType` but also a regular column of type `Timestamp`.

For cascaded window operations, we can use the window_start/window_end of the previous window result directly to indicate operating on the same window, or use a new DESCRIPTOR column to assign new windows in case the time column changes (e.g. in some cases the original timestamp is inaccurate and needs some conversion before it can be used).

You can check the definition or signature of these TVFs in the FLIP, e.g.

SELECT * FROM TABLE(
  TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))

In this example, `bidtime` is the time attribute column, which is the first operand of the DESCRIPTOR function.

+1 to start voting.
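Pengcheng's description (the time column passes through and only window_start and window_end are appended) matches Jark's remark that these PTFs append columns and turn N rows into M rows. A hopping window shows both effects, since one row lands in several windows. A minimal Python sketch, assuming integer minutes and hypothetical `hop`, `slide`, and `size` names; it models the semantics, not Flink's HOP operator.

```python
def hop(rows, timecol, slide, size):
    """Append window_start/window_end for every hopping window containing the row.

    Each input row yields size // slide output rows (assuming size is a multiple
    of slide); all original columns, including the time column, pass through.
    """
    out = []
    for row in rows:
        ts = row[timecol]
        starts = []
        start = ts - ts % slide       # latest slide-aligned start <= ts
        while start > ts - size:      # window [start, start + size) still covers ts
            starts.append(start)
            start -= slide
        for s in reversed(starts):    # emit in ascending window order
            out.append({**row, "window_start": s, "window_end": s + size})
    return out


# Hypothetical Bid rows, time in minutes; slide = 5, size = 10.
BIDS = [{"bidtime": 7, "price": 4}, {"bidtime": 12, "price": 2}]
for r in hop(BIDS, "bidtime", slide=5, size=10):
    print(r)
```

Two input rows become four output rows, and each output row still carries its original `bidtime`, so a later query can either group by the appended window columns or assign new windows on the passed-through time column.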
On Sat, Oct 10, 2020 at 10:08 AM, Benchao Li <[hidden email]> wrote:

Hi Jark,

2 & 3 sound good to me.

Regarding the time attribute, I still have some questions. I know it's easy to support cascaded window aggregates using the new TVFs. However, there are some other places that need a time attribute:
- CEP
- interval join
- order by
- over window
If there is no time attribute column, how do we integrate these old features with the new TVFs? E.g.

StreamA -> new window aggregate -> interval join -> Sink
                                  /
StreamB --------------------------

On Fri, Oct 9, 2020 at 11:51 PM, Jark Wu <[hidden email]> wrote:

Hi Benchao,

1) time attribute
Yes. We don't need time attribute auxiliary functions, because the new window operations are all based on the window_start and window_end columns instead of on the time attributes, so we don't need to propagate time attributes. A cascaded window aggregate can be expressed by simply grouping by the window_start and window_end of the previous window result. I have added a cascaded window aggregate example in the Tumbling Window section of the FLIP. If you want to define a proctime window aggregate, the time column in the TVF should be a proctime attribute field (or the PROCTIME() function).

2) batch support
Yes. The proposed syntax/API is unified for batch and streaming.
Batch support is in the plan, but there may not be enough time to catch up with 1.12.

3) support `grouping sets`
This is not included in the FLIP, but I think it would be great if we could support `grouping sets`. The existing window implementation doesn't support this because we convert the LogicalAggregate into a WindowAggregate at the beginning, so the expand-grouping-sets rule can't be applied in this situation. Fortunately, with the new window implementation, the conversion to WindowAggregate will happen at the end, so I think the expand rule can be applied and support this feature naturally. Therefore, IMO, we don't need to include this feature in this FLIP, to avoid the FLIP becoming too large. This can be a follow-up issue (maybe just adding tests and docs) after the FLIP.

Best,
Jark

On Fri, 9 Oct 2020 at 19:09, 刘 芃成 <[hidden email]> wrote:

Hi, Benchao,

Welcome to the discussion. Yes, this new syntax can make SQL clearer and simpler. For your first question, the `window_start` and `window_end` columns will be added automatically, so we don't need auxiliary group functions to infer or access the window properties.

For `grouping sets` on TVFs, I think it would be interesting to support it, as we already support `grouping sets` on streaming aggregates in the blink planner.
But I'm not sure whether it will be included in this FLIP.

cc @Jark Wu

Best,
Pengcheng

On 2020/10/9 at 5:25 PM, "Benchao Li" <[hidden email]> wrote:

Thanks Jark for bringing up this discussion; I like this FLIP very much.

Especially the cumulate window: it's much like the current TUMBLE window + Fast Emit (which is an undocumented experimental feature), but it's more powerful.

And this will make the SQL semantics more standard, especially for the HOPPING window.

Regarding the time attribute, it seems that we don't need a specific function to infer the time attribute like `TUMBLE_ROWTIME` / `TUMBLE_PROCTIME`. Then are the `window_start` and `window_end` columns automatically time attribute columns?
- If not, what will be the time attribute of the result relation of these TVFs, especially after the window aggregation?
- If yes, then how do we handle proctime?

Regarding batch operators, it's great to hear that we can reuse the batch operators in continuous batch mode, as you mentioned in the FLIP. The current window aggregate can also be used in batch mode with rowtime. Do you plan to support these TVFs for batch mode in this FLIP?
Hence > >>>>>> the > >>>>>>>>>>> Table/SQL > >>>>>>>>>>>> is a > >>>>>>>>>>>> unified > >>>>>>>>>>>> API, it's great if we can keep the features complete > both > >>>>>> in > >>>>>>>>>>> streaming > >>>>>>>>>>>> and > >>>>>>>>>>>> batch mode. > >>>>>>>>>>>> > >>>>>>>>>>>> There is one more question, I don't know whether it > should > >>>>>> be > >>>>>>>>>>>> considered in > >>>>>>>>>>>> this FLIP. > >>>>>>>>>>>> Does the new window support `grouping sets`? (It's not > >>>>>>> supported > >>>>>>>>>> in > >>>>>>>>>>> old > >>>>>>>>>>>> window impl). > >>>>>>>>>>>> > >>>>>>>>>>>> Jark Wu <[hidden email]<mailto:[hidden email]>> > >>>>>>>>>> 于2020年10月9日周五 > >>>>>>>>>>> 下午4:14写道: > >>>>>>>>>>>> > >>>>>>>>>>>> > Hi all, > >>>>>>>>>>>> > > >>>>>>>>>>>> > I know we have a lot of discussion and development on > >>>>>> going > >>>>>>>>>> right > >>>>>>>>>>>> now but > >>>>>>>>>>>> > it would be great if we can get FLIP-145 into a > votable > >>>>>>> state. > >>>>>>>>>>>> > If there are no objections, I would like to start > voting > >>>>>> in > >>>>>>> the > >>>>>>>>>>> next > >>>>>>>>>>>> days. > >>>>>>>>>>>> > > >>>>>>>>>>>> > Best, > >>>>>>>>>>>> > Jark > >>>>>>>>>>>> > > >>>>>>>>>>>> > On Thu, 1 Oct 2020 at 14:29, Jark Wu < > [hidden email] > >>>>>>> <mailto: > >>>>>>>>>>> [hidden email]>> wrote: > >>>>>>>>>>>> > > >>>>>>>>>>>> > > Hi everyone, > >>>>>>>>>>>> > > > >>>>>>>>>>>> > > I have added a section for Performance Optimization > to > >>>>>>>>>> describe > >>>>>>>>>>>> how to > >>>>>>>>>>>> > > improve the performance in the short-term and > long-term > >>>>>>>>>>>> > > and sketch the future performance potential under > the > >>>>>> new > >>>>>>>>>> window > >>>>>>>>>>>> API. > >>>>>>>>>>>> > > Introducing the window API is just the first step, > we > >>>>>> will > >>>>>>>>>>>> > > continuously improve the performance to make it > >>>>>> powerful > >>>>>>> and > >>>>>>>>>>>> useful. 
> >>>>>>>>>>>> > > > >>>>>>>>>>>> > > Best, > >>>>>>>>>>>> > > Jark > >>>>>>>>>>>> > > > >>>>>>>>>>>> > > On Thu, 1 Oct 2020 at 14:28, Jark Wu < > [hidden email] > >>>>>>>>>> <mailto: > >>>>>>>>>>> [hidden email]>> wrote: > >>>>>>>>>>>> > > > >>>>>>>>>>>> > >> Hi Pengcheng, > >>>>>>>>>>>> > >> > >>>>>>>>>>>> > >> Yes, the window TVF is part of the FLIP. Welcome to > >>>>>>>>>> contribute > >>>>>>>>>>>> and join > >>>>>>>>>>>> > >> the discussion. > >>>>>>>>>>>> > >> Regarding the SESSION window aggregation, users can > >>>>>> use > >>>>>>> the > >>>>>>>>>>>> existing > >>>>>>>>>>>> > >> grouped session window function. > >>>>>>>>>>>> > >> > >>>>>>>>>>>> > >> Best, > >>>>>>>>>>>> > >> Jark > >>>>>>>>>>>> > >> > >>>>>>>>>>>> > >> On Sun, 27 Sep 2020 at 21:24, liupengcheng < > >>>>>>>>>>>> [hidden email]<mailto: > [hidden email] > >>>>>>> > >>>>>>>>>>>> > > > >>>>>>>>>>>> > >> wrote: > >>>>>>>>>>>> > >> > >>>>>>>>>>>> > >>> Hi Jark, > >>>>>>>>>>>> > >>> Thanks for reply, yes, I think it's a good > >>>>>>>>>> feature, it > >>>>>>>>>>>> can > >>>>>>>>>>>> > >>> improve the NRT scenarios > >>>>>>>>>>>> > >>> as you mentioned in the FLIP. Also, I > think > >>>>>> it > >>>>>>> can > >>>>>>>>>>>> improve the > >>>>>>>>>>>> > >>> streaming SQL greatly, > >>>>>>>>>>>> > >>> it can support richer window operations in > >>>>>> flink > >>>>>>>>>> SQL > >>>>>>>>>>> and > >>>>>>>>>>>> bring > >>>>>>>>>>>> > >>> great convenience to users. > >>>>>>>>>>>> > >>> (we are now only supported group window in > >>>>>>> flink). > >>>>>>>>>>>> > >>> > >>>>>>>>>>>> > >>> Regarding the SESSION window, I think it's > >>>>>>>>>> especially > >>>>>>>>>>>> useful > >>>>>>>>>>>> > for > >>>>>>>>>>>> > >>> user behavior analysis(e.g. > >>>>>>>>>>>> > >>> counting user visits on a news website or > >>>>>> social > >>>>>>>>>>>> platform), but > >>>>>>>>>>>> > >>> I agree that we can keep it > >>>>>>>>>>>> > >>> out of the FLIP now to catch up 1.12. 
> >>>>>>>>>>>> > >>> > >>>>>>>>>>>> > >>> Recently, I've done some work on the > stream > >>>>>>> planner > >>>>>>>>>>> with > >>>>>>>>>>>> the > >>>>>>>>>>>> > >>> TVFs, and I'm willing to contribute > >>>>>>>>>>>> > >>> to this part. Is it in the plan of this > FLIP? > >>>>>>>>>>>> > >>> > >>>>>>>>>>>> > >>> Best, > >>>>>>>>>>>> > >>> PengchengLiu > >>>>>>>>>>>> > >>> > >>>>>>>>>>>> > >>> > >>>>>>>>>>>> > >>> 在 2020/9/26 下午11:09,“Jark Wu”<[hidden email] > >>>>>> <mailto: > >>>>>>>>>>> [hidden email]>> 写入: > >>>>>>>>>>>> > >>> > >>>>>>>>>>>> > >>> Hi pengcheng, > >>>>>>>>>>>> > >>> > >>>>>>>>>>>> > >>> That's great to see you also have the need of > >>>>>> window > >>>>>>>>>> join. > >>>>>>>>>>>> > >>> You are right, the windowing TVF is a powerful > >>>>>>> feature > >>>>>>>>>>> which > >>>>>>>>>>>> can > >>>>>>>>>>>> > >>> support > >>>>>>>>>>>> > >>> more operations in the future. > >>>>>>>>>>>> > >>> I think it as of the date time "partition" > >>>>>> selection > >>>>>>> in > >>>>>>>>>>>> batch SQL > >>>>>>>>>>>> > >>> jobs, > >>>>>>>>>>>> > >>> with this new syntax, I think it is possible > >>>>>>>>>>>> > >>> to migrate traditional batch SQL jobs to > Flink > >>>>>> SQL > >>>>>>> by > >>>>>>>>>>>> changing a > >>>>>>>>>>>> > >>> few lines. > >>>>>>>>>>>> > >>> > >>>>>>>>>>>> > >>> Regarding the SESSION window, this is on > purpose > >>>>>> to > >>>>>>>>>> keep it > >>>>>>>>>>>> out of > >>>>>>>>>>>> > >>> the > >>>>>>>>>>>> > >>> FLIP, because we want to keep the > >>>>>>>>>>>> > >>> FLIP small to catch up 1.12 and SESSION TVF is > >>>>>> rarely > >>>>>>>>>>> useful > >>>>>>>>>>>> (e.g. > >>>>>>>>>>>> > >>> session > >>>>>>>>>>>> > >>> window join?). 
> >>>>>>>>>>>> > >>> > >>>>>>>>>>>> > >>> Best, > >>>>>>>>>>>> > >>> Jark > >>>>>>>>>>>> > >>> > >>>>>>>>>>>> > >>> On Fri, 25 Sep 2020 at 22:59, liupengcheng < > >>>>>>>>>>>> > >>> [hidden email]<mailto: > >>>>>>>>>> [hidden email] > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>> wrote: > >>>>>>>>>>>> > >>> > >>>>>>>>>>>> > >>> > Hi, Jark, > >>>>>>>>>>>> > >>> > I'm very interested in this feature, > >>>>>> and > >>>>>>> I'm > >>>>>>>>>> also > >>>>>>>>>>>> working > >>>>>>>>>>>> > >>> on this > >>>>>>>>>>>> > >>> > recently. > >>>>>>>>>>>> > >>> > I just have a glance at the FLIP, > it's > >>>>>>> good, > >>>>>>>>>> but > >>>>>>>>>>> I > >>>>>>>>>>>> found > >>>>>>>>>>>> > >>> that > >>>>>>>>>>>> > >>> > there is no plan to add SESSION windows. > >>>>>>>>>>>> > >>> > Also, I think there can be more > things > >>>>>> we > >>>>>>>>>> can do > >>>>>>>>>>>> based on > >>>>>>>>>>>> > >>> this new > >>>>>>>>>>>> > >>> > syntax. For example, > >>>>>>>>>>>> > >>> > - window sort support > >>>>>>>>>>>> > >>> > - window union/intersect/minus > support > >>>>>>>>>>>> > >>> > - Improve dimension table join > >>>>>>>>>>>> > >>> > We can have more deep discussion on > >>>>>> this > >>>>>>> new > >>>>>>>>>>>> feature > >>>>>>>>>>>> > later > >>>>>>>>>>>> > >>> . > >>>>>>>>>>>> > >>> > I've also opened an jira that is > >>>>>> related to > >>>>>>>>>> this > >>>>>>>>>>>> feature > >>>>>>>>>>>> > >>> recently: > >>>>>>>>>>>> > >>> > > >>>>>> https://issues.apache.org/jira/browse/FLINK-18830 > >>>>>>>>>>>> > >>> > > >>>>>>>>>>>> > >>> > Best! > >>>>>>>>>>>> > >>> > PengchengLiu > >>>>>>>>>>>> > >>> > > >>>>>>>>>>>> > >>> > 在 2020/9/25 下午10:30,“Jark Wu”< > >>>>>> [hidden email] > >>>>>>>>>> <mailto: > >>>>>>>>>>> [hidden email]>> 写入: > >>>>>>>>>>>> > >>> > > >>>>>>>>>>>> > >>> > Hi everyone, > >>>>>>>>>>>> > >>> > > >>>>>>>>>>>> > >>> > I want to start a FLIP about supporting > >>>>>>> windowing > >>>>>>>>>>>> > table-valued > >>>>>>>>>>>> > >>> > functions > >>>>>>>>>>>> > >>> > (TVF). 
> >>>>>>>>>>>> > >>> > The main purpose of this FLIP is to > >>>>>> improve the > >>>>>>>>>> near > >>>>>>>>>>>> > real-time > >>>>>>>>>>>> > >>> (NRT) > >>>>>>>>>>>> > >>> > experience of Flink. > >>>>>>>>>>>> > >>> > > >>>>>>>>>>>> > >>> > FLIP-145: > >>>>>>>>>>>> > >>> > > >>>>>>>>>>>> > >>> > > >>>>>>>>>>>> > >>> > >>>>>>>>>>>> > > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>> > >>>>>> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function > >>>>>>>>>>>> > >>> > > >>>>>>>>>>>> > >>> > We want to introduce TUMBLE, HOP, > CUMULATE > >>>>>>>>>> windowing > >>>>>>>>>>>> TVFs, > >>>>>>>>>>>> > the > >>>>>>>>>>>> > >>> > CUMULATE is > >>>>>>>>>>>> > >>> > a new kind of window. > >>>>>>>>>>>> > >>> > With the windowing TVFs, we can support > >>>>>> richer > >>>>>>>>>>>> operations on > >>>>>>>>>>>> > >>> windows, > >>>>>>>>>>>> > >>> > including window join, window TopN and > so > >>>>>> on. > >>>>>>>>>>>> > >>> > This makes things simple: we only need > to > >>>>>>> assign > >>>>>>>>>>>> windows at > >>>>>>>>>>>> > the > >>>>>>>>>>>> > >>> > beginning > >>>>>>>>>>>> > >>> > of the query, and then apply operations > >>>>>> after > >>>>>>>>>> that > >>>>>>>>>>> like > >>>>>>>>>>>> > >>> traditional > >>>>>>>>>>>> > >>> > batch > >>>>>>>>>>>> > >>> > SQL. > >>>>>>>>>>>> > >>> > We hope it can help to reduce the > learning > >>>>>>> curve > >>>>>>>>>> of > >>>>>>>>>>>> windows, > >>>>>>>>>>>> > >>> improve > >>>>>>>>>>>> > >>> > NRT > >>>>>>>>>>>> > >>> > for Flink, and attract more batch users. 
> >>>>>>>>>>>> > >>> > > >>>>>>>>>>>> > >>> > A simple code snippet for 10 minutes > >>>>>> tumbling > >>>>>>>>>> window > >>>>>>>>>>>> > aggregate: > >>>>>>>>>>>> > >>> > > >>>>>>>>>>>> > >>> > SELECT window_start, window_end, > SUM(price) > >>>>>>>>>>>> > >>> > FROM TABLE( > >>>>>>>>>>>> > >>> > TUMBLE(TABLE Bid, > DESCRIPTOR(bidtime), > >>>>>>>>>> INTERVAL > >>>>>>>>>>>> '10' > >>>>>>>>>>>> > >>> MINUTES)) > >>>>>>>>>>>> > >>> > GROUP BY window_start, window_end; > >>>>>>>>>>>> > >>> > > >>>>>>>>>>>> > >>> > I'm looking forward to your feedback. > >>>>>>>>>>>> > >>> > > >>>>>>>>>>>> > >>> > Best, > >>>>>>>>>>>> > >>> > Jark > >>>>>>>>>>>> > >>> > > >>>>>>>>>>>> > >>> > > >>>>>>>>>>>> > >>> > > >>>>>>>>>>>> > >>> > >>>>>>>>>>>> > >>> > >>>>>>>>>>>> > >>> > >>>>>>>>>>>> > > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> -- > >>>>>>>>>>>> > >>>>>>>>>>>> Best, > >>>>>>>>>>>> Benchao Li > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> -- > >>>>>>>>>>> > >>>>>>>>>>> Best, > >>>>>>>>>>> Benchao Li > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> -- > >>>>>>>>>>> > >>>>>>>>>>> Best, > >>>>>>>>>>> Benchao Li > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>> > >>>>>> > >>>>>> > >>>>>> -- > >>>>>> Best, Jingsong Lee > >>>>>> > >>>>> > >>> > >> > >> > > > > |
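The proposal quoted above names three window types but shows only TUMBLE. Below is a minimal Python sketch, not Flink code, of how one event timestamp could be mapped to windows under each type. It then performs the "assign windows first, then group like batch SQL" step from the snippet. It follows the FLIP's idea of CUMULATE as a window with a fixed start that grows by one step until a max size. The thread itself only says CUMULATE is new and resembles TUMBLE + Fast Emit.

The following are assumptions for illustration, not taken from the thread:
- Timestamps are integer milliseconds.
- Windows are half-open [start, end) intervals aligned to epoch 0.
- `max_size` is a multiple of `step`.
- The helper names `tumble`, `hop`, `cumulate` and `window_sum` are hypothetical.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple

Window = Tuple[int, int]  # half-open interval [start, end), in milliseconds


def tumble(ts: int, size: int) -> List[Window]:
    """TUMBLE: each timestamp falls into exactly one fixed-size window."""
    start = ts - ts % size
    return [(start, start + size)]


def hop(ts: int, slide: int, size: int) -> List[Window]:
    """HOP: windows of length `size` start every `slide` ms and may overlap."""
    last_start = ts - ts % slide  # latest aligned window start that is <= ts
    # Walk back one slide at a time while the window still covers ts.
    starts = range(last_start, ts - size, -slide)
    return sorted((s, s + size) for s in starts)


def cumulate(ts: int, step: int, max_size: int) -> List[Window]:
    """CUMULATE: windows share a start and grow by `step` up to `max_size`.

    Assumes max_size is a multiple of step.
    """
    start = ts - ts % max_size
    first_end = start + (ts - start) // step * step + step  # smallest end > ts
    return [(start, end) for end in range(first_end, start + max_size + 1, step)]


def window_sum(events: List[Tuple[int, int]],
               assign: Callable[[int], List[Window]]) -> Dict[Window, int]:
    """Assign windows first, then sum prices per (window_start, window_end)."""
    sums: Dict[Window, int] = defaultdict(int)
    for ts, price in events:
        for window in assign(ts):
            sums[window] += price
    return dict(sorted(sums.items()))


events = [(1, 3), (7, 4), (12, 5)]  # (bidtime, price)
print(window_sum(events, lambda ts: tumble(ts, 10)))
# {(0, 10): 7, (10, 20): 5}
print(window_sum(events, lambda ts: cumulate(ts, 5, 10)))
# {(0, 5): 3, (0, 10): 7, (10, 15): 5, (10, 20): 5}
```

The CUMULATE output illustrates Benchao's comparison with TUMBLE + Fast Emit. The (0, 10) total equals the tumbling result, and (0, 5) and (10, 15) act as early partial results for the same 10 ms period.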
In reply to this post by Timo Walther-2
Hi, Timo ~
> We are not forced by the standard to do it as stated in the `One SQL to Rule it all` paper

No, sticking to the SQL standard is always better. I think this is a common practice in our Flink SQL now; without a standard, everyone can state a preference and the discussion would easily drift too far apart.

> We can align the SQL windows more towards our regular DataStream API windows, where you keyBy first and then apply a window operator.

I don't think the current DataStream window join implements the window semantics correctly. It joins the data sets first and then windows the LHS and RHS data together, whereas each input should actually window its own data set separately.

As for "key by the data set first": the current window TVF just appends window attributes, so it is very lightweight and orthogonal. We can combine the window TVFs with additional joins, aggregations, TopN and so on.

In SQL, people usually express "KEY BY" with the "GROUP BY" clause. That would mean strongly binding the window TVF and the aggregate operator together, which I would definitely vote -1 on.

As for PARTITION BY, there are specific cases for the SESSION window, because a session often has a logical key. We can extend the PARTITION BY syntax there because it is already in the SQL standard. But I'm confused why a TUMBLE window would have a PARTITION key. What is the real use case?

-1 for "ORDER BY", because sorting an unbounded data set is not meaningful. For unbounded data sets we already have the watermark to handle out-of-order data. For bounded data sets we can use a regular sort, because the current table argument actually allows any query.

Best,
Danny Chan

On 15 Oct 2020 at 5:16 PM +0800, [hidden email] wrote:
>
> Personally, I find this easier to explain to users than telling them the difference why a session window has SET semantic input tables and tumble/sliding have ROW semantic input tables.
|
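Danny's point that SESSION, unlike TUMBLE, naturally comes with a partition key can be illustrated with a small Python sketch. The names are hypothetical and this is not Flink's implementation. A tumbling window depends only on an event's own timestamp, whereas a session boundary depends on the neighbouring events of the same key, so dropping the key changes the result.

The merge rule is an assumption for illustration: each event opens [ts, ts + gap), and a later event extends the session if it arrives strictly before the session's current end.

```python
from collections import defaultdict
from typing import Dict, Hashable, List, Tuple

Window = Tuple[int, int]  # half-open interval [start, end), in milliseconds


def session_windows(events: List[Tuple[Hashable, int]],
                    gap: int) -> Dict[Hashable, List[Window]]:
    """Build session windows independently for every partition key.

    Each event opens a tentative window [ts, ts + gap); a later event of the
    same key that arrives before that window ends extends the session.
    """
    stamps_by_key: Dict[Hashable, List[int]] = defaultdict(list)
    for key, ts in events:
        stamps_by_key[key].append(ts)

    result: Dict[Hashable, List[Window]] = {}
    for key, stamps in stamps_by_key.items():
        sessions: List[Window] = []
        for ts in sorted(stamps):
            if sessions and ts < sessions[-1][1]:
                sessions[-1] = (sessions[-1][0], ts + gap)  # extend open session
            else:
                sessions.append((ts, ts + gap))  # gap exceeded: new session
        result[key] = sessions
    return result


clicks = [("alice", 0), ("bob", 1), ("alice", 3), ("bob", 4), ("alice", 20)]
print(session_windows(clicks, gap=5))
# {'alice': [(0, 8), (20, 25)], 'bob': [(1, 9)]}

# Without a partition key, all users' clicks are merged into shared sessions:
print(session_windows([(None, ts) for _, ts in clicks], gap=5))
# {None: [(0, 9), (20, 25)]}
```

A tumbling assignment of the same clicks would be identical with or without the key. That contrast is the motivation for PARTITION BY on SESSION that Danny describes.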
Hi all,
After some offline discussion and investigation with Timo and Danny, I have updated FLIP-145.

FLIP-145: https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function

Here are the updates:
1. Add SESSION window syntax and examples.
2. Time attribute: window TVFs will now return 3 window columns, the additional one being window_time, which is a time attribute. Add a section "Time Attribute Propagate" to explain, with examples, how time attributes are propagated.
3. The old window syntax will be deprecated. We may drop the old syntax in the future, but that needs another discussion.
4. Add future work about simplifying the TABLE() keyword (we have already started a discussion in Calcite [1]) and supporting COUNT windows.

We also investigated whether it is possible to use a nested type "window(start, end, time)" instead of 3 columns. However, some problems make this impossible for now:
- `window.start` can't be selected in a GROUP BY query, because it is not grouped. Postgres supports selecting nested fields of grouped ROW columns. We could fix this in Calcite, but it isn't trivial work.
- WINDOW is a token in the parser and can't be used as a column name; otherwise, parsing of OVER WINDOW would fail.
- Apache Beam also considered putting wstart and wend in a separate nested row [2]. However, that would limit these extensions to engines supporting nested rows, and many systems don't support nested rows well.

Therefore, we still insist on using three fields.

I would like to start a new VOTE for the updated FLIP-145 if there are no objections.
Best,
Jark

[1]: https://lists.apache.org/x/thread.html/ra98db08e280ddd9adeef62f456f61aedfdf7756e215cb4d66e2a52c9@%3Cdev.calcite.apache.org%3E
[2]: https://docs.google.com/document/d/138uA7VTpbF84CFrd--cz3YVe0-AQ9ALnsavaSE2JeE4/edit?disco=AAAAHJ0EnGI
|
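The update above says window TVFs now return window_time as an extra column and time attribute. Earlier in the thread, Jark also describes polymorphic table functions as only appending columns and turning N rows into M rows. Both points can be modelled with a rough Python sketch. It is a toy model, not Flink's planner, and `hop_assigner` and `window_tvf` are hypothetical names. The thread does not give the value of window_time. The sketch assumes window_end - 1, the last millisecond inside the half-open window.

```python
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

Row = Dict[str, object]
Window = Tuple[int, int]  # half-open interval [start, end), in milliseconds


def hop_assigner(slide: int, size: int) -> Callable[[int], List[Window]]:
    """Hypothetical helper: HOP assignment on integer millisecond timestamps."""
    def assign(ts: int) -> List[Window]:
        last_start = ts - ts % slide  # latest aligned start that is <= ts
        return sorted((s, s + size) for s in range(last_start, ts - size, -slide))
    return assign


def window_tvf(rows: Iterable[Row], time_col: str,
               assign: Callable[[int], List[Window]]) -> Iterator[Row]:
    """Append window_start, window_end and window_time to every input row.

    One output row is produced per assigned window, so overlapping HOP
    windows turn N input rows into more than N output rows. The original
    columns are copied unchanged.
    """
    for row in rows:
        for start, end in assign(row[time_col]):
            yield {**row,
                   "window_start": start,
                   "window_end": end,
                   # Assumption: window_time is the last millisecond that still
                   # belongs to the half-open window, i.e. window_end - 1.
                   "window_time": end - 1}


bids = [{"item": "A", "price": 3, "bidtime": 7},
        {"item": "B", "price": 5, "bidtime": 12}]
for out in window_tvf(bids, "bidtime", hop_assigner(slide=5, size=10)):
    print(out)
```

Because window_time is an ordinary value carried in every output row, a downstream window aggregate could keep it and use it as its event-time attribute. How Flink actually does this is defined in the FLIP's "Time Attribute Propagate" section, not by this sketch.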
Hi Jark,
thanks for the deep investigation and communication with Calcite and Beam folks. Given the new findings, +1 to vote.

Regards,
Timo
|