Scala / Java window issue

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Scala / Java window issue

Radu Tudoran
Hi,

I am struggling to move a working implementation from Java to Scala :(...this is for computing window aggregates (sliding window).
As I am not proficient in Scala I got block in (probably a stupid error)...maybe someone can help me.


I am trying to create a simple window function to be applied to the datastream after the window is created (I have one case with global windows and another case with keyed windows, so the question applies on both AllWindowFunction as well as to WindowFunction). However I get a typemistamtch error when applying the function to the window.

As I need to implement the function in scala... I tried 2 options, which both fail:
Option 1: implement MyWindowFunction by extending the WindowFunction from the scala package (org.apache.flink.streaming.api.scala.function)
..in this case when I apply the function to the window it tells me that the there is a typemistmatched
Option 2: implement MyWindowFunction by extending the Windowfunction from the default package (org.apache.flink.streaming.api.functions.windowing)
..in this case when I try to override the apply function I get a compilation error that the class needs to be abstract as it does not implement the apply function :(

...any solution?



Reply | Threaded
Open this post in threaded view
|

Re: Scala / Java window issue

Fabian Hueske-2
Hi Radu,

there are already several WindowFunction implementations in the Table API
that can help as a reference:

- IncrementalAggregateAllTimeWindowFunction [1]
- IncrementalAggregateAllWindowFunction [2]
- IncrementalAggregateTimeWindowFunction [3]
- IncrementalAggregateTimeWindowFunction [4]

Also have have a look at the DataStreamAggregate [5] class that assembles
the DataStream programs based on these functions.

Best, Fabian

[1]
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
[2]
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
[3]
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
[4]
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
[5]
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala

2017-03-10 18:46 GMT+01:00 Radu Tudoran <[hidden email]>:

> Hi,
>
> I am struggling to move a working implementation from Java to Scala
> :(...this is for computing window aggregates (sliding window).
> As I am not proficient in Scala I got block in (probably a stupid
> error)...maybe someone can help me.
>
>
> I am trying to create a simple window function to be applied to the
> datastream after the window is created (I have one case with global windows
> and another case with keyed windows, so the question applies on both
> AllWindowFunction as well as to WindowFunction). However I get a
> typemistamtch error when applying the function to the window.
>
> As I need to implement the function in scala... I tried 2 options, which
> both fail:
> Option 1: implement MyWindowFunction by extending the WindowFunction from
> the scala package (org.apache.flink.streaming.api.scala.function)
> ..in this case when I apply the function to the window it tells me that
> the there is a typemistmatched
> Option 2: implement MyWindowFunction by extending the Windowfunction from
> the default package (org.apache.flink.streaming.api.functions.windowing)
> ..in this case when I try to override the apply function I get a
> compilation error that the class needs to be abstract as it does not
> implement the apply function :(
>
> ...any solution?
>
>
>
>