[jira] [Created] (FLINK-17600) Blink Planner fails to generate RowtimeAttribute based on TableSource's DefinedRowtimeAttributes implementation

Shang Yuanchun (Jira)
Yuval Itzchakov created FLINK-17600:
---------------------------------------

             Summary: Blink Planner fails to generate RowtimeAttribute based on TableSource's DefinedRowtimeAttributes implementation
                 Key: FLINK-17600
                 URL: https://issues.apache.org/jira/browse/FLINK-17600
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.10.0
            Reporter: Yuval Itzchakov


Given the following SQL statement:

{code:java}
tableEnv.sqlQuery("SELECT EVENT_TIME, B, C FROM FOO")
{code}

and a custom StreamTableSource[Row] which implements `DefinedRowtimeAttributes.getRowtimeAttributeDescriptors`, Blink Planner fails to mark the selected field with a `RowtimeAttribute`.
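
For reference, here is a minimal sketch of such a source (a hypothetical `FooTableSource`; the schema, field names, and watermark strategy are assumptions chosen only to match the query above):

{code:java}
import java.util
import java.util.Collections

import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, TableSchema}
import org.apache.flink.table.sources.tsextractors.ExistingField
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps
import org.apache.flink.table.sources.{DefinedRowtimeAttributes, RowtimeAttributeDescriptor, StreamTableSource}
import org.apache.flink.table.types.DataType
import org.apache.flink.types.Row

// Hypothetical source backing the FOO table from the query above.
class FooTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

  override def getTableSchema: TableSchema =
    TableSchema.builder()
      .field("EVENT_TIME", DataTypes.TIMESTAMP(3))
      .field("B", DataTypes.STRING())
      .field("C", DataTypes.INT())
      .build()

  override def getProducedDataType: DataType = getTableSchema.toRowDataType

  // EVENT_TIME is declared as a rowtime attribute; the planner should mark
  // the selected field accordingly when resolving the query.
  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] =
    Collections.singletonList(
      new RowtimeAttributeDescriptor(
        "EVENT_TIME",
        new ExistingField("EVENT_TIME"),
        new BoundedOutOfOrderTimestamps(1000L)))

  // Actual stream construction elided; it is irrelevant to the planning bug.
  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
}
{code}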

This happens because `TableSourceUtil.getSourceRowType` receives a `None` TableSource from `CatalogSchemaTable.getRowType`, presumably because the Catalog has not yet created the underlying TableSource; its creation is deferred to the implementing `TableFactory` (in this case my own custom one).
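
For context, such a source is only instantiated by a custom `TableFactory` along these lines (a hedged sketch; `FooTableSourceFactory` and the `connector.type` value are assumptions):

{code:java}
import java.util
import java.util.Collections

import org.apache.flink.table.factories.StreamTableSourceFactory
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.types.Row

// Hypothetical factory. The planner calls createStreamTableSource only when
// translating the query, so CatalogSchemaTable.getRowType runs before any
// TableSource instance exists.
class FooTableSourceFactory extends StreamTableSourceFactory[Row] {

  override def createStreamTableSource(
      properties: util.Map[String, String]): StreamTableSource[Row] =
    new FooTableSource

  override def requiredContext(): util.Map[String, String] =
    Collections.singletonMap("connector.type", "foo")

  override def supportedProperties(): util.List[String] =
    Collections.singletonList("*")
}
{code}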

*This does not reproduce in the old Flink planner*, because the old planner uses `TableSourceTable`, which explicitly holds a reference to the underlying `TableSource` and extracts its rowtime attributes.

Relevant code:

*CatalogSchemaTable*:

{code:java}
private static RelDataType getRowType(RelDataTypeFactory typeFactory,
      CatalogBaseTable catalogBaseTable,
      boolean isStreamingMode) {
   final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory;
   TableSchema tableSchema = catalogBaseTable.getSchema();
   final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
   if (!isStreamingMode
      && catalogBaseTable instanceof ConnectorCatalogTable
      && ((ConnectorCatalogTable) catalogBaseTable).getTableSource().isPresent()) {
      // If the table source is bounded, materialize the time attributes to normal TIMESTAMP type.
      // Now for ConnectorCatalogTable, there is no way to
      // deduce if it is bounded in the table environment, so the data types in TableSchema
      // are always patched with TimeAttribute.
      // See ConnectorCatalogTable#calculateSourceSchema
      // for details.

      // Remove the patched time attributes type to let the TableSourceTable handle it.
      // We should remove this logic if the isBatch flag in ConnectorCatalogTable is fixed.
      // TODO: Fix FLINK-14844.
      for (int i = 0; i < fieldDataTypes.length; i++) {
         LogicalType lt = fieldDataTypes[i].getLogicalType();
         if (lt instanceof TimestampType
            && (((TimestampType) lt).getKind() == TimestampKind.PROCTIME
            || ((TimestampType) lt).getKind() == TimestampKind.ROWTIME)) {
            int precision = ((TimestampType) lt).getPrecision();
            fieldDataTypes[i] = DataTypes.TIMESTAMP(precision);
         }
      }
   }
   return TableSourceUtil.getSourceRowType(flinkTypeFactory,
      tableSchema,
      scala.Option.empty(),
      isStreamingMode);
}
{code}
*TableSourceUtil:*

{code:java}
def getSourceRowType(
    typeFactory: FlinkTypeFactory,
    tableSchema: TableSchema,
    tableSource: Option[TableSource[_]],
    streaming: Boolean): RelDataType = {

  val fieldNames = tableSchema.getFieldNames
  val fieldDataTypes = tableSchema.getFieldDataTypes

  if (tableSchema.getWatermarkSpecs.nonEmpty) {
    getSourceRowType(typeFactory, fieldNames, fieldDataTypes, tableSchema.getWatermarkSpecs.head,
      streaming)
  } else if (tableSource.isDefined) {
    getSourceRowType(typeFactory, fieldNames, fieldDataTypes, tableSource.get,
      streaming)
  } else {
    val fieldTypes = fieldDataTypes.map(fromDataTypeToLogicalType)
    typeFactory.buildRelNodeRowType(fieldNames, fieldTypes)
  }
}
{code}
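
Given the two snippets above: `CatalogSchemaTable.getRowType` always passes `scala.Option.empty()` for the table source, and here `tableSchema.getWatermarkSpecs` is empty as well, so the dispatch falls through to the final else branch and builds a plain row type. An annotated restatement of the effective call (not standalone code; these are internal planner APIs):

{code:java}
// Effective call made from CatalogSchemaTable.getRowType:
TableSourceUtil.getSourceRowType(
  flinkTypeFactory,
  tableSchema,          // getWatermarkSpecs is empty => first branch skipped
  scala.Option.empty(), // tableSource.isDefined == false => second branch skipped
  true)                 // streaming mode
// => final else branch: buildRelNodeRowType(fieldNames, fieldTypes) over plain
//    field types, so the rowtime attribute from the TableSource is never applied.
{code}
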
*TableSourceTable:*
{code:java}
// We must enrich logical schema from catalog table with physical type coming from table source.
// Schema coming from catalog table might not have proper conversion classes. Those must be
// extracted from produced type, before converting to RelDataType
def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
  val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
  val fieldNames = tableSchema.getFieldNames

  val nameMapping: JFunction[String, String] = tableSource match {
    case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
      new JFunction[String, String] {
        override def apply(t: String): String = mapping.getFieldMapping.get(t)
      }
    case _ => JFunction.identity()
  }

  val producedDataType = tableSource.getProducedDataType
  val fieldIndexes = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
    tableSource,
    tableSchema.getTableColumns,
    isStreamingMode,
    nameMapping
  )

  val typeInfos = if (LogicalTypeChecks.isCompositeType(producedDataType.getLogicalType)) {
    val physicalSchema = DataTypeUtils.expandCompositeTypeToSchema(producedDataType)
    fieldIndexes.map(mapIndex(_,
      idx =>
        TypeConversions.fromDataTypeToLegacyInfo(physicalSchema.getFieldDataType(idx).get()))
    )
  } else {
    fieldIndexes.map(mapIndex(_, _ => TypeConversions.fromDataTypeToLegacyInfo(producedDataType)))
  }

  flinkTypeFactory.buildLogicalRowType(fieldNames, typeInfos)
}
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)