Hi,
Currently, Flink uses Calcite for SQL parsing, so we use the StreamSQL grammar proposed by Calcite [1], which requires the `STREAM` keyword in SQL. For example, `SELECT * FROM Orders` is regular standard SQL and is translated to a batch job. If you want to write a stream job, you have to add the `STREAM` keyword: `SELECT STREAM * FROM Orders`.

I'm wondering why we distinguish between the StreamSQL and BatchSQL grammars. We already have separate high-level APIs for batch (DataSet) and stream (DataStream), and we have a unified Table API for batch and stream (that's great!). Why do we have to separate them again in SQL?

I hope we can manipulate stream data like a table. Take `SELECT * FROM Orders`: if Orders is a table (or the query runs in a batch execution environment), it's a batch job. If Orders is a stream (or the query runs in a stream execution environment), it's a stream job. The grammars of StreamSQL and BatchSQL are exactly the same. That is what we did in Blink SQL.

The benefits of unifying the grammar:

1. StreamSQL is easy to use for anyone who knows regular SQL, because there is no difference between StreamSQL and regular SQL.
2. We are not blocked by Calcite. Currently, Calcite StreamSQL is not fully supported: it does not support stream-to-stream JOIN, window aggregates, aggregates without windows, etc. We may need to wait for Calcite to support them before we can start. Apart from windows, these features are already supported by regular SQL, and we can implement windows via user-defined functions. So if we use regular SQL instead of StreamSQL, we can start working right now instead of waiting for Calcite.
3. Blink SQL can be merged back into the community to accelerate the evolution of Flink SQL. Blink SQL has already done most of this work: we implemented UDF/UDTF/UDAF, aggregates with and without windows, stream-to-stream JOIN, and so on.
4. Windows can also work in batch jobs.

Just my thoughts :) What do you think about this?

[1] https://calcite.apache.org/docs/stream.html

- Jark Wu
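To make the proposal concrete, here is a minimal Python sketch, assuming a hypothetical catalog in which each registered name is marked as a static table or a stream. The query text carries no `STREAM` keyword; the job type is derived from what the sources are registered as. `CATALOG` and `job_type` are illustrative names, not Flink's API, and the regex stands in for a real parser. (Flink could equally decide from the type of the execution environment, as suggested in the thread.)

```python
import re
from typing import Dict

# Hypothetical catalog: each registered name is either a static "table" or a "stream".
CATALOG: Dict[str, str] = {"Orders": "stream", "Products": "table"}


def job_type(query: str, catalog: Dict[str, str]) -> str:
    """Decide between a batch and a stream job from the registered sources.

    The query text is the same in both cases; there is no STREAM keyword.
    Toy sketch: source names are found with a regex after FROM/JOIN, which
    is nowhere near a real SQL parser.
    """
    sources = re.findall(r"\b(?:FROM|JOIN)\s+(\w+)", query, flags=re.IGNORECASE)
    if not sources:
        raise ValueError("no source table found in query")
    unknown = [name for name in sources if name not in catalog]
    if unknown:
        raise KeyError(f"unregistered source(s): {unknown}")
    # Assumption: reading from at least one stream makes the query a continuous job.
    if any(catalog[name] == "stream" for name in sources):
        return "stream"
    return "batch"


print(job_type("SELECT * FROM Orders", CATALOG))    # stream
print(job_type("SELECT * FROM Products", CATALOG))  # batch
```

The design choice here is that registration, not syntax, carries the batch/stream distinction, so the same statement can be reused unchanged in both settings.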
Hi,
I personally would like it a lot if the SQL queries for batch and stream programs looked the same. With the decision to move the Table API on top of Calcite and to also use the Calcite SQL parser, Flink is somewhat tied to Calcite, so I don't know whether we can add our own window constructs and teach the parser to read them properly. Maybe Fabian and Timo have more insights here since they worked on the move to Calcite.

Cheers,
Aljoscha

+Timo looping him in directly

On Tue, 16 Aug 2016 at 09:29 Jark Wu wrote:
Hi Jark,
sorry that I didn't write back earlier; I wanted to talk to Fabian about this first. In general, according to Calcite's plans, even SQL queries containing the "STREAM" keyword are regular standard SQL. In theory, we could omit the "STREAM" keyword as long as it is guaranteed that the generated logical plans look the same. So I'm not against having the same grammar for both batch and streaming queries. However, if the logical representation for operators we need is not there yet, I think we should contribute code to Calcite. We need to research how far along the Calcite development is. We can implement windows via user-defined functions, as is also done in Calcite's streaming design document. It would be very interesting for the upcoming design phase if you could show us how you implemented Blink SQL. For instance, how do you define windows there?

Regards,
Timo

On 18/08/16 at 16:34, Aljoscha Krettek wrote:
--
Freundliche Grüße / Kind Regards

Timo Walther

Follow me: @twalthr
https://www.linkedin.com/in/twalthr
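Timo's remark that windows can be implemented via user-defined functions can be illustrated with a minimal Python sketch: a window is simply a function that maps a row's timestamp to the window(s) it belongs to. The names `tumble` and `hop` echo Calcite's `TUMBLE`/`HOP` functions, but the signatures here are assumptions for illustration (integer millisecond timestamps, half-open `[start, end)` windows aligned to multiples of the size or slide), not Calcite's or Flink's definitions.

```python
from typing import List, Tuple

Window = Tuple[int, int]  # half-open interval [start, end) in milliseconds


def tumble(ts: int, size: int) -> Window:
    """Tumbling window: each timestamp belongs to exactly one window."""
    if size <= 0:
        raise ValueError("size must be positive")
    start = ts - ts % size  # align down to a multiple of size
    return (start, start + size)


def hop(ts: int, slide: int, size: int) -> List[Window]:
    """Hopping (sliding) window: a timestamp may belong to several windows.

    Window starts are aligned to multiples of `slide`; each window is `size` long.
    """
    if slide <= 0 or size <= 0:
        raise ValueError("slide and size must be positive")
    windows = []
    start = ts - ts % slide  # latest window start that is <= ts
    while start + size > ts:  # walk back while the window still covers ts
        windows.append((start, start + size))
        start -= slide
    windows.reverse()  # earliest window first
    return windows


print(tumble(7_000, 5_000))       # (5000, 10000)
print(hop(7_000, 5_000, 10_000))  # [(0, 10000), (5000, 15000)]
```

Because both functions only look at a timestamp, they work the same way whether rows come from a bounded table or an unbounded stream.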
Hi Jark,
thanks for starting this discussion. Actually, I think we are "blocked" on the internal handling of streaming windows in Calcite rather than on the SQL parser. IMO, it should be possible to exchange or modify the parser if we want to.

Regarding Calcite's StreamSQL syntax: except for the STREAM keyword, Calcite closely follows the SQL standard (e.g., there are no special keywords like WINDOW; instead, stream-specific aspects such as tumbling windows are expressed as functions like TUMBLE [1]). One main motivation of the Calcite community is to have the same syntax for streaming and static tables. This includes support for tables that are static and streaming at the same time (the example in [1] is a table of orders to which new order records are added). When querying such a table, the STREAM keyword is required to distinguish a batch query, which returns a result set, from a standing query, which returns a result stream. In the context of Flink, we can make this distinction using the type of the TableEnvironment. So we could use the batch parser, but would need to change a couple of things internally and add checks for proper grouping on the timestamp column when doing windows, etc.

So far, the discussion about the StreamSQL syntax has rather focused on whether 1) StreamSQL should follow the SQL standard (as Calcite proposes) or 2) Flink should use a custom syntax with stream-specific features. For instance, a tumbling window is expressed in the GROUP BY clause [1] when following standard SQL, but it could be defined using a special WINDOW keyword in a custom StreamSQL dialect.

You are right that we have a dependency on Calcite. However, I think this dependency lies in the internals rather than in the parser, i.e., in how the validator/optimizer supports and handles monotone / quasi-monotone attributes and windows. I am not sure how much is already supported, but the Calcite community is working on this [2].
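To show what "a tumbling window expressed in the GROUP BY clause" computes, here is a minimal Python sketch that evaluates, over a finite list of rows, a query shaped like the hourly `GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId` example in [1]. The row layout `(rowtime_ms, product_id, units)` and the function name are assumptions for illustration; this shows the batch semantics only, not how Calcite or Flink executes the query.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

HOUR_MS = 60 * 60 * 1000


def tumbling_aggregate(
    rows: Iterable[Tuple[int, str, int]], size_ms: int = HOUR_MS
) -> Dict[Tuple[int, str], Tuple[int, int]]:
    """Evaluate, over a finite set of rows, a query shaped like

        SELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR), productId,
               COUNT(*), SUM(units)
        FROM Orders
        GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId

    rows: (rowtime_ms, product_id, units) tuples.
    Returns {(window_end_ms, product_id): (count, sum_of_units)}.
    """
    acc = defaultdict(lambda: [0, 0])  # (window_end, product) -> [count, sum]
    for rowtime, product_id, units in rows:
        window_end = rowtime - rowtime % size_ms + size_ms  # end of the row's tumbling window
        entry = acc[(window_end, product_id)]
        entry[0] += 1
        entry[1] += units
    return {key: (count, total) for key, (count, total) in acc.items()}


orders = [(0, "p1", 3), (1_000, "p1", 2), (500, "p2", 7), (HOUR_MS + 5, "p1", 1)]
print(tumbling_aggregate(orders))
# {(3600000, 'p1'): (2, 5), (3600000, 'p2'): (1, 7), (7200000, 'p1'): (1, 1)}
```

In a streaming execution, the same grouping would be evaluated incrementally, emitting each window's result once its end has passed; that part is deliberately left out of this sketch.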
I think we need these features in Calcite unless we want to completely remove our dependency on Calcite for StreamSQL. I would not be in favor of removing Calcite at this point; we put a lot of effort into refactoring the Table API internals. Instead, we should start talking to the Calcite community to see how far along they are, what is missing, and how we can help. I will start a discussion on the Calcite dev mailing list in the next few days and ask about the status of StreamSQL.

Best,
Fabian

[1] http://calcite.apache.org/docs/stream.html#tumbling-windows-improved
[2] https://issues.apache.org/jira/browse/CALCITE-1345
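The checks mentioned above (proper grouping on the timestamp column, monotone attributes) can be sketched in Python as follows. This is a minimal illustration assuming a hypothetical schema that marks columns as monotonic and a hypothetical `Tumble` group key; it loosely follows the idea in Calcite's streaming documentation that a streaming aggregate needs a monotonic grouping expression to make progress, but it is not Calcite's validator.

```python
from dataclasses import dataclass
from typing import Dict, List, Union


@dataclass(frozen=True)
class Column:
    name: str
    sql_type: str            # e.g. "TIMESTAMP", "INT"
    monotonic: bool = False  # e.g. an event-time column of an append-only stream


@dataclass(frozen=True)
class Tumble:
    """Hypothetical GROUP BY item standing for TUMBLE(<column>, <size>)."""
    column: str
    size_ms: int


GroupKey = Union[str, Tumble]  # a plain column name or a window


def validate_group_by(schema: Dict[str, Column], group_by: List[GroupKey], streaming: bool) -> None:
    """Raise ValueError if the GROUP BY clause is not acceptable."""
    has_monotonic_key = False
    for key in group_by:
        if isinstance(key, Tumble):
            col = schema.get(key.column)
            # Time windows must refer to a time column.
            if col is None or col.sql_type != "TIMESTAMP":
                raise ValueError(f"TUMBLE must refer to a time column, got {key.column!r}")
            if key.size_ms <= 0:
                raise ValueError("window size must be positive")
            # A window over a monotonic column is itself monotonic: windows close in order.
            has_monotonic_key = has_monotonic_key or col.monotonic
        else:
            col = schema.get(key)
            if col is None:
                raise ValueError(f"unknown column {key!r}")
            has_monotonic_key = has_monotonic_key or col.monotonic
    # Without a monotonic key, no group of a streaming query is ever known to be
    # complete, so in an append-only output model the query cannot make progress.
    if streaming and not has_monotonic_key:
        raise ValueError("streaming GROUP BY needs a monotonic key (e.g. a time window)")
```

A batch query passes without a monotonic key, while the same `GROUP BY productId` on a stream is rejected, which is exactly the kind of mode-dependent check the validator would need.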
Hi,
I did a bit of prototyping yesterday to check to what extent Calcite supports window operations on streams if we implemented them for the Table API. For the Table API, we do not go through Calcite's SQL parser and validator, but generate the logical plan (a tree of RelNodes) ourselves, mostly using Calcite's RelBuilder. It turns out that Calcite does not restrict grouped aggregations on streams at this abstraction level, i.e., it does not perform any checks.

I think it should be possible to implement windowed aggregates for the Table API. Once CALCITE-1345 [1] is implemented (and released), windowed aggregates will also be supported by the SQL parser, validator, and optimizer. To make them work with our implementation, we would need to adapt our solution to it (only internally), but I am sure we could reuse a lot of our initial implementation (Table API, validation, execution).

I drafted an API proposal a few months ago [2] and could convert it into a FLIP to discuss the API and break it down into subtasks.

What do you think?

Cheers,
Fabian

[1] https://issues.apache.org/jira/browse/CALCITE-1345
[2] https://docs.google.com/document/d/19kSOAOINKCSWLBCKRq2WvNtmuaA9o3AyCh2ePqr3V5E

2016-08-19 11:04 GMT+02:00 Fabian Hueske:
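To illustrate why building the plan directly skips the checks the SQL validator would apply, here is a toy Python sketch of a stack-based plan builder. It is loosely modeled on the push/pop style of Calcite's RelBuilder, but the classes and methods below are hypothetical and not Calcite's API. The builder only assembles nodes, so an unwindowed grouped aggregate over a stream is accepted; any stream-specific validation has to be added by the caller.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Scan:
    table: str
    is_stream: bool


@dataclass
class Aggregate:
    child: object
    group_keys: List[str]
    aggs: List[Tuple[str, str]]  # (function, column), e.g. ("SUM", "units")


class PlanBuilder:
    """Toy stack-based plan builder: it assembles nodes and checks nothing."""

    def __init__(self) -> None:
        self._stack: List[object] = []

    def scan(self, table: str, is_stream: bool = False) -> "PlanBuilder":
        self._stack.append(Scan(table, is_stream))
        return self

    def aggregate(self, group_keys: List[str], aggs: List[Tuple[str, str]]) -> "PlanBuilder":
        # Pops the current input and pushes an Aggregate on top of it. Nothing
        # here asks whether the input is a stream or whether the grouping
        # contains a time window.
        child = self._stack.pop()
        self._stack.append(Aggregate(child, list(group_keys), list(aggs)))
        return self

    def build(self) -> object:
        return self._stack.pop()


plan = (
    PlanBuilder()
    .scan("Orders", is_stream=True)
    .aggregate(["productId"], [("SUM", "units")])  # unwindowed, yet accepted
    .build()
)
print(plan)
# Aggregate(child=Scan(table='Orders', is_stream=True), group_keys=['productId'], aggs=[('SUM', 'units')])
```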
On Tue, Aug 23, 2016 at 9:47 AM, Fabian Hueske wrote:
> I drafted an API proposal a few months ago [2] and could convert this into
> a FLIP to discuss the API and break it down into subtasks.
>
> What do you think?

Sounds very reasonable :) +1
Hi Fabian, Timo,
Sorry for the late response.

Regarding Calcite's StreamSQL syntax, my only concerns are the STREAM keyword and the lack of aggregates without windows, which make the syntax differ between streaming and static tables. I don't think Flink should have a custom SQL syntax, but it's better to have a consistent syntax for batch and streaming. Regarding window syntax, I think it's good and reasonable to follow Calcite's syntax. In fact, we implemented Blink SQL windows following Calcite's syntax [1].

In addition, I described the Blink SQL design, including UDF, UDTF, UDAF, and windows, in a Google doc [1]. I hope it helps with the upcoming Flink SQL design.

+1 for creating a FLIP

[1] https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek

- Jark Wu
Hi Jark,
your design document looks very promising. It would be great if you could contribute parts of your implementation back, e.g., UDTFs, UDAFs, or even your CROSS APPLY operator. We don't need a FLIP for every little new feature, so feel free to create a Jira issue, discuss it a little, and open a PR. I'm also in favor of following Calcite's syntax, as it might be used in other frameworks as well. Users should not have to learn a new syntax when coming from Storm, Samza, etc.

Timo

On 23/08/16 at 13:09, Jark Wu wrote:
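Timo mentions contributing UDTFs and a CROSS APPLY operator. As a rough illustration of the semantics usually attached to CROSS APPLY (an inner lateral join of each row with the rows a table function returns for it), here is a minimal Python sketch. `cross_apply` and `split_words` are hypothetical names; this is not the Blink or Flink implementation.

```python
from typing import Callable, Iterable, Iterator, Tuple

Row = Tuple


def cross_apply(rows: Iterable[Row], udtf: Callable[..., Iterable[Row]], *arg_idx: int) -> Iterator[Row]:
    """Lateral join of each input row with the rows a table function emits for it.

    arg_idx selects the input columns passed to the function. As with CROSS
    APPLY, an input row for which the function emits nothing is dropped
    (an OUTER APPLY variant would keep it, padded with NULLs).
    """
    for row in rows:
        for out in udtf(*(row[i] for i in arg_idx)):
            yield tuple(row) + tuple(out)


def split_words(text: str) -> Iterator[Tuple[str, int]]:
    """Example UDTF: one (word, length) row per whitespace-separated word."""
    for word in text.split():
        yield (word, len(word))


docs = [(1, "hello flink"), (2, "")]
print(list(cross_apply(docs, split_words, 1)))
# [(1, 'hello flink', 'hello', 5), (1, 'hello flink', 'flink', 5)]
```

Row 2 disappears because `split_words("")` emits nothing, which is the behavioral difference between CROSS APPLY and an outer variant.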
Hi Jark,
We can think about whether or not to remove the STREAM keyword. In principle, Calcite should allow the same windowing syntax on streaming and static tables (this is one of Calcite's main goals). The Table API can also distinguish stream from batch without the STREAM keyword by looking at the ExecutionEnvironment. I think we would need to change the way tables are registered in Calcite's catalog and also add more validation (e.g., check that time windows refer to a time column). A prototype should help us see what the consequences of removing the STREAM keyword would be (it is actually a change to the table registration; the parser stays the same).

Regarding streaming aggregates without a window definition: we can certainly implement this feature in the Table API. A few points need to be considered, such as expiring values after a certain period of update inactivity (otherwise the state might grow indefinitely), but these aspects should be rather easy to solve. I think for SQL, such running aggregates are a special case of the sliding windows discussed in Calcite's StreamSQL document [1].

Thanks also for the document! I'll take it into account when sketching the FLIP for streaming aggregation support.

Cheers,
Fabian

[1] http://calcite.apache.org/docs/stream.html#sliding-windows
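The running aggregate and state expiration described above can be sketched in Python as follows. This is a minimal illustration under stated assumptions: events arrive in timestamp order, the aggregate is a per-key SUM, and a key's state is dropped once it has not been updated for more than `idle_timeout` time units. Without expiration, it roughly corresponds to `SUM(value) OVER (PARTITION BY key ORDER BY ts ROWS UNBOUNDED PRECEDING)`. The function name and parameters are hypothetical, and the expiration policy is one possible choice, not a decided design.

```python
from typing import Dict, Iterable, Iterator, Tuple


def running_sum(
    events: Iterable[Tuple[int, str, int]], idle_timeout: int
) -> Iterator[Tuple[int, str, int]]:
    """Per-key running SUM over an unbounded stream, with idle-state expiration.

    events: (timestamp, key, value) tuples in timestamp order.
    Emits (timestamp, key, running_sum) for every event; a key that was idle
    for longer than idle_timeout starts again from zero.
    """
    state: Dict[str, Tuple[int, int]] = {}  # key -> (running_sum, last_update_ts)
    for ts, key, value in events:
        # Drop the state of keys that have not been updated for too long, so the
        # state does not grow without bound as new keys keep arriving.
        expired = [k for k, (_, last) in state.items() if ts - last > idle_timeout]
        for k in expired:
            del state[k]
        total = state.get(key, (0, ts))[0] + value
        state[key] = (total, ts)
        yield (ts, key, total)


events = [(0, "a", 1), (1, "b", 2), (2, "a", 3), (20, "a", 4)]
print(list(running_sum(events, idle_timeout=10)))
# [(0, 'a', 1), (1, 'b', 2), (2, 'a', 4), (20, 'a', 4)]
```

The last event shows the trade-off: key `a` was idle for longer than the timeout, so its sum restarts at 4 instead of continuing to 8. Bounded state comes at the price of changed results for keys that resume after a long pause.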
Hi Fabian,
You are right, the main thing we need to change to remove the STREAM keyword is the table registration. If you like, I can do a prototype. Hi Timo, I'm glad to contribute our work back to Flink. I will look into it and create JIRAs in the next few days. - Jark Wu > On Aug 24, 2016, at 12:13 AM, Fabian Hueske <[hidden email]> wrote: > > Hi Jark, > > We can think about removing the STREAM keyword or not. In principle, Calcite should allow the same windowing syntax on streaming and static tables (this is one of the main goals of Calcite). The Table API can also distinguish stream and batch without the STREAM keyword by looking at the ExecutionEnvironment. I think we would need to change the way that tables are registered in Calcite's catalog and also add more validation (check that time windows refer to a time column, etc.). A prototype should help to see what the consequences of removing the STREAM keyword would be (which actually means changing the table registration; the parser stays the same). > > Regarding streaming aggregates without a window definition: We can certainly implement this feature in the Table API. There are a few points that need to be considered, like value expiration after a certain time of update inactivity (otherwise the state might grow indefinitely). But these aspects should be rather easy to solve. I think for SQL, such running aggregates are a special case of the sliding windows as discussed in Calcite's StreamSQL document [1]. > > Thanks also for the document! I'll take that into account when sketching the FLIP for streaming aggregation support. > > Cheers, Fabian > > [1] http://calcite.apache.org/docs/stream.html#sliding-windows > > 2016-08-23 13:09 GMT+02:00 Jark Wu <[hidden email]>: > >> Hi Fabian, Timo, >> >> Sorry for the late response. >> >> Regarding Calcite's StreamSQL syntax, my only concerns are the STREAM keyword and the lack of aggregation without a window, which make the syntax differ between streaming and static tables.
I don't think Flink should have a custom SQL syntax, but it's better to have a consistent syntax for batch and streaming. Regarding window syntax, I think it's good and reasonable to follow Calcite's syntax. Actually, we implemented Blink SQL windows following Calcite's syntax [1]. >> >> In addition, I described the Blink SQL design, including UDF, UDTF, UDAF, and windows, in a Google doc [1]. I hope that can help with the upcoming Flink SQL design. >> >> +1 for creating a FLIP >> >> [1] https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek >> >> >> - Jark Wu >> >>> On Aug 23, 2016, at 3:47 PM, Fabian Hueske <[hidden email]> wrote: >>> >>> Hi, >>> >>> I did a bit of prototyping yesterday to check to what extent Calcite supports window operations on streams if we implemented them for the Table API. For the Table API we do not go through Calcite's SQL parser and validator, but generate the logical plan (tree of RelNodes) ourselves, mostly using Calcite's RelBuilder. It turns out that Calcite does not restrict grouped aggregations on streams at this abstraction level, i.e., it does not perform any checks. >>> >>> I think it should be possible to implement windowed aggregates for the Table API. Once CALCITE-1345 [1] is implemented (and released), windowed aggregates are also supported by the SQL parser, validator, and optimizer. In order to make them work with our implementation we would need to adapt our solution to it (only internally), but I am sure we could reuse a lot of our initial implementation (Table API, validation, execution). >>> >>> I drafted an API proposal a few months ago [2] and could convert this into a FLIP to discuss the API and break it down into subtasks. >>> >>> What do you think?
>>> >>> Cheers, Fabian >>> >>> [1] https://issues.apache.org/jira/browse/CALCITE-1345 >>> [2] https://docs.google.com/document/d/19kSOAOINKCSWLBCKRq2WvNtmuaA9o3AyCh2ePqr3V5E >>> >>> 2016-08-19 11:04 GMT+02:00 Fabian Hueske <[hidden email]>: >>> >>>> Hi Jark, >>>> >>>> thanks for starting this discussion. Actually, I think we are "blocked" on the internal handling of streaming windows in Calcite rather than on the SQL parser. IMO, it should be possible to exchange or modify the parser if we want that. >>>> >>>> Regarding Calcite's StreamSQL syntax: Except for the STREAM keyword, Calcite closely follows the SQL standard (e.g., no special keywords like WINDOW; instead, stream-specific aspects like tumbling windows are done as functions such as TUMBLE [1]). One main motivation of the Calcite community is to have the same syntax for streaming and static tables. This includes support for tables which are static and streaming at the same time (the example of [1] is a table about orders to which new order records are added). When querying such a table, the STREAM keyword is required to distinguish a batch query, which returns a result set, from a standing query, which returns a result stream. In the context of Flink we can make the distinction using the type of the TableEnvironment. So we could use the batch parser, but would need to change a couple of things internally and add checks for proper grouping on the timestamp column when doing windows, etc. So far, the discussion about the StreamSQL syntax has rather focused on the question whether 1) StreamSQL should follow the SQL standard (as Calcite proposes) or 2) Flink should use a custom syntax with stream-specific features.
For instance, a tumbling window is expressed in the GROUP BY clause [1] when following standard SQL, but it could be defined using a special WINDOW keyword in a custom StreamSQL dialect. >>>> >>>> You are right that we have a dependency on Calcite. However, I think this dependency is rather in the internals than in the parser, i.e., how the validator/optimizer supports and handles monotone / quasi-monotone attributes and windows. I am not sure how much is already supported, but the Calcite community is working on this [2]. I think we need these features in Calcite unless we want to completely remove our dependency on Calcite for StreamSQL. I would not be in favor of removing Calcite at this point. We put a lot of effort into refactoring the Table API internals. Instead, we should start to talk to the Calcite community and see how far they are, what is missing, and how we can help. >>>> >>>> I will start a discussion on the Calcite dev mailing list in the next days and ask about the status of StreamSQL. >>>> >>>> Best, >>>> Fabian >>>> >>>> [1] http://calcite.apache.org/docs/stream.html#tumbling-windows-improved >>>> [2] https://issues.apache.org/jira/browse/CALCITE-1345 >>>> >> >> |
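Fabian's remark about running aggregates without a window, and about expiring values after a period of update inactivity so that state does not grow without bound, can be sketched as follows. This is a hypothetical, single-process Python model, not Flink's operator: `RunningSum`, `idle_ttl`, and the `Clicks`/`userId`/`amount` names are made up for illustration, and time is passed in explicitly as an integer.

```python
from typing import Dict, Hashable, Tuple


class RunningSum:
    """Non-windowed running SUM per key with idle-state expiration.

    Models the continuously updated result of
        SELECT userId, SUM(amount) FROM Clicks GROUP BY userId
    on a stream. A key that has not been updated for more than idle_ttl
    time units is dropped, which bounds state size at the cost of restarting
    that key's sum from zero if it reappears later.
    """

    def __init__(self, idle_ttl: int) -> None:
        self.idle_ttl = idle_ttl
        self.sums: Dict[Hashable, int] = {}
        self.last_update: Dict[Hashable, int] = {}

    def _expire(self, now: int) -> None:
        # Full scan for simplicity; a real operator would register timers instead.
        stale = [k for k, t in self.last_update.items() if now - t > self.idle_ttl]
        for key in stale:
            del self.sums[key]
            del self.last_update[key]

    def update(self, key: Hashable, value: int, now: int) -> Tuple[Hashable, int]:
        """Processes one record and returns the updated (key, running sum) row."""
        self._expire(now)
        self.sums[key] = self.sums.get(key, 0) + value
        self.last_update[key] = now
        return key, self.sums[key]
```

Each call returns an updated result row rather than a final one, which is the main semantic difference from running the same GROUP BY query on a static table. The expiration rule is also where results can silently diverge from the batch answer, which is why the inactivity threshold is worth exposing as a configuration option.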
Starting with a prototype would be great, Jark.
We had some trouble with Calcite's StreamableTable interface anyways. A few things can be simplified if we do not declare our tables as streamable. I would try to implement DataStreamTable (and all related classes and methods) equivalent to DataSetTables if possible. Best, Fabian |
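Fabian's suggestion to stop declaring tables as streamable, and to treat a DataStream-backed table like a DataSet-backed one, amounts to registering both kinds of tables the same way and letting the type of environment decide how a query is translated. The Python model below illustrates that idea only; `Catalog`, `RegisteredTable`, `TableEnv`, `Mode`, and the returned plan strings are invented names, not Flink or Calcite classes, and the "parser" is a single regular expression standing in for a real SQL parser.

```python
import re
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Tuple


class Mode(Enum):
    BATCH = "batch"
    STREAM = "stream"


@dataclass(frozen=True)
class RegisteredTable:
    name: str
    columns: Tuple[str, ...]
    # Deliberately no "streamable" flag: batch- and stream-backed tables
    # are registered identically.


class Catalog:
    def __init__(self) -> None:
        self._tables: Dict[str, RegisteredTable] = {}

    def register(self, table: RegisteredTable) -> None:
        self._tables[table.name.upper()] = table  # case-insensitive lookup

    def lookup(self, name: str) -> RegisteredTable:
        return self._tables[name.upper()]


# Toy stand-in for a real SQL parser: it only finds the table after FROM.
_FROM_TABLE = re.compile(r"\bFROM\s+(\w+)", re.IGNORECASE)


class TableEnv:
    """Hypothetical stand-in for a batch or stream TableEnvironment."""

    def __init__(self, mode: Mode) -> None:
        self.mode = mode
        self.catalog = Catalog()

    def plan(self, sql: str) -> str:
        match = _FROM_TABLE.search(sql)
        if match is None:
            raise ValueError("query has no FROM clause")
        table = self.catalog.lookup(match.group(1))
        # Same query text, same kind of catalog entry; only the environment's
        # mode selects the translation target.
        target = "DataSetScan" if self.mode is Mode.BATCH else "DataStreamScan"
        return f"{target}({table.name})"


if __name__ == "__main__":
    orders = RegisteredTable("Orders", ("rowtime", "productId", "units"))
    batch_env, stream_env = TableEnv(Mode.BATCH), TableEnv(Mode.STREAM)
    batch_env.catalog.register(orders)
    stream_env.catalog.register(orders)
    print(batch_env.plan("SELECT * FROM Orders"))   # DataSetScan(Orders)
    print(stream_env.plan("SELECT * FROM Orders"))  # DataStreamScan(Orders)
```

The design choice being illustrated is that the batch/stream decision moves out of the query text and out of the catalog entry into the environment, so the same `SELECT * FROM Orders` needs no keyword to become a stream job.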
Hi Fabian, Timo,
I have created a prototype that removes the STREAM keyword and uses the batch SQL parser for stream jobs. This is the working branch: https://github.com/wuchong/flink/tree/remove-stream Looking forward to your feedback. - Jark Wu |
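With the batch parser accepting stream queries, the checks Fabian mentions earlier in the thread (time windows must refer to a time column, proper grouping on the timestamp column) have to come from Flink's own validation rather than from the STREAM keyword. A minimal Python sketch of such a check follows; `Column`, `ValidationError`, `validate_tumble`, the type names, and the rule that a streaming window needs a column declared monotonic are all assumptions for illustration, loosely modeled on the monotone attributes Fabian refers to.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class Column:
    name: str
    sql_type: str            # e.g. "TIMESTAMP", "INTEGER", "VARCHAR"
    monotonic: bool = False  # declared ascending, e.g. an event-time rowtime


class ValidationError(Exception):
    pass


def validate_tumble(columns: Iterable[Column], time_arg: str, streaming: bool) -> Column:
    """Checks the first argument of TUMBLE(<column>, <interval>) in GROUP BY.

    Hypothetical rules: the column must exist and be a TIMESTAMP; in a
    streaming job it must also be monotonic, since otherwise there is no
    simple way to decide when a window is complete.
    """
    by_name = {c.name.lower(): c for c in columns}
    col = by_name.get(time_arg.lower())
    if col is None:
        raise ValidationError(f"unknown column '{time_arg}' in TUMBLE")
    if col.sql_type != "TIMESTAMP":
        raise ValidationError(
            f"TUMBLE needs a time column, but '{col.name}' is {col.sql_type}"
        )
    if streaming and not col.monotonic:
        raise ValidationError(
            f"'{col.name}' is not monotonic, so windows on a stream cannot be closed"
        )
    return col
```

The same function serves both environments; only the `streaming` flag adds the extra rule, which keeps a single grammar while still rejecting windows that a stream job could never finish.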
Hi Jark,
Your code looks good, and it also simplifies many parts. So the STREAM keyword is not optional but invalid now, right? What happens if the STREAM keyword is used in a query? Timo -- Kind Regards Timo Walther Follow me: @twalthr https://www.linkedin.com/in/twalthr |
Hi Timo,
Yes, you are right. The STREAM keyword is invalid now. If there is a STREAM keyword in the query, the parser will throw a "can’t convert table xxx to stream" exception, because we register the table as a regular table, not as a streamable one. - Jark Wu > On Aug 29, 2016, at 8:13 PM, Timo Walther <[hidden email]> wrote: > > Hi Jark, > > your code looks good and it also simplifies many parts. So the STREAM keyword is not optional but invalid now, right? What happens if there is keyword in the query? > > Timo > > > On 29/08/16 at 05:40, Jark Wu wrote: >> Hi Fabian, Timo, >> >> I have created a prototype for removing STREAM keyword and using batch sql parser for stream jobs. >> >> This is the working brach: https://github.com/wuchong/flink/tree/remove-stream <https://github.com/wuchong/flink/tree/remove-stream> >> >> Looking forward to your feedback. >> >> - Jark Wu >> >>> On Aug 24, 2016, at 4:56 PM, Fabian Hueske <[hidden email]> wrote: >>> >>> Starting with a prototype would be great, Jark. >>> We had some trouble with Calcite's StreamableTable interface anyways. A few >>> things can be simplified if we do not declare our tables as streamable. >>> I would try to implement DataStreamTable (and all related classes and >>> methods) equivalent to DataSetTables if possible. >>> >>> Best, Fabian >>> >>> 2016-08-24 6:27 GMT+02:00 Jark Wu <[hidden email]>: >>> >>>> Hi Fabian, >>>> >>>> You are right, the main thing we need to change for removing STREAM >>>> keyword is the table registration. If you would like, I can do a prototype. >>>> >>>> Hi Timo, >>>> >>>> I’m glad to contribute our work back to Flink. I will look into it and >>>> create JIRAs next days. >>>> >>>> - Jark Wu >>>> >>>>> On Aug 24, 2016, at 12:13 AM, Fabian Hueske <[hidden email]> wrote: >>>>> >>>>> Hi Jark, >>>>> >>>>> We can think about removing the STREAM keyword or not. In principle, >>>>> Calcite should allow the same windowing syntax on streaming and static >>>>> tables (this is one of the main goals of Calcite).
The Table API can also >>>>> distinguish stream and batch without the STREAM keyword by looking at the >>>>> ExecutionEnvironment. >>>>> I think we would need to change the way that tables are registered in >>>>> Calcite's catalog and also add more validation (check that time windows >>>>> refer to a time column, etc). >>>>> A prototype should help to see what the consequence of removing the >>>> STREAM >>>>> keyword (which is actually, changing the table registration, the parser >>>> is >>>>> the same) would be. >>>>> >>>>> Regarding streaming aggregates without window definition: We can >>>> certainly >>>>> implement this feature in the Table API. There are a few points that need >>>>> to be considered like value expiration after a certain time of update >>>>> inactivity (otherwise the state might grow infinitely). But these aspects >>>>> should be rather easy to solve. I think for SQL, such running aggregates >>>>> are a special case of the Sliding Windows as discussed in Calcite's >>>>> StreamSQL document [1]. >>>>> >>>>> Thanks also for the document! I'll take that into account when sketching >>>>> the FLIP for streaming aggregation support. >>>>> >>>>> Cheers, Fabian >>>>> >>>>> [1] http://calcite.apache.org/docs/stream.html#sliding-windows >>>>> >>>>> 2016-08-23 13:09 GMT+02:00 Jark Wu <[hidden email]>: >>>>> >>>>>> Hi Fabian, Timo, >>>>>> >>>>>> Sorry for the late response. >>>>>> >>>>>> Regarding Calcite’s StreamSQL syntax, what I concern is only the STREAM >>>>>> keyword and no agg-without-window. Which makes different syntax for >>>>>> streaming and static tables. I don’t think Flink should have a custom >>>> SQL >>>>>> syntax, but it’s better to have a consistent syntax for batch and >>>>>> streaming. Regarding window syntax , I think it’s good and reasonable to >>>>>> follow Calcite’s syntax. Actually, we implement Blink SQL Window >>>> following >>>>>> Calcite’s syntax[1]. 
>>>>>> >>>>>> In addition, I describe the Blink SQL design including UDF, UDTF, UDAF, >>>>>> Window in google doc[1]. Hope that can help for the upcoming Flink SQL >>>>>> design. >>>>>> >>>>>> +1 for creating FLIP >>>>>> >>>>>> [1] https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzb >>>>>> buVFPZWBYuY1Ek >>>>>> >>>>>> >>>>>> - Jark Wu >>>>>> >>>>>>> 在 2016年8月23日,下午3:47,Fabian Hueske <[hidden email]> 写道: >>>>>>> >>>>>>> Hi, >>>>>>> >>>>>>> I did a bit of prototyping yesterday to check to what extend Calcite >>>>>>> supports window operations on streams if we would implement them for >>>> the >>>>>>> Table API. >>>>>>> For the Table API we do not go through Calcite's SQL parser and >>>>>> validator, >>>>>>> but generate the logical plan (tree of RelNodes) ourselves mostly using >>>>>>> Calcite's Relbuilder. >>>>>>> It turns out that Calcite does not restrict grouped aggregations on >>>>>> streams >>>>>>> at this abstraction level, i.e., it does not perform any checks. >>>>>>> >>>>>>> I think it should be possible to implement windowed aggregates for the >>>>>>> Table API. Once CALCITE-1345 [1] is implemented (and released), >>>> windowed >>>>>>> aggregates are also supported by the SQL parser, validator, and >>>>>> optimizer. >>>>>>> In order to make them work with our implementation we would need to >>>> adapt >>>>>>> our solution to it (only internally), but I am sure we could reuse a >>>> lot >>>>>> of >>>>>>> our initial implementation (Table API, validation, execution). >>>>>>> >>>>>>> I drafted an API proposal a few months ago [2] and could convert this >>>>>> into >>>>>>> a FLIP to discuss the API and break it down into subtasks. >>>>>>> >>>>>>> What do you think? 
>>>>>>> >>>>>>> Cheers, Fabian >>>>>>> >>>>>>> [1] https://issues.apache.org/jira/browse/CALCITE-1345 >>>>>>> [2] >>>>>>> https://docs.google.com/document/d/19kSOAOINKCSWLBCKRq2WvNtmuaA9o >>>>>> 3AyCh2ePqr3V5E >>>>>>> 2016-08-19 11:04 GMT+02:00 Fabian Hueske <[hidden email]>: >>>>>>> >>>>>>>> Hi Jark, >>>>>>>> >>>>>>>> thanks for starting this discussion. Actually, I think we are rather >>>>>>>> "blocked" on the internal handling of streaming windows in Calcite >>>> than >>>>>> the >>>>>>>> SQL parser. IMO, it should be possible to exchange or modify the >>>> parser >>>>>> if >>>>>>>> we want that. >>>>>>>> >>>>>>>> Regarding Calcite's StreamSQL syntax: Except for the STREAM keyword, >>>>>>>> Calcite closely follows the SQL standard (e.g.,no special keywords >>>> like >>>>>>>> WINDOW. Instead stream specific aspects like tumbling windows are done >>>>>> as >>>>>>>> functions such as TUMBLE [1]). One main motivation of the Calcite >>>>>> community >>>>>>>> is to have the same syntax for streaming and static tables. This >>>>>> includes >>>>>>>> support for tables which are static and streaming at the same time >>>> (the >>>>>>>> example of [1] is a table about orders to which new order records are >>>>>>>> added). When querying such a table, the STREAM keyword is required to >>>>>>>> distinguish the cases of a batch query which returns a result set and >>>> a >>>>>>>> standing query which returns a result stream. In the context of Flink >>>> we >>>>>>>> can can do the distinction using the type of the TableEnvironment. So >>>> we >>>>>>>> could use the batch parser, but would need to change a couple things >>>>>>>> internally and add checks for proper grouping on the timestamp column >>>>>> when >>>>>>>> doing windows, etc. 
So far the discussion about the StreamSQL syntax >>>>>> rather >>>>>>>> focused on the question whether 1) StreamSQL should follow the SQL >>>>>> standard >>>>>>>> (as Calcite proposes) or 2) whether Flink should use a custom syntax >>>>>> with >>>>>>>> stream specific features. For instance a tumbling window is expressed >>>> in >>>>>>>> the GROUP BY clause [1] when following standard SQL but it could be >>>>>> defined >>>>>>>> using a special WINDOW keyword in a custom StreamSQL dialect. >>>>>>>> >>>>>>>> You are right that we have a dependency on Calcite. However, I think >>>>>> this >>>>>>>> dependency is rather in the internals than the parser, i.e., how does >>>>>> the >>>>>>>> validator/optimizer support and handle monotone / quasi-monotone >>>>>> attributes >>>>>>>> and windows. I am not sure how much is already supported but the >>>> Calcite >>>>>>>> community is working on this [2]. I think we need these features in >>>>>> Calcite >>>>>>>> unless we want to completely remove our dependency on Calcite for >>>>>>>> StreamSQL. I would not be in favor of removing Calcite at this point. >>>> We >>>>>>>> put a lot of effort into refactoring the Table API internals. Instead >>>> we >>>>>>>> should start to talk to the Calcite community and see how far they >>>> are, >>>>>>>> what is missing, and how we can help. >>>>>>>> >>>>>>>> I will start a discussion on the Calcite dev mailing list in the next >>>>>> days >>>>>>>> and ask about the status of StreamSQL. >>>>>>>> >>>>>>>> Best, >>>>>>>> Fabian >>>>>>>> >>>>>>>> [1] http://calcite.apache.org/docs/stream.html#tumbling- >>>>>> windows-improved >>>>>>>> [2] https://issues.apache.org/jira/browse/CALCITE-1345 >>>>>>>> >>>>>> >>>> >> > > > -- > Freundliche Grüße / Kind Regards > > Timo Walther > > Follow me: @twalthr > https://www.linkedin.com/in/twalthr |
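Jark's explanation can be pictured with a small Java sketch: once tables are registered as regular (non-streamable) tables, a query using STREAM is rejected, and the environment, not the query text, decides between a batch and a stream job. This is a toy model under stated assumptions, not Calcite's or Flink's code; `ToyCatalog`, `registerTable` and `validate` are hypothetical names, and the exception text only imitates the error Jark quotes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy catalog: each table is registered either as a regular
// table or as a "streamable" one. Not Calcite's or Flink's API.
final class ToyCatalog {
    private final Map<String, Boolean> streamable = new HashMap<>();

    void registerTable(String name, boolean isStreamable) {
        streamable.put(name, isStreamable);
    }

    // Models validating "SELECT [STREAM] * FROM <table>".
    // The job type comes from the environment flag, not from the query text.
    String validate(String table, boolean hasStreamKeyword, boolean streamEnv) {
        Boolean isStreamable = streamable.get(table);
        if (isStreamable == null) {
            throw new IllegalArgumentException("Table not found: " + table);
        }
        if (hasStreamKeyword && !isStreamable) {
            // Rejected because the table was registered as a regular table.
            throw new IllegalStateException("can't convert table " + table + " to stream");
        }
        return streamEnv ? "stream job" : "batch job";
    }
}

final class ToyCatalogDemo {
    public static void main(String[] args) {
        ToyCatalog catalog = new ToyCatalog();
        catalog.registerTable("Orders", false); // regular, non-streamable table

        // Same query text (SELECT * FROM Orders), different environments.
        System.out.println(catalog.validate("Orders", false, true));  // stream job
        System.out.println(catalog.validate("Orders", false, false)); // batch job

        try {
            catalog.validate("Orders", true, true); // SELECT STREAM * FROM Orders
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The design point the sketch isolates is that the STREAM check only matters if some tables are registered as streamable; registering every table as a regular table makes the keyword unusable, so the environment flag is the only remaining switch.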
At first glance, I thought we were losing the ability to choose between a batch and a streaming table when a TableSource implements both, because currently you are using a StreamTableSource by default if a TableSource implements both types. I think it would be better to determine batch or stream from the type of the execution environment. What do you think?

Timo

> On 29/08/16 at 14:31, Jark Wu wrote:
> Hi Timo,
>
> Yes, you are right. The STREAM keyword is now invalid. If a query contains the STREAM keyword, the parser throws a "can’t convert table xxx to stream" exception, because we register the table as a regular table, not as a streamable one.
>
> - Jark Wu |
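Timo's concern is about which view of a dual-mode source gets picked. The Java sketch below contrasts a resolver whose `instanceof` order silently prefers the streaming view with one that lets the environment decide. All types here (`BatchSource`, `StreamSource`, `DualCsvSource`, `EnvKind`, `SourceResolver`) are hypothetical simplifications, not the Flink Table API.

```java
// Hypothetical, simplified stand-ins for batch and stream table sources.
interface BatchSource { String readBatch(); }
interface StreamSource { String readStream(); }

// A source supporting both modes (think of a CSV file).
final class DualCsvSource implements BatchSource, StreamSource {
    public String readBatch() { return "DataSet from CSV"; }
    public String readStream() { return "DataStream from CSV"; }
}

enum EnvKind { BATCH, STREAM }

final class SourceResolver {
    // Problematic: checking StreamSource first means a dual source is
    // always read as a stream, whatever environment runs the query.
    static String resolveByDefault(Object source) {
        if (source instanceof StreamSource) {
            return ((StreamSource) source).readStream();
        }
        if (source instanceof BatchSource) {
            return ((BatchSource) source).readBatch();
        }
        throw new IllegalArgumentException("Unsupported source: " + source);
    }

    // Suggested: the environment type picks which view of the source is used.
    static String resolveByEnvironment(Object source, EnvKind env) {
        if (env == EnvKind.STREAM && source instanceof StreamSource) {
            return ((StreamSource) source).readStream();
        }
        if (env == EnvKind.BATCH && source instanceof BatchSource) {
            return ((BatchSource) source).readBatch();
        }
        throw new IllegalArgumentException("Source does not support " + env);
    }
}

final class SourceResolverDemo {
    public static void main(String[] args) {
        Object csv = new DualCsvSource();
        System.out.println(SourceResolver.resolveByDefault(csv));                    // DataStream from CSV
        System.out.println(SourceResolver.resolveByEnvironment(csv, EnvKind.BATCH)); // DataSet from CSV
    }
}
```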
It seems that we have already done that (?). BatchTableEnvironment.registerTableSource(name, tableSource) only accepts a BatchTableSource, whereas StreamTableEnvironment.registerTableSource(name, tableSource) only accepts a StreamTableSource. So if a TableSource implements both the batch and the stream interface, the type of the table environment determines whether it is used as a batch or a stream source. I think TableSourceITCase.testCsvTableSource in the batch and stream packages illustrates this. Am I right?
- Jark Wu

> On Aug 29, 2016, at 8:59 PM, Timo Walther <[hidden email]> wrote:
>
> At first glance, I thought we were losing the ability to choose between a batch and a streaming table when a TableSource implements both, because currently you are using a StreamTableSource by default if a TableSource implements both types. I think it would be better to determine batch or stream from the type of the execution environment. What do you think?
>
> Timo |
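The approach Jark describes relies on parameter types: each environment's `registerTableSource` only accepts the source interface matching its mode, so a source implementing both interfaces is used in whichever mode the environment calls for. A minimal Java sketch with hypothetical stand-in types (`BatchTableSourceLike`, `StreamTableSourceLike`, `CsvSourceLike`, `BatchEnvLike`, `StreamEnvLike`), not Flink's actual classes:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins, not Flink's BatchTableSource/StreamTableSource.
interface BatchTableSourceLike { String scanAsBatch(); }
interface StreamTableSourceLike { String scanAsStream(); }

// One source that supports both modes.
final class CsvSourceLike implements BatchTableSourceLike, StreamTableSourceLike {
    public String scanAsBatch() { return "batch scan"; }
    public String scanAsStream() { return "stream scan"; }
}

// The batch environment only accepts batch sources ...
final class BatchEnvLike {
    private final Map<String, BatchTableSourceLike> tables = new HashMap<>();

    void registerTableSource(String name, BatchTableSourceLike source) {
        tables.put(name, source);
    }

    String scan(String name) {
        BatchTableSourceLike source = tables.get(name);
        if (source == null) throw new IllegalArgumentException("Unknown table: " + name);
        return source.scanAsBatch();
    }
}

// ... and the stream environment only accepts stream sources.
final class StreamEnvLike {
    private final Map<String, StreamTableSourceLike> tables = new HashMap<>();

    void registerTableSource(String name, StreamTableSourceLike source) {
        tables.put(name, source);
    }

    String scan(String name) {
        StreamTableSourceLike source = tables.get(name);
        if (source == null) throw new IllegalArgumentException("Unknown table: " + name);
        return source.scanAsStream();
    }
}

final class TypedRegistrationDemo {
    public static void main(String[] args) {
        CsvSourceLike csv = new CsvSourceLike();
        BatchEnvLike batchEnv = new BatchEnvLike();
        StreamEnvLike streamEnv = new StreamEnvLike();
        // The same instance is accepted by both; the parameter type fixes the mode.
        batchEnv.registerTableSource("csv", csv);
        streamEnv.registerTableSource("csv", csv);
        System.out.println(batchEnv.scan("csv"));  // batch scan
        System.out.println(streamEnv.scan("csv")); // stream scan
    }
}
```

Because the choice is encoded in the method signature and checked by the compiler, no runtime default is needed.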
Sorry, I thought that StreamTableSourceScanRule always fires but it is
only used in StreamTableEnvironment so everything is fine. I think you can open PR if you like. Timo Am 30/08/16 um 05:25 schrieb Jark Wu: > It seems that we have done that (?). The BatchTableEnvironment.registerTableSource(name, tableSource) only accept a BatchTableSource. In contrast, the StreamTableEnvironment.registerTableSource(name, tableSource) only accept a StreamTableSource. So that, if a TableSource implements both batch and stream, we will determine batch or stream by the type of table environment. I think the TableSourceITCase.testCsvTableSource in batch and stream package can explain it. Am I right ? > > - Jark Wu > >> 在 2016年8月29日,下午8:59,Timo Walther <[hidden email]> 写道: >> >> At first glance, I thought we are losing the possibility to distingish between choosing a batch or streaming table if a TableSource implements both. Because currently you are using a StreamTableSource as default if a TableSource implements both types. I think it would be better to determine batch or stream using the type of execution environment. What do you think? >> >> Timo >> >> >> Am 29/08/16 um 14:31 schrieb Jark Wu: >>> Hi Timo, >>> >>> Yes, you are right. The STREAM keyword is invalid now. If there is a STREAM keyword in the query, the parser will throw "can’t convert table xxx to stream" exception. Because we register the table as a regular table not streamable. >>> >>> - Jark Wu >>> >>>> 在 2016年8月29日,下午8:13,Timo Walther <[hidden email]> 写道: >>>> >>>> Hi Jark, >>>> >>>> your code looks good and it also simplifies many parts. So the STREAM keyword is not optional but invalid now, right? What happens if there is keyword in the query? >>>> >>>> Timo >>>> >>>> >>>> Am 29/08/16 um 05:40 schrieb Jark Wu: >>>>> Hi Fabian, Timo, >>>>> >>>>> I have created a prototype for removing STREAM keyword and using batch sql parser for stream jobs. 
>>>>> >>>>> This is the working brach: https://github.com/wuchong/flink/tree/remove-stream <https://github.com/wuchong/flink/tree/remove-stream> >>>>> >>>>> Looking forward to your feedback. >>>>> >>>>> - Jark Wu >>>>> >>>>>> 在 2016年8月24日,下午4:56,Fabian Hueske <[hidden email]> 写道: >>>>>> >>>>>> Starting with a prototype would be great, Jark. >>>>>> We had some trouble with Calcite's StreamableTable interface anyways. A few >>>>>> things can be simplified if we do not declare our tables as streamable. >>>>>> I would try to implement DataStreamTable (and all related classes and >>>>>> methods) equivalent to DataSetTables if possible. >>>>>> >>>>>> Best, Fabian >>>>>> >>>>>> 2016-08-24 6:27 GMT+02:00 Jark Wu <[hidden email]>: >>>>>> >>>>>>> Hi Fabian, >>>>>>> >>>>>>> You are right, the main thing we need to change for removing STREAM >>>>>>> keyword is the table registration. If you would like, I can do a prototype. >>>>>>> >>>>>>> Hi Timo, >>>>>>> >>>>>>> I’m glad to contribute our work back to Flink. I will look into it and >>>>>>> create JIRAs next days. >>>>>>> >>>>>>> - Jark Wu >>>>>>> >>>>>>>> 在 2016年8月24日,上午12:13,Fabian Hueske <[hidden email]> 写道: >>>>>>>> >>>>>>>> Hi Jark, >>>>>>>> >>>>>>>> We can think about removing the STREAM keyword or not. In principle, >>>>>>>> Calcite should allow the same windowing syntax on streaming and static >>>>>>>> tables (this is one of the main goals of Calcite). The Table API can also >>>>>>>> distinguish stream and batch without the STREAM keyword by looking at the >>>>>>>> ExecutionEnvironment. >>>>>>>> I think we would need to change the way that tables are registered in >>>>>>>> Calcite's catalog and also add more validation (check that time windows >>>>>>>> refer to a time column, etc). >>>>>>>> A prototype should help to see what the consequence of removing the >>>>>>> STREAM >>>>>>>> keyword (which is actually, changing the table registration, the parser >>>>>>> is >>>>>>>> the same) would be. 
>>>>>>>> >>>>>>>> Regarding streaming aggregates without window definition: We can >>>>>>> certainly >>>>>>>> implement this feature in the Table API. There are a few points that need >>>>>>>> to be considered like value expiration after a certain time of update >>>>>>>> inactivity (otherwise the state might grow infinitely). But these aspects >>>>>>>> should be rather easy to solve. I think for SQL, such running aggregates >>>>>>>> are a special case of the Sliding Windows as discussed in Calcite's >>>>>>>> StreamSQL document [1]. >>>>>>>> >>>>>>>> Thanks also for the document! I'll take that into account when sketching >>>>>>>> the FLIP for streaming aggregation support. >>>>>>>> >>>>>>>> Cheers, Fabian >>>>>>>> >>>>>>>> [1] http://calcite.apache.org/docs/stream.html#sliding-windows >>>>>>>> >>>>>>>> 2016-08-23 13:09 GMT+02:00 Jark Wu <[hidden email]>: >>>>>>>> >>>>>>>>> Hi Fabian, Timo, >>>>>>>>> >>>>>>>>> Sorry for the late response. >>>>>>>>> >>>>>>>>> Regarding Calcite’s StreamSQL syntax, what I concern is only the STREAM >>>>>>>>> keyword and no agg-without-window. Which makes different syntax for >>>>>>>>> streaming and static tables. I don’t think Flink should have a custom >>>>>>> SQL >>>>>>>>> syntax, but it’s better to have a consistent syntax for batch and >>>>>>>>> streaming. Regarding window syntax , I think it’s good and reasonable to >>>>>>>>> follow Calcite’s syntax. Actually, we implement Blink SQL Window >>>>>>> following >>>>>>>>> Calcite’s syntax[1]. >>>>>>>>> >>>>>>>>> In addition, I describe the Blink SQL design including UDF, UDTF, UDAF, >>>>>>>>> Window in google doc[1]. Hope that can help for the upcoming Flink SQL >>>>>>>>> design. 
>>>>>>>>>
>>>>>>>>> +1 for creating a FLIP
>>>>>>>>>
>>>>>>>>> [1] https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek
>>>>>>>>>
>>>>>>>>> - Jark Wu
>>>>>>>>>
>>>>>>>>>> On Aug 23, 2016, at 3:47 PM, Fabian Hueske <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I did a bit of prototyping yesterday to check to what extent Calcite supports window operations on streams if we were to implement them for the Table API.
>>>>>>>>>> For the Table API we do not go through Calcite's SQL parser and validator, but generate the logical plan (a tree of RelNodes) ourselves, mostly using Calcite's RelBuilder.
>>>>>>>>>> It turns out that Calcite does not restrict grouped aggregations on streams at this abstraction level, i.e., it does not perform any checks.
>>>>>>>>>>
>>>>>>>>>> I think it should be possible to implement windowed aggregates for the Table API. Once CALCITE-1345 [1] is implemented (and released), windowed aggregates will also be supported by the SQL parser, validator, and optimizer. In order to make them work with our implementation we would need to adapt our solution to it (only internally), but I am sure we could reuse a lot of our initial implementation (Table API, validation, execution).
>>>>>>>>>>
>>>>>>>>>> I drafted an API proposal a few months ago [2] and could convert it into a FLIP to discuss the API and break it down into subtasks.
>>>>>>>>>>
>>>>>>>>>> What do you think?
>>>>>>>>>>
>>>>>>>>>> Cheers, Fabian
>>>>>>>>>>
>>>>>>>>>> [1] https://issues.apache.org/jira/browse/CALCITE-1345
>>>>>>>>>> [2] https://docs.google.com/document/d/19kSOAOINKCSWLBCKRq2WvNtmuaA9o3AyCh2ePqr3V5E
>>>>>>>>>>
>>>>>>>>>> 2016-08-19 11:04 GMT+02:00 Fabian Hueske <[hidden email]>:
>>>>>>>>>>
>>>>>>>>>>> Hi Jark,
>>>>>>>>>>>
>>>>>>>>>>> thanks for starting this discussion. Actually, I think we are "blocked" on the internal handling of streaming windows in Calcite rather than on the SQL parser. IMO, it should be possible to exchange or modify the parser if we want that.
>>>>>>>>>>>
>>>>>>>>>>> Regarding Calcite's StreamSQL syntax: except for the STREAM keyword, Calcite closely follows the SQL standard (e.g., there are no special keywords like WINDOW; instead, stream-specific aspects like tumbling windows are expressed as functions such as TUMBLE [1]). One main motivation of the Calcite community is to have the same syntax for streaming and static tables. This includes support for tables which are static and streaming at the same time (the example in [1] is a table of orders to which new order records are added). When querying such a table, the STREAM keyword is required to distinguish a batch query, which returns a result set, from a standing query, which returns a result stream. In the context of Flink we can make this distinction using the type of the TableEnvironment. So we could use the batch parser, but would need to change a couple of things internally and add checks for proper grouping on the timestamp column when doing windows, etc.
>>>>>>>>>>> So far, the discussion about the StreamSQL syntax has focused on the question whether 1) StreamSQL should follow the SQL standard (as Calcite proposes) or 2) Flink should use a custom syntax with stream-specific features. For instance, a tumbling window is expressed in the GROUP BY clause [1] when following standard SQL, but it could be defined using a special WINDOW keyword in a custom StreamSQL dialect.
>>>>>>>>>>>
>>>>>>>>>>> You are right that we have a dependency on Calcite. However, I think this dependency lies in the internals rather than in the parser, i.e., in how the validator/optimizer supports and handles monotonic / quasi-monotonic attributes and windows. I am not sure how much is already supported, but the Calcite community is working on this [2]. I think we need these features in Calcite unless we want to completely remove our dependency on Calcite for StreamSQL. I would not be in favor of removing Calcite at this point. We put a lot of effort into refactoring the Table API internals. Instead, we should start talking to the Calcite community to see how far they are, what is missing, and how we can help.
>>>>>>>>>>>
>>>>>>>>>>> I will start a discussion on the Calcite dev mailing list in the next few days and ask about the status of StreamSQL.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Fabian
>>>>>>>>>>>
>>>>>>>>>>> [1] http://calcite.apache.org/docs/stream.html#tumbling-windows-improved
>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/CALCITE-1345
>>>>>>>>>>>
>>>> --
>>>> Freundliche Grüße / Kind Regards
>>>>
>>>> Timo Walther
>>>>
>>>> Follow me: @twalthr
>>>> https://www.linkedin.com/in/twalthr
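To make the tumbling-window semantics discussed in this thread concrete: in standard-SQL style, Calcite's StreamSQL document expresses a tumbling window as a function in the GROUP BY clause, roughly `GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId`. A minimal Python sketch of what such a grouping computes follows; the row schema `(rowtime, productId, units)` with `rowtime` in seconds, and the function names, are assumptions for illustration. The same function runs over a bounded list (batch) and could be applied incrementally to a stream, which is why one window syntax can serve both.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

Row = Tuple[int, str, int]  # assumed schema: (rowtime in seconds, productId, units)


def tumble_end(rowtime: int, size: int) -> int:
    """End of the fixed-size, non-overlapping window that contains rowtime."""
    return rowtime - rowtime % size + size


def count_and_sum_per_window(rows: Iterable[Row], size: int) -> Dict[Tuple[int, str], Tuple[int, int]]:
    # Roughly what GROUP BY TUMBLE(rowtime, <size>), productId with
    # COUNT(*) and SUM(units) computes, keyed here by (window end, productId).
    acc: Dict[Tuple[int, str], Tuple[int, int]] = defaultdict(lambda: (0, 0))
    for rowtime, product, units in rows:
        key = (tumble_end(rowtime, size), product)
        count, total = acc[key]
        acc[key] = (count + 1, total + units)
    return dict(acc)


rows = [(10, "p1", 2), (1800, "p1", 3), (3700, "p1", 1), (3900, "p2", 4)]
print(count_and_sum_per_window(rows, size=3600))
# {(3600, 'p1'): (2, 5), (7200, 'p1'): (1, 1), (7200, 'p2'): (1, 4)}
```

In a streaming job, a window's result could be emitted once event time passes its end; in a batch job, all windows are available after one pass over the input.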