Some questions about Table API and FlinkSQL



Anton Mushin
Hello all,

I have some questions about working with FlinkSQL.

1) I want to calculate the average of column values:



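import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.{Row, TableConfig, TableEnvironment}

val env = ExecutionEnvironment.getExecutionEnvironment
val config = new TableConfig
val tEnv = TableEnvironment.getTableEnvironment(env, config)
val ds = env.fromElements(
  (1.0f, 1),
  (2.0f, 2)).toTable(tEnv)
tEnv.registerTable("MyTable", ds)
val sqlQuery = "select avg(_1), avg(_2) from MyTable"
tEnv.sql(sqlQuery).toDataSet[Row].collect().foreach(x => print(s"$x "))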



As a result I get "1.5,1 ", but I expected "1.5,1.5 ".

Why does the avg function return an integer result for integer-typed columns? Where is this behavior described?
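The value 1 for the integer column is exactly what you get if both the sum and the count stay integers, so that the final division truncates. Assuming that is what happens here (the thread only confirms that AVG keeps the input type), a minimal plain-Scala sketch of the arithmetic, not Flink code, is:

```scala
// Sketch: why AVG over an INT column can come back as 1 instead of 1.5,
// assuming the result keeps the input type (integer sum / integer count).
object AvgTypeSketch {
  // Average computed entirely in Int arithmetic: the division truncates.
  def intAvg(xs: Seq[Int]): Int = xs.sum / xs.size

  // Average computed after widening to Double: no truncation.
  def doubleAvg(xs: Seq[Int]): Double = xs.map(_.toDouble).sum / xs.size

  def main(args: Array[String]): Unit = {
    val column = Seq(1, 2) // values of the _2 column in the example above
    println(intAvg(column))    // 3 / 2 truncates to 1
    println(doubleAvg(column)) // 3.0 / 2 = 1.5
  }
}
```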



2) I wanted to calculate the stddev_pop function as a sequence of SQL aggregate functions, as described in the Calcite javadocs: https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java#L64
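Written as a formula, the expression used in the query below for the population standard deviation is, with n = COUNT(x):

```latex
\sigma_{\mathrm{pop}}(x) = \sqrt{\frac{\sum_{i=1}^{n} x_i^2 - \left(\sum_{i=1}^{n} x_i\right)^2 / n}{n}}
```

Both divisions are by n, so if an engine evaluates them with integer types, the intermediate results can be truncated.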



val ds = env.fromElements(
  (1.0f, 1),
  (1.0f, 2)).toTable(tEnv)
tEnv.registerTable("MyTable", ds)

val sqlQuery = "SELECT " +
  "SQRT((SUM(a*a) - SUM(a)*SUM(a)/COUNT(a))/COUNT(a)) " +
  "from (select _1 as a from MyTable)"

tEnv.sql(sqlQuery).toDataSet[Row].collect().foreach(print)



I got an exception:



    org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    .......
    Caused by: java.lang.Exception: The user defined 'open(Configuration)' method in class org.apache.flink.api.table.runtime.FlatMapRunner caused an exception: Table program cannot be compiled. This is a bug. Please file an issue.
        at org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1337)
        at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.openTask(ChainedFlatMapDriver.java:47)
    .....
    Caused by: org.codehaus.commons.compiler.CompileException: Line 59, Column 57: No applicable constructor/method found for actual parameters "float, java.math.BigDecimal"; candidates are: "public static double org.apache.calcite.runtime.SqlFunctions.power(long, java.math.BigDecimal)", "public static double org.apache.calcite.runtime.SqlFunctions.power(long, long)", "public static double org.apache.calcite.runtime.SqlFunctions.power(double, double)"
        at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:10062)
        at org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:7476)
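The compile error says the generated code calls SqlFunctions.power with a float and a java.math.BigDecimal. In Java, and in the Janino compiler that reports the error, a float argument widens to double automatically, but a BigDecimal has no implicit conversion to double, and float cannot narrow to long, so none of the three listed overloads applies. A minimal Scala sketch of that mismatch follows; the local `power` function is a hypothetical stand-in for the (double, double) candidate, not Calcite's method, and the 0.5 exponent (SQRT lowered to POWER) is an assumption drawn only from the method name in the error.

```scala
import java.math.BigDecimal

object PowerOverloadSketch {
  // Hypothetical stand-in for the power(double, double) candidate listed
  // in the error message; it simply delegates to math.pow.
  def power(base: Double, exponent: Double): Double = math.pow(base, exponent)

  def main(args: Array[String]): Unit = {
    val base: Float = 4.0f                // a FLOAT operand, like column _1
    val exponent = new BigDecimal("0.5")  // assumed DECIMAL literal for SQRT

    // power(base, exponent) would not compile: Float widens to Double,
    // but there is no implicit java.math.BigDecimal => Double conversion.
    // Converting the exponent explicitly selects the (Double, Double) overload.
    println(power(base, exponent.doubleValue()))
  }
}
```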



At the same time, if I execute it for the int column ('_2'), I get a result equal to '0.0'.
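For the int column the values are 1 and 2, so SUM(a*a) = 5, SUM(a) = 3 and COUNT(a) = 2. Assuming (this is not confirmed in the thread) that SUM, COUNT and "/" all stay integral for an INT column, 9/2 truncates to 4 and (5 - 4)/2 truncates to 0, which would explain the '0.0'. A plain-Scala sketch comparing integer and double evaluation of the same expression:

```scala
// Sketch: the STDDEV_POP reduction from the query evaluated in plain Scala,
// once with integer arithmetic and once with double arithmetic.
// Assumption: for an INT column every division in the query truncates.
object StddevReductionSketch {
  // SQRT((SUM(a*a) - SUM(a)*SUM(a)/COUNT(a))/COUNT(a)) with Long arithmetic
  def integral(xs: Seq[Int]): Double = {
    val n = xs.size.toLong
    val sum = xs.map(_.toLong).sum
    val sumSq = xs.map(x => x.toLong * x).sum
    math.sqrt(((sumSq - sum * sum / n) / n).toDouble)
  }

  // The same expression with Double arithmetic throughout
  def floating(xs: Seq[Int]): Double = {
    val n = xs.size.toDouble
    val sum = xs.map(_.toDouble).sum
    val sumSq = xs.map(x => x.toDouble * x).sum
    math.sqrt((sumSq - sum * sum / n) / n)
  }

  def main(args: Array[String]): Unit = {
    val column = Seq(1, 2) // values of the _2 column in the example above
    println(integral(column)) // (5 - 9/2) / 2 truncates to 0, so 0.0
    println(floating(column)) // (5 - 4.5) / 2 = 0.25, so 0.5
  }
}
```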

What am I doing wrong?



Best regards,

Anton Mushin




Re: Some questions about Table API and FlinkSQL

Timo Walther-2
Hi Anton,

1) According to org.apache.calcite.sql.fun.SqlAvgAggFunction, "the
result is the same type", so I think this is standard SQL behavior.
2) This seems to be a code generation bug: the sqrt/power function does
not seem to accept these data types. It would be great if you could open
an issue in Jira if one does not exist yet.

I hope that helps.

Regards,
Timo



On 04/10/16 at 18:04, Anton Mushin wrote:



--
Freundliche Grüße / Kind Regards

Timo Walther

Follow me: @twalthr
https://www.linkedin.com/in/twalthr


RE: Some questions about Table API and FlinkSQL

Anton Mushin
Hi Timo,
I didn't find a similar issue in Jira, so I created a new one: FLINK-4743.
Thanks for the help!

Best regards,
Anton Mushin

-----Original Message-----
From: Timo Walther [mailto:[hidden email]]
Sent: Tuesday, October 04, 2016 8:44 PM
To: [hidden email]
Subject: Re: Some questions about Table API and FlinkSQL
