[jira] [Created] (FLINK-16108) StreamSQLExample is failed if running in blink planner

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-16108) StreamSQLExample is failed if running in blink planner

Shang Yuanchun (Jira)
Jark Wu created FLINK-16108:
-------------------------------

             Summary: StreamSQLExample is failed if running in blink planner
                 Key: FLINK-16108
                 URL: https://issues.apache.org/jira/browse/FLINK-16108
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: Jark Wu
             Fix For: 1.10.1


{{StreamSQLExample}} in flink-example will fail if the specified planner is blink planner. Exception is as following:

{code}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink  do not match.
Query schema: [user: BIGINT, product: STRING, amount: INT]
Sink schema: [amount: INT, product: STRING, user: BIGINT]
        at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
        at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:361)
        at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:269)
        at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:260)
        at org.apache.flink.table.examples.java.StreamSQLExample.main(StreamSQLExample.java:90)

Process finished with exit code 1
{code}

That's because blink planner will also validate the sink schema even if it is come from {{toAppendStream()}}. However, the {{TableSinkUtils#inferSinkPhysicalDataType}} should derive sink schema from query schema when the requested type is POJO [1], because fields order of POJO is not deterministic.


[1]: https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala#L237






--
This message was sent by Atlassian Jira
(v8.3.4#803005)