[jira] [Created] (FLINK-18782) How to retain the column'name when convert a Table to DataStream

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-18782) How to retain the column'name when convert a Table to DataStream

Shang Yuanchun (Jira)
Ying Z created FLINK-18782:
------------------------------

             Summary: How to retain the column'name when convert a Table to DataStream
                 Key: FLINK-18782
                 URL: https://issues.apache.org/jira/browse/FLINK-18782
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.9.1
            Reporter: Ying Z


mail: [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-retain-the-column-name-when-convert-a-Table-to-DataStream-td37002.html]

 

I met some field name errors when try to convert in Table and DataStream.
 First, init a datastream and convert to table 'source', register a tablefunction named 'foo'
{code:java}
val sourceStream = env.socketTextStream("127.0.0.1", 8010)
  .map(line => line.toInt)
tableEnv.registerDataStream("source_table", sourceStream, 'a)

class Foo() extends TableFunction[(Int)] {
  def eval(col: Int): Unit = collect((col * 10))
}
tableEnv.registerFunction("foo", new Foo)
{code}
Then, use sqlQuery to generate a new table t1 with columns 'a' 'b'
{code:java}
val t1 = tableEnv.sqlQuery(
  """
    |SELECT source_table.a, b FROM source_table
    |, LATERAL TABLE(foo(a)) as T(b)
    |""".stripMargin
)
/*
 t1 table schema: root
 |-- a: INT
 |-- b: INT
 */
println(s"t1 table schema: ${t1.getSchema}")
{code}
When I try to convert 't1' to a datastream then register to a new table(for some reason) named 't1', the columns changes to 'a' 'f0', not 'a' 'b'
{code:java}
val t1Stream = t1.toAppendStream[Row]
// t1 stream schema: Row(a: Integer, f0: Integer)
println(s"t1 stream schema: ${t1Stream.getType()}")
tableEnv.registerDataStream("t1", t1Stream)
/*
new t1 table schema: root
|-- a: INT
|-- f0: INT
 */
println(s"new t1 table schema: ${tableEnv.scan("t1").getSchema}")
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)