Wei Zhong created FLINK-13578:
--------------------------------- Summary: Blink throws exception when using Types.INTERVAL_MILLIS in TableSource Key: FLINK-13578 URL: https://issues.apache.org/jira/browse/FLINK-13578 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.9.0 Reporter: Wei Zhong Running this program will throw a TableException: {code:java} object Tests { class MyTableSource extends InputFormatTableSource[java.lang.Long] { val data = new java.util.ArrayList[java.lang.Long]() data.add(1L) data.add(2L) data.add(3L) val dataType = Types.INTERVAL_MILLIS() val inputFormat = new CollectionInputFormat[java.lang.Long]( data, dataType.createSerializer(new ExecutionConfig)) override def getInputFormat: InputFormat[java.lang.Long, _ <: InputSplit] = inputFormat override def getTableSchema: TableSchema = TableSchema.fromTypeInfo(dataType) override def getReturnType: TypeInformation[java.lang.Long] = dataType } def main(args: Array[String]): Unit = { val tenv = TableEnvironmentImpl.create( EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build()) val table = tenv.fromTableSource(new MyTableSource) tenv.registerTableSink("sink", Array("f0"), Array(Types.INTERVAL_MILLIS()), new CsvTableSink("/tmp/results")) table.select("f0").insertInto("sink") tenv.execute("test") } } {code} The TableException detail: {code:java} Exception in thread "main" org.apache.flink.table.api.TableException: Unsupported conversion from data type 'INTERVAL SECOND(3)' (conversion class: java.time.Duration) to type information. Only data types that originated from type information fully support a reverse conversion. at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:242) at org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:49) at org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(TypeInfoDataTypeConverter.java:145) at org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo(TypeInfoLogicalTypeConverter.java:58) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545) at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) at org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo.<init>(BaseRowTypeInfo.java:64) at org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo.of(BaseRowTypeInfo.java:210) at org.apache.flink.table.planner.plan.utils.ScanUtil$.convertToInternalRow(ScanUtil.scala:126) at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:112) at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlan(BatchExecTableSourceScan.scala:48) at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToTransformation(BatchExecSink.scala:127) at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:92) at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:50) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:50) at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:70) at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:69) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:69) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:408) at org.apache.flink.table.planner.runtime.batch.table.Tests$.main(CalcITCase.scala:711) at org.apache.flink.table.planner.runtime.batch.table.Tests.main(CalcITCase.scala) {code} -- This message was sent by Atlassian JIRA (v7.6.14#76016) |
Free forum by Nabble | Edit this page |