Fabian Hueske created FLINK-5656:
------------------------------------

    Summary: Add processing time OVER ROWS BETWEEN UNBOUNDED PRECEDING aggregation to SQL
    Key: FLINK-5656
    URL: https://issues.apache.org/jira/browse/FLINK-5656
    Project: Flink
    Issue Type: Sub-task
    Components: Table API & SQL
    Reporter: Fabian Hueske

The goal of this issue is to add support for OVER ROW aggregations on processing time streams to the SQL interface.

Queries similar to the following should be supported:

{code}
SELECT
  a,
  SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sumB,
  MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS minB
FROM myStream
{code}

The following restrictions should initially apply:
- All OVER clauses in the same SELECT clause must be exactly the same.
- The PARTITION BY clause is optional (no partitioning results in single-threaded execution).
- The ORDER BY clause may only have procTime() as parameter. procTime() is a parameterless scalar function that just indicates processing time mode.
- x PRECEDING is not supported (see FLINK-5653).
- FOLLOWING is not supported.

The restrictions will be resolved in follow-up issues. If we find that some of the restrictions are trivial to address, we can add the functionality in this issue as well.

This issue includes:
- Design of the DataStream operator to compute OVER ROW aggregates
- Translation from Calcite's RelNode representation (LogicalProject with RexOver expression).

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
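To make the requested semantics concrete, here is a minimal sketch in plain Python (not Flink code) of what an unbounded-preceding OVER aggregation computes: one output row per input row, carrying the running SUM and MIN of b over all rows seen so far in the same partition. The function name `over_unbounded_preceding` and the tuple layout are assumptions made for illustration, and arrival order in the list stands in for processing time.

```python
from collections import defaultdict


def over_unbounded_preceding(rows, partitioned=True):
    """Yield (a, sumB, minB) for every input row, in arrival order.

    rows: iterable of (a, b, c) tuples; c is the PARTITION BY key.
    Arrival order is used as a stand-in for processing time (assumption).
    """
    # One accumulator per partition key; a single key if not partitioned.
    state = defaultdict(lambda: {"sum": 0, "min": None})
    for a, b, c in rows:
        key = c if partitioned else None
        acc = state[key]
        acc["sum"] += b
        acc["min"] = b if acc["min"] is None else min(acc["min"], b)
        # Window is UNBOUNDED PRECEDING .. CURRENT ROW, so emit right away.
        yield (a, acc["sum"], acc["min"])


stream = [("r1", 5, "x"), ("r2", 3, "y"), ("r3", 2, "x"), ("r4", 7, "y")]
result = list(over_unbounded_preceding(stream))
unpartitioned = list(over_unbounded_preceding(stream, partitioned=False))
```

Without PARTITION BY, all rows share one accumulator, which is why the issue notes that this case runs single-threaded.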
Hi Fabian,
In the next few days I will start working on this issue. As soon as I have a proposal, I will share it for discussion.

Regards,
Dr. Stefano Bortoli
Senior Research Engineer - Big Data and Semantic Technology Expert
IT R&D Division
Hi Stefano,
I can assign the issue to you if you want to. Just drop a comment in JIRA.

Best, Fabian
OK, I just noticed that FLINK-5653 is also open, and I guess I should start working on that first.
Hi all,
I was thinking of opening a JIRA issue for the procTime() function so that it could be merged first and others could use it as well. What do you think?

Regards,
Stefano
Sounds good to me Stefano!
Best, Fabian
Hi Fabian,
After working on the rule, I am moving on to the implementation of the aggregation function.

I started by extending DataStreamRel (for which I created a Java version). However, I noticed that LogicalWindowAggregate provides the list of aggregatedCalls and other parameters. Perhaps it would be better to start by extending that class instead, but I may not be aware of some implications of this choice. What do you think?

My first idea was to implement a WindowAggregateUtil class with methods to extract, and perhaps interpret, the window parameters (e.g. boundaries, aggregate calls, parameter pointers, etc.).

Dr. Stefano Bortoli
Senior Research Engineer - Big Data and Semantic Technology Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München
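As a rough illustration of the kind of helper described above, the following plain-Python sketch models the window parameters of an OVER clause and checks them against the initial restrictions listed in FLINK-5656. It does not use the Calcite or Flink APIs; `OverSpec` and `check_over_specs` are hypothetical names invented for this example, and the clause parts are represented as plain strings.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class OverSpec:
    """Hypothetical, simplified view of one OVER clause."""
    partition_by: Tuple[str, ...]  # empty tuple means no PARTITION BY
    order_by: str                  # e.g. "procTime()"
    lower: str                     # e.g. "UNBOUNDED PRECEDING"
    upper: str                     # e.g. "CURRENT ROW"


def check_over_specs(specs: List[OverSpec]) -> List[str]:
    """Return the violated restrictions; an empty list means supported."""
    problems = []
    distinct = list(dict.fromkeys(specs))  # de-duplicate, keep order
    if len(distinct) > 1:
        problems.append("all OVER clauses must be identical")
    for spec in distinct:
        if spec.order_by != "procTime()":
            problems.append("ORDER BY must be procTime()")
        if spec.lower != "UNBOUNDED PRECEDING":
            problems.append("x PRECEDING is not supported")
        if spec.upper != "CURRENT ROW":
            problems.append("FOLLOWING is not supported")
    return problems


ok = OverSpec(("c",), "procTime()", "UNBOUNDED PRECEDING", "CURRENT ROW")
bounded = OverSpec(("c",), "procTime()", "2 PRECEDING", "CURRENT ROW")
```

Here `check_over_specs([ok, ok])` reports no problems, while mixing `ok` and `bounded` in one SELECT reports both the mismatch and the unsupported lower bound.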
Hi Stefano,
I don't think we should integrate this with LogicalWindowAggregate, which is meant for GroupBy windows and not Over windows. Moreover, LogicalWindowAggregate is on the logical plan level, but we need to implement a physical operator, i.e., a DataStreamRel.

Calcite parses the SQL query into the logical representation already. The windowing semantics are captured in the LogicalProject / LogicalCalc. Radu pointed out [1] that it makes sense to extract the window semantics into a LogicalWindow with a Calcite optimization rule. From there, we should add a DataStreamRel that creates the required DataStream transformations and functions, and the corresponding translation rule that converts LogicalWindow / LogicalCalc into the DataStreamRel.

Btw. it would be good to move this discussion to the JIRA issue. Just replying to the CREATE mail will not mirror the discussion on JIRA.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-5654
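The two-step translation described in this reply can be pictured with a toy model. The sketch below is plain Python, not Calcite or Flink code: the three node classes only borrow the names used in the thread, `DataStreamOverAggregate` is a hypothetical name for the physical operator, and an expression is treated as an OVER call simply because its string contains " OVER ".

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class LogicalProject:
    exprs: Tuple[str, ...]  # projection expressions, some with OVER
    input: str


@dataclass(frozen=True)
class LogicalWindow:
    agg_calls: Tuple[str, ...]    # the extracted OVER aggregates
    passthrough: Tuple[str, ...]  # remaining projected expressions
    input: str


@dataclass(frozen=True)
class DataStreamOverAggregate:  # hypothetical physical node
    agg_calls: Tuple[str, ...]
    passthrough: Tuple[str, ...]
    input: str


def extract_window(node):
    """Logical rule: pull OVER calls out of a projection."""
    if isinstance(node, LogicalProject) and any(" OVER " in e for e in node.exprs):
        aggs = tuple(e for e in node.exprs if " OVER " in e)
        rest = tuple(e for e in node.exprs if " OVER " not in e)
        return LogicalWindow(aggs, rest, node.input)
    return node  # rule does not match; leave the node unchanged


def to_physical(node):
    """Translation rule: logical window node to physical operator."""
    if isinstance(node, LogicalWindow):
        return DataStreamOverAggregate(node.agg_calls, node.passthrough, node.input)
    return node


plan = LogicalProject(("a", "SUM(b) OVER w", "MIN(b) OVER w"), "myStream")
physical = to_physical(extract_window(plan))
```

Keeping the extraction and the physical translation as separate rules mirrors the split between logical plan level and physical operator that the reply insists on.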
Thanks for confirming this. After the email I started working on it and things became more clear already. I will go ahead this way.
Dr. Stefano Bortoli
Senior Research Engineer - Big Data and Semantic Technology Expert
IT R&D Division