[jira] [Created] (FLINK-16800) TypeMappingUtils#checkPhysicalLogicalTypeCompatible didn't deal with nested types

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-16800) TypeMappingUtils#checkPhysicalLogicalTypeCompatible didn't deal with nested types

Shang Yuanchun (Jira)
Zhenghua Gao created FLINK-16800:
------------------------------------

             Summary: TypeMappingUtils#checkPhysicalLogicalTypeCompatible didn't deal with nested types
                 Key: FLINK-16800
                 URL: https://issues.apache.org/jira/browse/FLINK-16800
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.10.0
            Reporter: Zhenghua Gao
             Fix For: 1.11.0


the planner will use TypeMappingUtils#checkPhysicalLogicalTypeCompatible to validate logical schema and physical schema are compatible when translate CatalogSinkModifyOperation to Calcite relational expression.  The validation didn't deal with nested types well, which could expose the following ValidationException:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException:
Type ARRAY<ROW<`logistics_status` DECIMAL(38, 18)>> of table field 'old'
does not match with the physical type ARRAY<ROW<`logistics_status`
LEGACY('DECIMAL', 'DECIMAL')>> of the 'old' field of the TableSource return
type.
at
org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
at
org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277)
at
org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254)
at
org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:157)
at org.apache.flink.table.types.logical.ArrayType.accept(ArrayType.java:110)
at
org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
at
org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
at
org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at
org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214)
at
org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192)
at
org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:161)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:51)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlan(StreamExecLimit.scala:51)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
at
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
at com.akulaku.data.main.StreamMain.main(StreamMain.java:58)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)