Wei Zhong created FLINK-14591:
---------------------------------
Summary: Execute PlannerBase#mergeParameters every time PlannerBase#translate is called
Key: FLINK-14591
URL: https://issues.apache.org/jira/browse/FLINK-14591
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Reporter: Wei Zhong
In the current implementation of the Blink planner, "PlannerBase#mergeParameters" is called by "PlannerBase#translate" to merge the configuration inside TableConfig into the global job parameters:
{code:scala}
override def translate(
    modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
  if (modifyOperations.isEmpty) {
    return List.empty[Transformation[_]]
  }
  mergeParameters()
  val relNodes = modifyOperations.map(translateToRel)
  val optimizedRelNodes = optimize(relNodes)
  val execNodes = translateToExecNodePlan(optimizedRelNodes)
  translateToPlan(execNodes)
}
{code}
The translate method is called at every key point, e.g. execute, toDataStream, insertInto, etc.
But as shown above, when "modifyOperations" is empty the method returns immediately without calling "mergeParameters".
As a result, any configuration set between the "Table#insertInto" call and the "TableEnvironment#execute" call is not merged into the global job parameters, because "mergeParameters" is never called for it:
{code:scala}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance.useBlinkPlanner.build)
...
...
val result = ...
val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink)
tEnv.registerTableSink("MySink", sink)
tEnv.getConfig.getConfiguration.setString("jobparam1", "value1")
result.insertInto("MySink")
// the "jobparam2" configuration will be lost
tEnv.getConfig.getConfiguration.setString("jobparam2", "value2")
tEnv.execute("test")
val jobConfig = env.getConfig.getGlobalJobParameters.toMap
assertTrue(jobConfig.get("jobparam1")=="value1")
// this assertion will fail:
assertTrue(jobConfig.get("jobparam2")=="value2")
{code}
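This behavior may confuse users, since whether a configuration takes effect depends on when it is set. It would be great to fix this so that "mergeParameters" runs on every call to "translate".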
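To make the ordering problem concrete, here is a minimal, Flink-free sketch of the behavior described above and of one possible fix (moving the merge ahead of the empty check). ToyPlanner, MergeParametersSketch and the mergeBeforeEmptyCheck flag are hypothetical names used only for illustration, and the sketch assumes that the execute step reaches translate with an empty operation list, as the report implies. It is not the actual PlannerBase code or the final patch.
{code:scala}
import scala.collection.mutable

// Hypothetical stand-in for PlannerBase; plain maps replace TableConfig
// and the global job parameters. This is not Flink's real class.
class ToyPlanner(
    tableConfig: mutable.Map[String, String],
    globalJobParameters: mutable.Map[String, String],
    mergeBeforeEmptyCheck: Boolean) {

  // Copies every table-config entry into the global job parameters.
  private def mergeParameters(): Unit = globalJobParameters ++= tableConfig

  def translate(operations: List[String]): List[String] = {
    // Possible fix: merge before the early return.
    if (mergeBeforeEmptyCheck) mergeParameters()
    if (operations.isEmpty) {
      return List.empty[String]
    }
    // Reported behavior: the merge is only reached for non-empty input.
    if (!mergeBeforeEmptyCheck) mergeParameters()
    operations.map(op => s"transformation($op)")
  }
}

object MergeParametersSketch {
  // Replays the sequence from the report and returns the merged parameters.
  def run(mergeBeforeEmptyCheck: Boolean): Map[String, String] = {
    val tableConfig = mutable.Map.empty[String, String]
    val jobParams = mutable.Map.empty[String, String]
    val planner = new ToyPlanner(tableConfig, jobParams, mergeBeforeEmptyCheck)

    tableConfig("jobparam1") = "value1"
    planner.translate(List("insertInto(MySink)")) // stands in for Table#insertInto
    tableConfig("jobparam2") = "value2"
    planner.translate(Nil) // stands in for execute with nothing left to translate
    jobParams.toMap
  }
}
{code}
With mergeBeforeEmptyCheck = false the returned map contains only "jobparam1"; with mergeBeforeEmptyCheck = true it contains both "jobparam1" and "jobparam2".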