hehuiyuan created FLINK-18339:
--------------------------------- Summary: ValidationException exception that field typeinformation in TableSchema and in TableSource return type for blink Key: FLINK-18339 URL: https://issues.apache.org/jira/browse/FLINK-18339 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.9.0 Reporter: hehuiyuan Attachments: image-2020-06-17-10-37-48-166.png, image-2020-06-17-10-53-08-424.png The type of `datatime` field is OBJECT_ARRAY<STRING>. Exception: {code:java} Exception in thread "main" org.apache.flink.table.api.ValidationException: Type LEGACY(BasicArrayTypeInfo<String>) of table field 'datatime' does not match with type BasicArrayTypeInfo<String> of the field 'datatime' of the TableSource return type.Exception in thread "main" org.apache.flink.table.api.ValidationException: Type LEGACY(BasicArrayTypeInfo<String>) of table field 'datatime' does not match with type BasicArrayTypeInfo<String> of the field 'datatime' of the TableSource return type. at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121) at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:141) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:55) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlan(StreamExecGroupWindowAggregate.scala:55) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:119) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:152) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348) {code} Usage: `fieldNames` is field name array `fieldsType` is field type array We can acquire field typeinformation by the way: {code:java} TypeInformation typeInformation = TypeStringUtils.readTypeInfo("OBJECT_ARRAY<STRING>"); {code} {code:java} ConnectTableDescriptor d = descriptor.withFormat( new Csv().fieldDelimiter(fielddelimiter).schema(new RowTypeInfo(fieldsType,fieldNames)) ) .withSchema( schema ); {code} (1) RowTypeInfo(fieldsType,fieldNames) calls toString method: Row(name: String, age: Integer, sex: String, datatime: BasicArrayTypeInfo<String>) `datatime` field type is BasicArrayTypeInfo<String>. (2)Schema shema : schema = schema.field(fieldNames[i],fieldsType[i]); `datatime` field type is BasicArrayTypeInfo<String> !image-2020-06-17-10-37-48-166.png! Code analysis: `schemaBuilder.field(name, type)` is called when create TableSchema {code:java} public Builder field(String name, TypeInformation<?> typeInfo) { return field(name, fromLegacyInfoToDataType(typeInfo)); } public static DataType fromLegacyInfoToDataType(TypeInformation<?> typeInfo) { return LegacyTypeInfoDataTypeConverter.toDataType(typeInfo); } public static DataType toDataType(TypeInformation<?> typeInfo) { // time indicators first as their hashCode/equals is shared with those of regular timestamps if (typeInfo instanceof TimeIndicatorTypeInfo) { return convertToTimeAttributeType((TimeIndicatorTypeInfo) typeInfo); } final DataType foundDataType = typeInfoDataTypeMap.get(typeInfo); if (foundDataType != null) { return foundDataType; } if (typeInfo instanceof RowTypeInfo) { return convertToRowType((RowTypeInfo) typeInfo); } else if (typeInfo instanceof ObjectArrayTypeInfo) { return convertToArrayType( typeInfo.getTypeClass(), ((ObjectArrayTypeInfo) typeInfo).getComponentInfo()); } else if (typeInfo instanceof BasicArrayTypeInfo) { return createLegacyType(LogicalTypeRoot.ARRAY, typeInfo); } else if (typeInfo instanceof MultisetTypeInfo) { return convertToMultisetType(((MultisetTypeInfo) typeInfo).getElementTypeInfo()); } else if (typeInfo instanceof MapTypeInfo) { return convertToMapType((MapTypeInfo) typeInfo); } else if (typeInfo instanceof CompositeType) { return createLegacyType(LogicalTypeRoot.STRUCTURED_TYPE, typeInfo); } return createLegacyType(LogicalTypeRoot.ANY, typeInfo); } {code} if typeinformation is BasicArrayTypeinfo , the code is called: {code:java} return createLegacyType(LogicalTypeRoot.ARRAY, typeInfo); } private static DataType createLegacyType(LogicalTypeRoot typeRoot, TypeInformation<?> typeInfo) { return new AtomicDataType(new LegacyTypeInformationType<>(typeRoot, typeInfo)) .bridgedTo(typeInfo.getTypeClass()); } {code} `datatime` field type is LEGACY(BasicArrayTypeInfo<String>) -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |