Yuval Itzchakov created FLINK-17600:
---------------------------------------

             Summary: Blink Planner fails to generate RowtimeAttribute based on TableSource's DefinedRowtimeAttributes implementation
                 Key: FLINK-17600
                 URL: https://issues.apache.org/jira/browse/FLINK-17600
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.10.0
            Reporter: Yuval Itzchakov


Given the following SQL statement:
{code:java}
tableEnv.sqlQuery("SELECT EVENT_TIME, B, C FROM FOO")
{code}
and a custom StreamTableSource[Row] that implements `DefinedRowtimeAttributes.getRowtimeAttributeDescriptors`, the Blink planner fails to mark the selected field as a rowtime attribute.

This happens because `TableSourceUtil.getSourceRowType` receives a `None` TableSource from `CatalogSchemaTable.getRowType`, presumably because the catalog has not yet created the underlying TableSource: its creation is deferred to the implementing TableFactory (in this case, my own custom one).

*This does not reproduce in the old Flink planner*, because the old planner uses `TableSourceTable`, which holds an explicit reference to the underlying `TableSource` and extracts its rowtime attributes from it.
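For reference, a minimal sketch of such a source. The class name `FooTableSource`, the field types and the watermark strategy below are illustrative only (my actual source is instantiated through a custom TableFactory); any StreamTableSource[Row] that also implements `DefinedRowtimeAttributes` should behave the same way:
{code:java}
import java.util.{Collections, List => JList}

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, TableSchema}
import org.apache.flink.table.sources.tsextractors.ExistingField
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
import org.apache.flink.table.sources.{DefinedRowtimeAttributes, RowtimeAttributeDescriptor, StreamTableSource}
import org.apache.flink.types.Row

class FooTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

  // Physical type produced by the stream: EVENT_TIME arrives as java.sql.Timestamp.
  override def getReturnType: TypeInformation[Row] =
    Types.ROW_NAMED(
      Array("EVENT_TIME", "B", "C"),
      Types.SQL_TIMESTAMP, Types.STRING, Types.STRING)

  override def getTableSchema: TableSchema =
    TableSchema.builder()
      .field("EVENT_TIME", DataTypes.TIMESTAMP(3).bridgedTo(classOf[java.sql.Timestamp]))
      .field("B", DataTypes.STRING())
      .field("C", DataTypes.STRING())
      .build()

  // Declares EVENT_TIME as a rowtime attribute. This is the method the Blink
  // planner never consults when deriving the row type, per the description above.
  override def getRowtimeAttributeDescriptors: JList[RowtimeAttributeDescriptor] =
    Collections.singletonList(
      new RowtimeAttributeDescriptor(
        "EVENT_TIME",
        new ExistingField("EVENT_TIME"),
        new AscendingTimestamps))

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    // Real source elided; a single hard-coded row keeps the sketch runnable.
    execEnv.fromCollection(
      Collections.singletonList(
        Row.of(java.sql.Timestamp.valueOf("2020-05-08 12:00:00"), "b", "c")),
      getReturnType)
  }
}
{code}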
Relevant code:

*CatalogSchemaTable:*
{code:java}
private static RelDataType getRowType(
		RelDataTypeFactory typeFactory,
		CatalogBaseTable catalogBaseTable,
		boolean isStreamingMode) {
	final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory;
	TableSchema tableSchema = catalogBaseTable.getSchema();
	final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
	if (!isStreamingMode
			&& catalogBaseTable instanceof ConnectorCatalogTable
			&& ((ConnectorCatalogTable) catalogBaseTable).getTableSource().isPresent()) {
		// If the table source is bounded, materialize the time attributes to normal TIMESTAMP type.
		// Now for ConnectorCatalogTable, there is no way to
		// deduce if it is bounded in the table environment, so the data types in TableSchema
		// always patched with TimeAttribute.
		// See ConnectorCatalogTable#calculateSourceSchema
		// for details.

		// Remove the patched time attributes type to let the TableSourceTable handle it.
		// We should remove this logic if the isBatch flag in ConnectorCatalogTable is fixed.
		// TODO: Fix FLINK-14844.
		for (int i = 0; i < fieldDataTypes.length; i++) {
			LogicalType lt = fieldDataTypes[i].getLogicalType();
			if (lt instanceof TimestampType
					&& (((TimestampType) lt).getKind() == TimestampKind.PROCTIME
						|| ((TimestampType) lt).getKind() == TimestampKind.ROWTIME)) {
				int precision = ((TimestampType) lt).getPrecision();
				fieldDataTypes[i] = DataTypes.TIMESTAMP(precision);
			}
		}
	}
	return TableSourceUtil.getSourceRowType(
		flinkTypeFactory,
		tableSchema,
		scala.Option.empty(),
		isStreamingMode);
}
{code}
Note that `scala.Option.empty()` is always passed for the TableSource here.

*TableSourceUtil:*

Because the schema has no watermark specs and the TableSource option is empty, the call above falls through to the last branch, which builds the row type without ever consulting the rowtime attribute descriptors:
{code:java}
def getSourceRowType(
    typeFactory: FlinkTypeFactory,
    tableSchema: TableSchema,
    tableSource: Option[TableSource[_]],
    streaming: Boolean): RelDataType = {

  val fieldNames = tableSchema.getFieldNames
  val fieldDataTypes = tableSchema.getFieldDataTypes

  if (tableSchema.getWatermarkSpecs.nonEmpty) {
    getSourceRowType(
      typeFactory,
      fieldNames,
      fieldDataTypes,
      tableSchema.getWatermarkSpecs.head,
      streaming)
  } else if (tableSource.isDefined) {
    getSourceRowType(
      typeFactory,
      fieldNames,
      fieldDataTypes,
      tableSource.get,
      streaming)
  } else {
    val fieldTypes = fieldDataTypes.map(fromDataTypeToLogicalType)
    typeFactory.buildRelNodeRowType(fieldNames, fieldTypes)
  }
}
{code}

*TableSourceTable:*

In contrast, the old planner's table holds the TableSource directly and derives the time attribute markers from it:
{code:java}
// We must enrich logical schema from catalog table with physical type coming from table source.
// Schema coming from catalog table might not have proper conversion classes. Those must be
// extracted from produced type, before converting to RelDataType
def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
  val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
  val fieldNames = tableSchema.getFieldNames

  val nameMapping: JFunction[String, String] = tableSource match {
    case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
      new JFunction[String, String] {
        override def apply(t: String): String = mapping.getFieldMapping.get(t)
      }
    case _ => JFunction.identity()
  }

  val producedDataType = tableSource.getProducedDataType
  val fieldIndexes = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
    tableSource,
    tableSchema.getTableColumns,
    isStreamingMode,
    nameMapping
  )

  val typeInfos = if (LogicalTypeChecks.isCompositeType(producedDataType.getLogicalType)) {
    val physicalSchema = DataTypeUtils.expandCompositeTypeToSchema(producedDataType)
    fieldIndexes.map(mapIndex(_,
      idx => TypeConversions.fromDataTypeToLegacyInfo(physicalSchema.getFieldDataType(idx).get()))
    )
  } else {
    fieldIndexes.map(mapIndex(_, _ => TypeConversions.fromDataTypeToLegacyInfo(producedDataType)))
  }

  flinkTypeFactory.buildLogicalRowType(fieldNames, typeInfos)
}
{code}
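To make the comparison concrete, a driver along these lines contrasts the two planners. This is only a sketch: it assumes the `FooTableSource` sketch above is registered directly via `registerTableSource`, whereas my actual setup resolves the source through a custom TableFactory:
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.java.StreamTableEnvironment

object RowtimeRepro {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Old planner: EVENT_TIME is expected to print as TIMESTAMP(3) *ROWTIME*.
    val oldEnv = StreamTableEnvironment.create(
      env, EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build())
    oldEnv.registerTableSource("FOO", new FooTableSource)
    oldEnv.sqlQuery("SELECT EVENT_TIME, B, C FROM FOO").printSchema()

    // Blink planner: per this report, EVENT_TIME comes back as a plain
    // TIMESTAMP(3), i.e. the rowtime attribute descriptors were never applied.
    val blinkEnv = StreamTableEnvironment.create(
      env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build())
    blinkEnv.registerTableSource("FOO", new FooTableSource)
    blinkEnv.sqlQuery("SELECT EVENT_TIME, B, C FROM FOO").printSchema()
  }
}
{code}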