Wei Zhong created FLINK-21679:
---------------------------------
Summary: Set output type for transformations from SourceProvider and DataStreamScanProvider in CommonExecTableSourceScan
Key: FLINK-21679
URL: https://issues.apache.org/jira/browse/FLINK-21679
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: Wei Zhong
Assignee: Wei Zhong
Currently we only set the output type for transformations created from SourceFunctionProvider and InputFormatProvider in CommonExecTableSourceScan:
{code:java}
@Override
protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
    final StreamExecutionEnvironment env = planner.getExecEnv();
    final String operatorName = getDescription();
    final InternalTypeInfo<RowData> outputTypeInfo =
            InternalTypeInfo.of((RowType) getOutputType());
    final ScanTableSource tableSource = tableSourceSpec.getScanTableSource(planner);
    ScanTableSource.ScanRuntimeProvider provider =
            tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
    if (provider instanceof SourceFunctionProvider) {
        SourceFunction<RowData> sourceFunction =
                ((SourceFunctionProvider) provider).createSourceFunction();
        return env.addSource(sourceFunction, operatorName, outputTypeInfo).getTransformation();
    } else if (provider instanceof InputFormatProvider) {
        InputFormat<RowData, ?> inputFormat =
                ((InputFormatProvider) provider).createInputFormat();
        return createInputFormatTransformation(env, inputFormat, outputTypeInfo, operatorName);
    } else if (provider instanceof SourceProvider) {
        // outputTypeInfo is not set here
        Source<RowData, ?, ?> source = ((SourceProvider) provider).createSource();
        return env.fromSource(source, WatermarkStrategy.noWatermarks(), operatorName)
                .getTransformation();
    } else if (provider instanceof DataStreamScanProvider) {
        // outputTypeInfo is not set here
        return ((DataStreamScanProvider) provider).produceDataStream(env).getTransformation();
    } else {
        throw new UnsupportedOperationException(
                provider.getClass().getSimpleName() + " is unsupported now.");
    }
}
{code}
We could also set the output type for transformations from SourceProvider and DataStreamScanProvider in CommonExecTableSourceScan, so that users do not need to implement the ResultTypeQueryable interface when implementing the new FLIP-27 Source interface.
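
To illustrate the idea outside of Flink, here is a minimal, self-contained toy model. It is not the planner code and not the proposed patch: `OutputTypeSketch`, `Source`, `TypeQueryable`, `Transformation`, and both `fromSource` methods are hypothetical stand-ins, and the output type is reduced to a plain string. The sketch only shows why passing the type explicitly (as could presumably be done through the `fromSource` overload that accepts a `TypeInformation`, or through `Transformation#setOutputType`) removes the need for the source to report its own produced type.

```java
/** Toy model, NOT the Flink classes: an explicit output type makes type lookup on the source unnecessary. */
public class OutputTypeSketch {

    /** Hypothetical stand-in for a source implementation. */
    interface Source {}

    /** Hypothetical stand-in for an interface through which a source reports its produced type. */
    interface TypeQueryable {
        String getProducedType();
    }

    /** Hypothetical stand-in for a transformation that carries an output type. */
    static final class Transformation {
        final String outputType;

        Transformation(String outputType) {
            this.outputType = outputType;
        }
    }

    /** No explicit type: in this toy model the framework has to ask the source itself. */
    static Transformation fromSource(Source source) {
        if (source instanceof TypeQueryable) {
            return new Transformation(((TypeQueryable) source).getProducedType());
        }
        throw new IllegalStateException("cannot determine output type of source");
    }

    /** Explicit type: the source does not need to implement TypeQueryable. */
    static Transformation fromSource(Source source, String outputType) {
        return new Transformation(outputType);
    }
}
```

In this model, a source that does not implement `TypeQueryable` only works with the two-argument `fromSource`, which mirrors the situation the planner could avoid by always supplying `outputTypeInfo` itself.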
--
This message was sent by Atlassian Jira
(v8.3.4#803005)