Hey, I am new to Flink and have a question; I hope someone here can help.

How do I use a dimension table in the Flink Table API with StreamExecutionEnvironment?

I tried to solve this with a TableFunction, but I get the following error when debugging:

LogicalProject(col_1=[$0])
LogicalJoin(condition=[true], joinType=[left])
LogicalTableScan(table=[[test]])
LogicalTableFunctionScan(invocation=[dim_test()], rowType=[RecordType(VARCHAR(65536) col, VARCHAR(65536) name)], elementType=[class [Ljava.lang.Object;])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
    at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
    at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:674)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
    at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:216)
    at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:692)
    at com.bigdata.stream.streamsql.FlinkSqlTest.main(FlinkSqlTest.java:64)

SQL:

    select t.col_1 from test t left join lateral table(dim_test()) b on true

Main code:

    public static void main(String[] args) throws Exception {
        String sql = "select t.col_1 from test t left join lateral table(dim_test()) b on true";
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment stEnv = TableEnvironment.getTableEnvironment(streamEnv);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "test");

        Kafka010JsonTableSource tableSource = Kafka010JsonTableSource.builder()
                .forTopic("test")
                .withKafkaProperties(kafkaProps)
                .withSchema(TableSchema.builder()
                        .field("col_1", Types.STRING)
                        .field("col_2", Types.STRING).build())
                .build();
        stEnv.registerTableSource("test", tableSource);

        String[] columns = {"col", "name"};
        TypeInformation[] typeInformations = {TypeInformation.of(String.class), TypeInformation.of(String.class)};
        TableSchema tableSchema = new TableSchema(columns, typeInformations);

        Map<String, Object> context = new HashMap<>();
        context.put("mysql.url", "jdbc:mysql://localhost:3306/test");
        context.put("mysql.driver", "com.mysql.jdbc.Driver");
        context.put("mysql.user", "test");
        context.put("mysql.password", "test");
        context.put("mysql.table", "dim_test");
        StreamSqlDim dim = new MySqlDimFactory().getInstance(tableSchema, new StreamSqlContext(context));
        stEnv.registerFunction("dim_test", dim);

        String[] outColumns = {"col"};
        TypeInformation[] outType = {TypeInformation.of(String.class)};
        TableSink tableSink = new Kafka010JsonTableSink("test_out", kafkaProps);
        stEnv.registerTableSink("test_out", outColumns, outType, tableSink);

        Table t = stEnv.sql(sql);
        stEnv.insertInto(t, "test_out", stEnv.queryConfig());
        streamEnv.execute();
    }

MySqlDim extends TableFunction, and its eval() method is empty, like this:

    public void eval() {
    }
Hi, according to the Flink documentation, it seems that you need to pass at least one argument to the table function.

On Fri, Feb 23, 2018 at 12:35 AM 叶振宝 <[hidden email]> wrote:
> SQL: select t.col_1 from test t left join lateral table(dim_test()) b on true

--
Software Engineer, MVAD
In reply to this post by 叶振宝
Hi, the UDTF's argument list should not be empty; otherwise Calcite fails to convert the join into a correlate.

Best, Hequn

2018-02-23 0:34 GMT+08:00 叶振宝 <[hidden email]>:
> SQL: select t.col_1 from test t left join lateral table(dim_test()) b on true
In reply to this post by Renjie Liu
Hi, I am using Flink 1.4.0.

Yezhenbao

On Feb 24, 2018, at 17:55, Renjie Liu <[hidden email]> wrote:
> Hi, according to the Flink documentation, it seems that you need to pass at least one argument to the table function.
Hi,
Liu and Hequn are right. You need to pass at least one parameter into the table function, i.e.,

    select t.col_1 from test t left join lateral table(dim_test(SOME_ATTRIBUTE)) b on true

Best, Fabian

2018-02-24 13:24 GMT+01:00 ZhenBao Ye <[hidden email]>:
> Hi, I am using Flink 1.4.0.
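To illustrate why the argument matters, here is a minimal plain-Java sketch of what a left lateral join against a table function does conceptually: the function is called once per left row, with an attribute of that row as its argument, and the join pads with null when nothing comes back. This is only a model and not Flink code. The class name, the in-memory map standing in for the MySQL dimension table, and the sample keys are all assumptions made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of: test t LEFT JOIN LATERAL TABLE(dim_test(t.col_1)) ON TRUE.
// No Flink classes are used; every name below is hypothetical.
class LateralJoinSketch {

    // Stand-in for the dimension table: key -> matching "name" values.
    // In the thread's setup this data would live in MySQL.
    static final Map<String, List<String>> DIM = new HashMap<>();
    static {
        DIM.put("a", Arrays.asList("alpha"));
        DIM.put("b", Arrays.asList("beta", "bravo"));
    }

    // Models eval(String key): invoked once per left row with that row's value.
    // The argument is what ties the function call to the current left row.
    static List<String> eval(String key) {
        List<String> hit = DIM.get(key);
        return hit == null ? new ArrayList<String>() : hit;
    }

    // Models the left lateral join: one output row per match, or a single
    // null-padded row when the lookup returns nothing for this left row.
    static List<String[]> leftJoinLateral(List<String> leftRows) {
        List<String[]> out = new ArrayList<>();
        for (String col1 : leftRows) {
            List<String> matches = eval(col1);
            if (matches.isEmpty()) {
                out.add(new String[] {col1, null});
            } else {
                for (String name : matches) {
                    out.add(new String[] {col1, name});
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints one line per joined row.
        for (String[] row : leftJoinLateral(Arrays.asList("a", "b", "z"))) {
            System.out.println(row[0] + " -> " + row[1]);
        }
    }
}
```

With the sample data, the three left rows "a", "b" and "z" yield four joined rows: one for "a", two for "b", and one null-padded row for "z". In the actual job, the equivalent change would presumably be an eval method that takes the lookup key (for example eval(String key)) and a query that passes a column such as t.col_1 to dim_test.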