venn wu created FLINK-18545:
-------------------------------
Summary: Sql api cannot special flink job name
Key: FLINK-18545
URL:
https://issues.apache.org/jira/browse/FLINK-18545 Project: Flink
Issue Type: Improvement
Components: Client / Job Submission, Table SQL / API
Affects Versions: 1.11.0
Environment: execute sql :
StreamTableEnvironment.executeSql("insert into user_log_sink select user_id, item_id, category_id, behavior, ts from user_log")
current job name : org.apache.flink.table.api.internal.TableEnvironmentImpl
{code:java}
public TableResult executeInternal(List<ModifyOperation> operations) {
List<Transformation<?>> transformations = translate(operations);
List<String> sinkIdentifierNames = extractSinkIdentifierNames(operations);
String jobName = "insert-into_" + String.join(",", sinkIdentifierNames);
Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
try {
JobClient jobClient = execEnv.executeAsync(pipeline);
TableSchema.Builder builder = TableSchema.builder();
Object[] affectedRowCounts = new Long[operations.size()];
for (int i = 0; i < operations.size(); ++i) {
// use sink identifier name as field name
builder.field(sinkIdentifierNames.get(i), DataTypes.BIGINT());
affectedRowCounts[i] = -1L;
} return TableResultImpl.builder()
.jobClient(jobClient)
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.tableSchema(builder.build())
.data(Collections.singletonList(Row.of(affectedRowCounts)))
.build();
} catch (Exception e) {
throw new TableException("Failed to execute sql", e);
}
}
{code}
Reporter: venn wu
In Flink 1.11.0, {color:#172b4d}StreamTableEnvironment.executeSql(sql) {color}will explan and execute job Immediately, The job name will special as "insert-into_sink-table-name". But we have Multiple sql job will insert into a same sink table, this is not very friendly.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)