mzz created FLINK-18230:
---------------------------
Summary: Table API has no Functions like sparkSQL explode
Key: FLINK-18230
URL:
https://issues.apache.org/jira/browse/FLINK-18230 Project: Flink
Issue Type: Bug
Reporter: mzz
streamTableEnvironment.connect(new Kafka()
.topic(TOPIC)
.version(VERSION)
.startFromEarliest()
.property("bootstrap.servers", "172.16.30.207:9092")
.property("group.id", "km_aggs_group_3")
)
.withFormat(
new Json()
.failOnMissingField(true)
.deriveSchema()
)
.withSchema(new Schema()
.field("devs", Types.STRING())
.field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
Types.ROW(Array("count", "eventid", "sid", "params"), Array[TypeInformation[_]](Types.STRING(), Types.STRING, Types.STRING,
Types.ROW(Array("adid", "adtype", "ecpm"), Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.STRING))))
))
.field("identity", Types.STRING())
.field("ip", Types.STRING())
.field("launchs", Types.STRING())
.field("ts", Types.STRING())
)
.inAppendMode()
.registerTableSource("aggs_test")
val tableResult = streamTableEnvironment.sqlQuery("select ip, ts,launchs,advs[1].`count` from aggs_test")
tableResult.printSchema()
streamTableEnvironment.toAppendStream[Row](tableResult).print()
--
This message was sent by Atlassian Jira
(v8.3.4#803005)