Zhenghua Gao created FLINK-16327:
------------------------------------

             Summary: Add TableEnvironment.fromElements interfaces for usability
                 Key: FLINK-16327
                 URL: https://issues.apache.org/jira/browse/FLINK-16327
             Project: Flink
          Issue Type: New Feature
          Components: Table SQL / API
    Affects Versions: 1.11.0
            Reporter: Zhenghua Gao


h1. Interface
{code:java}
/**
 * Creates a table from a group of objects (known as its elements). The schema of the table
 * is inferred from the type of the elements.
 *
 * @param data a group of objects
 */
Table fromElements(Collection<?> data);

/**
 * Creates a table from a group of objects (known as its elements). The schema of the table
 * is derived from the passed-in data type.
 *
 * @param data a group of objects
 * @param dataType the data type of the data
 */
Table fromElements(Collection<?> data, DataType dataType);
{code}

h1. Use Case
* One potential use case for the Table API

{code:scala}
@Test
def testUnregisteredCollectionSource1(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
    Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))

  // Schema is inferred from the elements; field names are assigned with as(...)
  tEnv.fromElements(data.asJava)
    .as('first, 'id, 'score, 'last)
    .where('id > 4)
    .select('last, 'score * 2)
    .toAppendStream[Row]
    .addSink(new StreamITCase.StringSink[Row])

  env.execute()
}

@Test
def testUnregisteredCollectionSource2(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
    Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))

  // Schema, including field names, comes from the explicit data type
  val dataType = DataTypes.ROW(
    DataTypes.FIELD("first", DataTypes.STRING()),
    DataTypes.FIELD("id", DataTypes.INT()),
    DataTypes.FIELD("score", DataTypes.DOUBLE()),
    DataTypes.FIELD("last", DataTypes.STRING()))

  tEnv.fromElements(data.asJava, dataType)
    .where('id > 4)
    .select('last, 'score * 2)
    .toAppendStream[Row]
    .addSink(new StreamITCase.StringSink[Row])

  env.execute()
}
{code}

* One potential use case for SQL

{code:scala}
@Test
def testUnregisteredCollectionSource1(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
    Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))

  // Register the inferred table as a temporary view so SQL can query it
  val table = tEnv.fromElements(data.asJava).as('first, 'id, 'score, 'last)
  tEnv.createTemporaryView("T", table)

  tEnv.sqlQuery("SELECT last, score * 2 FROM T WHERE id > 4")
    .toAppendStream[Row]
    .addSink(new StreamITCase.StringSink[Row])

  env.execute()
}

@Test
def testUnregisteredCollectionSource2(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.testResults = mutable.MutableList()

  val data = Seq(
    Row.of("Mike", new JInt(5), new JDouble(12.3), "Smith"))

  val dataType = DataTypes.ROW(
    DataTypes.FIELD("first", DataTypes.STRING()),
    DataTypes.FIELD("id", DataTypes.INT()),
    DataTypes.FIELD("score", DataTypes.DOUBLE()),
    DataTypes.FIELD("last", DataTypes.STRING()))

  val table = tEnv.fromElements(data.asJava, dataType)
  tEnv.createTemporaryView("T", table)

  tEnv.sqlQuery("SELECT last, score * 2 FROM T WHERE id > 4")
    .toAppendStream[Row]
    .addSink(new StreamITCase.StringSink[Row])

  env.execute()
}
{code}

h1. The proposal
* data type inference

For the first interface, we need to infer the data type from the data itself. A potential tool is the DataTypeExtractor, but it doesn't support Scala tuples, Row, etc. Row and Scala tuples are the most common element types in our test cases, so for these we could enumerate the supported types and traverse each object recursively to collect the types of all underlying fields. This covers most cases and improves usability.

* proposed changes
** A CollectionQueryOperation which implements QueryOperation to describe the relational operation
** The logical and physical RelNode for legacy planner.
In the physical node, we can translate the data to DataStream
** The logical and physical RelNode for blink planner. In the physical node, we can translate the data to Transformation
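
To make the "enumerate and traverse recursively" idea from the data type inference step more concrete, here is a minimal sketch in plain Java. It is not Flink code and does not use DataTypeExtractor or DataTypes: the class name TypeInferenceSketch, the methods infer and inferAll, and the use of Object[] as a stand-in for Row or a Scala tuple are all assumptions made for illustration. The result is only a readable type description, whereas a real implementation would presumably build a DataType.

```java
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;

/** Hypothetical sketch, not part of Flink: infers a type description by recursive traversal. */
final class TypeInferenceSketch {

    /** Infers a type name for one value. Object[] is an assumed stand-in for Row or a tuple. */
    static String infer(Object value) {
        if (value == null) {
            throw new IllegalArgumentException("cannot infer a type from null");
        }
        // Enumerate the supported atomic types.
        if (value instanceof String) return "STRING";
        if (value instanceof Integer) return "INT";
        if (value instanceof Long) return "BIGINT";
        if (value instanceof Double) return "DOUBLE";
        if (value instanceof Boolean) return "BOOLEAN";
        // Composite value: recurse into every field and combine the results.
        if (value instanceof Object[]) {
            StringJoiner fields = new StringJoiner(", ", "ROW<", ">");
            for (Object field : (Object[]) value) {
                fields.add(infer(field));
            }
            return fields.toString();
        }
        throw new IllegalArgumentException("unsupported type: " + value.getClass().getName());
    }

    /** Infers one common type for a collection; every element must yield the same type. */
    static String inferAll(List<?> data) {
        if (data.isEmpty()) {
            throw new IllegalArgumentException("cannot infer a type from an empty collection");
        }
        String expected = infer(data.get(0));
        for (Object element : data) {
            String actual = infer(element);
            if (!actual.equals(expected)) {
                throw new IllegalArgumentException(
                    "inconsistent element types: " + expected + " vs " + actual);
            }
        }
        return expected;
    }

    public static void main(String[] args) {
        // Same values as the use cases above, with Object[] in place of Row.
        List<Object[]> data =
            Collections.singletonList(new Object[] {"Mike", 5, 12.3, "Smith"});
        System.out.println(inferAll(data)); // prints ROW<STRING, INT, DOUBLE, STRING>
    }
}
```

The sketch rejects null values and empty collections because neither carries enough information to infer a type; in those cases the second interface, which takes an explicit data type, would be the natural fallback.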