[jira] [Created] (FLINK-18230) Table API has no Functions like sparkSQL explode

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-18230) Table API has no Functions like sparkSQL explode

Shang Yuanchun (Jira)
mzz created FLINK-18230:
---------------------------

             Summary: Table API has no Functions like  sparkSQL explode
                 Key: FLINK-18230
                 URL: https://issues.apache.org/jira/browse/FLINK-18230
             Project: Flink
          Issue Type: Bug
            Reporter: mzz


streamTableEnvironment.connect(new Kafka()
      .topic(TOPIC)
      .version(VERSION)
      .startFromEarliest()
      .property("bootstrap.servers", "172.16.30.207:9092")
      .property("group.id", "km_aggs_group_3")
    )
      .withFormat(
        new Json()
          .failOnMissingField(true)
          .deriveSchema()
      )
      .withSchema(new Schema()
        .field("devs", Types.STRING())
        .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
            Types.ROW(Array("count", "eventid", "sid", "params"), Array[TypeInformation[_]](Types.STRING(), Types.STRING, Types.STRING,
              Types.ROW(Array("adid", "adtype", "ecpm"), Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.STRING))))
      ))
        .field("identity", Types.STRING())
        .field("ip", Types.STRING())
        .field("launchs", Types.STRING())
        .field("ts", Types.STRING())
      )
      .inAppendMode()
      .registerTableSource("aggs_test")

    val tableResult = streamTableEnvironment.sqlQuery("select ip, ts,launchs,advs[1].`count` from aggs_test")
    tableResult.printSchema()
    streamTableEnvironment.toAppendStream[Row](tableResult).print()



--
This message was sent by Atlassian Jira
(v8.3.4#803005)