[jira] [Created] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row



Shang Yuanchun (Jira)
Terry Wang created FLINK-17313:
----------------------------------

             Summary: Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
                 Key: FLINK-17313
                 URL: https://issues.apache.org/jira/browse/FLINK-17313
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: Terry Wang


Test code like the following (using the blink planner):
{code:java}
        tEnv.sqlUpdate("create table randomSource (" +
                " a varchar(10)," +
                " b decimal(20,2)" +
                ") with (" +
                " 'type' = 'random'," +
                " 'count' = '10'" +
                ")");
        tEnv.sqlUpdate("create table printSink (" +
                " a varchar(10)," +
                " b decimal(22,2)," +
                " c timestamp(3)" +
                ") with (" +
                " 'type' = 'print'" +
                ")");
        tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource");
        tEnv.execute("");
{code}

The print TableSink implements UpsertStreamTableSink, and its getRecordType is as follows:


{code:java}
public TypeInformation<Row> getRecordType() {
    return getTableSchema().toRowType();
}
{code}
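The problem with going through TypeInformation is that it carries no precision: a declared varchar(10) maps to the plain String type information, which can only convert back to the unbounded STRING logical type. The following Flink-free sketch models that round trip (the method names and string encoding here are illustrative only, not the actual Flink classes):

```java
// Sketch of the precision loss in the schema -> TypeInformation -> logical
// type round trip. TypeInformation has no notion of a VARCHAR length, so the
// declared varchar(10) comes back as the maximum-length STRING type.
final class PrecisionLossSketch {

    static String toTypeInformation(String logicalType) {
        // TypeInformation only knows "String", not "VARCHAR(n)".
        return logicalType.startsWith("VARCHAR") ? "String" : logicalType;
    }

    static String backToLogicalType(String typeInformation) {
        // Converting back, "String" can only become the unbounded STRING
        // (i.e. VARCHAR(Integer.MAX_VALUE)) -- the original length is gone.
        return typeInformation.equals("String") ? "STRING" : typeInformation;
    }

    public static void main(String[] args) {
        String declared = "VARCHAR(10)";
        String physical = backToLogicalType(toTypeInformation(declared));
        System.out.println(physical);                  // STRING
        System.out.println(declared.equals(physical)); // false -> validation fails
    }
}
```

Because the declared logical type and the reconstructed physical type no longer match, the strict comparison in the validation step throws.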


The exception for the varchar type is:


{code}
org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.

        at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
        at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
        at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
        at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
        at org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
        at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
        at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
        at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
        at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
        at scala.Option.map(Option.scala:146)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)
{code}
Validation for the other types (decimal, timestamp) fails in a similar way. I dug into it and found the cause is TypeMappingUtils#checkPhysicalLogicalTypeCompatible: the method does not take into account that sources and sinks need to be validated differently. I will open a PR soon to fix this problem.
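The source/sink asymmetry can be modeled with a small, Flink-free sketch (the class and method names below are hypothetical, and `MAX_LENGTH` stands in for the unbounded STRING): for a sink it is safe to write a shorter VARCHAR into a wider physical type, so a strict equality check is too restrictive there.

```java
// Simplified model of the logical/physical compatibility check.
// TableSchema.toRowType() erases precision, so the sink's physical type for
// a varchar column is effectively VARCHAR(Integer.MAX_VALUE) (i.e. STRING).
final class TypeCheckSketch {

    // A logical VARCHAR with a length, as declared in the DDL.
    record VarCharType(int length) {}

    static final int MAX_LENGTH = Integer.MAX_VALUE; // STRING == VARCHAR(MAX)

    // Strict check, as applied today regardless of source or sink:
    static boolean strictlyCompatible(VarCharType logical, VarCharType physical) {
        return logical.length() == physical.length();
    }

    // For a sink, writing a value of the declared type into a wider physical
    // type is lossless, so the check could be relaxed to "logical fits":
    static boolean sinkCompatible(VarCharType logical, VarCharType physical) {
        return logical.length() <= physical.length();
    }

    public static void main(String[] args) {
        VarCharType declared = new VarCharType(10);         // varchar(10) in DDL
        VarCharType physical = new VarCharType(MAX_LENGTH); // STRING from toRowType()

        System.out.println(strictlyCompatible(declared, physical)); // false -> ValidationException
        System.out.println(sinkCompatible(declared, physical));     // true  -> would pass
    }
}
```

For a source the strict direction still matters, since data produced with a wider physical type could exceed the declared logical precision; that is why the fix has to distinguish the two cases rather than simply loosening the check everywhere.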

--
This message was sent by Atlassian Jira
(v8.3.4#803005)