[jira] [Created] (FLINK-13578) Blink throws exception when using Types.INTERVAL_MILLIS in TableSource

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-13578) Blink throws exception when using Types.INTERVAL_MILLIS in TableSource

Shang Yuanchun (Jira)
Wei Zhong created FLINK-13578:
---------------------------------

             Summary: Blink throws exception when using Types.INTERVAL_MILLIS in TableSource
                 Key: FLINK-13578
                 URL: https://issues.apache.org/jira/browse/FLINK-13578
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.9.0
            Reporter: Wei Zhong


Running this program will throw a TableException:
{code:scala}
/**
 * Minimal reproducer for the issue described in this report: a table source whose
 * single column is typed with `Types.INTERVAL_MILLIS()` is read and written to a
 * CSV sink using the Blink planner in batch mode.
 */
object Tests {

  /**
   * Table source backed by an in-memory collection of three `java.lang.Long` values,
   * declaring `Types.INTERVAL_MILLIS()` as both its schema type and its return type.
   */
  class MyTableSource extends InputFormatTableSource[java.lang.Long] {

    // Fixed input records: 1, 2, 3.
    val data = new java.util.ArrayList[java.lang.Long]()
    data.add(1L)
    data.add(2L)
    data.add(3L)
    // The type under test; it is reused for the serializer, the schema and the return type.
    val dataType = Types.INTERVAL_MILLIS()
    // The serializer for the records is derived from the same interval type information.
    val inputFormat = new CollectionInputFormat[java.lang.Long](
      data, dataType.createSerializer(new ExecutionConfig))

    /** Returns the collection-backed input format built above. */
    override def getInputFormat: InputFormat[java.lang.Long, _ <: InputSplit] = inputFormat

    /** Returns the schema derived from the interval type information. */
    override def getTableSchema: TableSchema = TableSchema.fromTypeInfo(dataType)

    /** Returns the interval type information as the produced type. */
    override def getReturnType: TypeInformation[java.lang.Long] = dataType
  }

  /**
   * Builds a batch-mode table environment with the Blink planner, wires the source
   * above to a CSV sink and executes the job.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    // Batch mode + Blink planner is the configuration this report is about.
    val tenv = TableEnvironmentImpl.create(
      EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build())

    val table = tenv.fromTableSource(new MyTableSource)
    // The sink declares one field "f0" with the same interval type and writes to /tmp/results.
    tenv.registerTableSink("sink", Array("f0"),
      Array(Types.INTERVAL_MILLIS()), new CsvTableSink("/tmp/results"))

    // NOTE(review): "f0" is presumably the field name TableSchema.fromTypeInfo assigns
    // to the single column of the source — confirm against the Flink API.
    table.select("f0").insertInto("sink")

    // Per the report's stack trace, the TableException surfaces from this call,
    // during translation of the plan.
    tenv.execute("test")

  }
}

{code}
The full TableException message and stack trace:
{code:java}
Exception in thread "main" org.apache.flink.table.api.TableException: Unsupported conversion from data type 'INTERVAL SECOND(3)' (conversion class: java.time.Duration) to type information. Only data types that originated from type information fully support a reverse conversion.
at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:242)
at org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:49)
at org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(TypeInfoDataTypeConverter.java:145)
at org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo(TypeInfoLogicalTypeConverter.java:58)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
at org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo.<init>(BaseRowTypeInfo.java:64)
at org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo.of(BaseRowTypeInfo.java:210)
at org.apache.flink.table.planner.plan.utils.ScanUtil$.convertToInternalRow(ScanUtil.scala:126)
at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:112)
at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlan(BatchExecTableSourceScan.scala:48)
at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToTransformation(BatchExecSink.scala:127)
at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:92)
at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:50)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:50)
at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:70)
at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:69)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:69)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:408)
at org.apache.flink.table.planner.runtime.batch.table.Tests$.main(CalcITCase.scala:711)
at org.apache.flink.table.planner.runtime.batch.table.Tests.main(CalcITCase.scala)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)