Jark Wu created FLINK-16108:
------------------------------- Summary: StreamSQLExample is failed if running in blink planner Key: FLINK-16108 URL: https://issues.apache.org/jira/browse/FLINK-16108 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: Jark Wu Fix For: 1.10.1 {{StreamSQLExample}} in flink-example will fail if the specified planner is blink planner. Exception is as following: {code} Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink do not match. Query schema: [user: BIGINT, product: STRING, amount: INT] Sink schema: [amount: INT, product: STRING, user: BIGINT] at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150) at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:361) at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:269) at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:260) at org.apache.flink.table.examples.java.StreamSQLExample.main(StreamSQLExample.java:90) Process finished with exit code 1 {code} That's because blink planner will also validate the sink schema even if it is come from {{toAppendStream()}}. However, the {{TableSinkUtils#inferSinkPhysicalDataType}} should derive sink schema from query schema when the requested type is POJO [1], because fields order of POJO is not deterministic. [1]: https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala#L237 -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |