How to use Dimension table in Flink TableAPI with StreamExecutionEnvironment ?

How to use Dimension table in Flink TableAPI with StreamExecutionEnvironment ?

叶振宝
Hey, I am new to Flink and I have a question; I hope someone can help.

How can I use a dimension table in the Flink Table API with a StreamExecutionEnvironment?

I tried to solve this with a TableFunction, but planning fails with the following error:
LogicalProject(col_1=[$0])
  LogicalJoin(condition=[true], joinType=[left])
    LogicalTableScan(table=[[test]])
    LogicalTableFunctionScan(invocation=[dim_test()], rowType=[RecordType(VARCHAR(65536) col, VARCHAR(65536) name)], elementType=[class [Ljava.lang.Object;])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
        at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
        at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:674)
        at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
        at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:216)
        at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:692)
        at com.bigdata.stream.streamsql.FlinkSqlTest.main(FlinkSqlTest.java:64)

SQL : select t.col_1 from test t left join lateral table(dim_test()) b on true

Main Code:
public static void main(String[] args) throws Exception {
        String sql = "select t.col_1 from test t left join lateral table(dim_test()) b on true";
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment stEnv = TableEnvironment.getTableEnvironment(streamEnv);

        // Kafka source for the fact stream
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "test");
        Kafka010JsonTableSource tableSource = Kafka010JsonTableSource.builder()
                .forTopic("test")
                .withKafkaProperties(kafkaProps)
                .withSchema(TableSchema.builder()
                        .field("col_1", Types.STRING)
                        .field("col_2", Types.STRING)
                        .build())
                .build();
        stEnv.registerTableSource("test", tableSource);

        // MySQL-backed dimension table, registered as a table function
        String[] columns = {"col", "name"};
        TypeInformation[] typeInformations = {TypeInformation.of(String.class), TypeInformation.of(String.class)};
        TableSchema tableSchema = new TableSchema(columns, typeInformations);
        Map<String, Object> context = new HashMap<>();
        context.put("mysql.url", "jdbc:mysql://localhost:3306/test");
        context.put("mysql.driver", "com.mysql.jdbc.Driver");
        context.put("mysql.user", "test");
        context.put("mysql.password", "test");
        context.put("mysql.table", "dim_test");
        StreamSqlDim dim = new MySqlDimFactory().getInstance(tableSchema, new StreamSqlContext(context));
        stEnv.registerFunction("dim_test", dim);

        // Kafka sink for the result
        String[] outColumns = {"col"};
        TypeInformation[] outType = {TypeInformation.of(String.class)};
        TableSink tableSink = new Kafka010JsonTableSink("test_out", kafkaProps);
        stEnv.registerTableSink("test_out", outColumns, outType, tableSink);

        Table t = stEnv.sql(sql);
        stEnv.insertInto(t, "test_out", stEnv.queryConfig());
        streamEnv.execute();
    }

MySqlDim extends TableFunction, and its eval() method is empty, like this:
public void eval(){

}



Re: How to use Dimension table in Flink TableAPI with StreamExecutionEnvironment ?

Renjie Liu
Hi, according to the Flink docs, it seems that you need to pass at least one
argument to the table function.

Liu, Renjie
Software Engineer, MVAD
Re: How to use Dimension table in Flink TableAPI with StreamExecutionEnvironment ?

Hequn Cheng
In reply to this post by 叶振宝
Hi, the UDTF's argument list should not be empty; otherwise Calcite fails to
convert the join into a correlate.

Best, Hequn

Re: How to use Dimension table in Flink TableAPI with StreamExecutionEnvironment ?

ZhenBao Ye
In reply to this post by Renjie Liu
Hi, I am using 1.4.0.

Yezhenbao




Re: How to use Dimension table in Flink TableAPI with StreamExecutionEnvironment ?

Fabian Hueske-2
Hi,

Liu and Hequn are right: you need to pass at least one parameter to the table
function, i.e.,

select t.col_1 from test t left join lateral table(dim_test(SOME_ATTRIBUTE)) b on true

Best, Fabian
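For reference, a minimal sketch of such a one-argument table function. The class name and lookup logic here are hypothetical placeholders; only the one-argument eval signature and the registration pattern follow the advice above (a real MySqlDim would query MySQL inside eval):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableFunction;

// Hypothetical dimension lookup: eval takes one key argument, so the
// planner can convert the lateral join into a correlate.
public class DimTest extends TableFunction<Tuple2<String, String>> {
    public void eval(String key) {
        // A real dimension table would look up `key` in MySQL here;
        // this sketch just emits a placeholder (col, name) row.
        collect(Tuple2.of(key, "name-for-" + key));
    }
}
```

After stEnv.registerFunction("dim_test", new DimTest()), the query passes a column from the fact stream into the function, e.g. select t.col_1 from test t left join lateral table(dim_test(t.col_1)) b on true.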
