[jira] [Created] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method

Shang Yuanchun (Jira)
Wei Zhong created FLINK-14591:
---------------------------------

             Summary:  Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method
                 Key: FLINK-14591
                 URL: https://issues.apache.org/jira/browse/FLINK-14591
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: Wei Zhong


In the current implementation of the Blink planner, the method "PlannerBase#mergeParameters" is called by the "PlannerBase#translate" method to merge the configuration inside TableConfig into the global job parameters:
{code:scala}
  override def translate(
      modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
    if (modifyOperations.isEmpty) {
      return List.empty[Transformation[_]]
    }
    mergeParameters()
    val relNodes = modifyOperations.map(translateToRel)
    val optimizedRelNodes = optimize(relNodes)
    val execNodes = translateToExecNodePlan(optimizedRelNodes)
    translateToPlan(execNodes)
  }
{code}
The translate method is called at every important point, e.g. in execute, toDataStream, insertInto, etc.

But, as shown above, when "modifyOperations" is empty the method returns early without calling "mergeParameters".
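The early-return path can be reproduced without Flink in a minimal Scala model. Everything in it is hypothetical: "ToyPlanner", its two maps (standing in for the TableConfig configuration and the global job parameters) and the string "operations" are illustrative only, not Flink APIs. The model also assumes, for illustration, that in the scenario below insertInto translates its operation right away and execute later calls translate with an empty list.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the planner (NOT Flink code). It only mimics
// the control flow of PlannerBase#translate quoted above.
class ToyPlanner {
  // Plays the role of TableConfig#getConfiguration
  val tableConfig = mutable.LinkedHashMap.empty[String, String]
  // Plays the role of the global job parameters
  val globalJobParameters = mutable.LinkedHashMap.empty[String, String]

  // Copies every TableConfig entry into the global job parameters
  private def mergeParameters(): Unit =
    globalJobParameters ++= tableConfig

  // Same shape as the quoted translate: the early return skips the merge
  def translate(modifyOperations: Seq[String]): Seq[String] = {
    if (modifyOperations.isEmpty) {
      return Seq.empty
    }
    mergeParameters()
    modifyOperations.map(op => s"transformation($op)")
  }
}

val planner = new ToyPlanner
planner.tableConfig("jobparam1") = "value1"
// Assumed eager translation at insertInto time: the merge runs here
val first = planner.translate(Seq("insertInto MySink"))
planner.tableConfig("jobparam2") = "value2"
// Assumed empty call at execute time: early return, no merge
val second = planner.translate(Seq.empty)
// globalJobParameters now holds jobparam1 but not jobparam2
```

In this model only "jobparam1" reaches the global job parameters, because the second call returns before "mergeParameters" runs.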

In fact, if we set some configurations between the "Table#insertInto" call and the "TableEnvironment#execute" call, these configurations will not be merged into the global job parameters because the "mergeParameters" method is not called:
{code:scala}
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance.useBlinkPlanner.build)
    ...
    ...
    val result = ...
    val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink)
    tEnv.registerTableSink("MySink", sink)
    tEnv.getConfig.getConfiguration.setString("jobparam1", "value1")
    result.insertInto("MySink")
   
    // the "jobparam2" configuration will be lost
    tEnv.getConfig.getConfiguration.setString("jobparam2", "value2")
    tEnv.execute("test")
    val jobConfig = env.getConfig.getGlobalJobParameters.toMap
   
    assertTrue(jobConfig.get("jobparam1")=="value1")
    // this assertion will fail:
    assertTrue(jobConfig.get("jobparam2")=="value2")
{code}
This may confuse users. It would be great if we could fix this problem.
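One possible fix, in the spirit of this issue's title, is to call "mergeParameters" before the emptiness check so that it runs on every "translate" call. The sketch below shows that reordering on a hypothetical toy model (not Flink code): "FixedToyPlanner" and its maps are illustrative stand-ins, and the replay assumes that insertInto translates eagerly and that execute reaches translate with an empty list.

```scala
import scala.collection.mutable

// Hypothetical model of the planner (NOT Flink code) with the proposed
// reordering: mergeParameters() runs before the early return.
class FixedToyPlanner {
  val tableConfig = mutable.LinkedHashMap.empty[String, String]         // stands in for TableConfig
  val globalJobParameters = mutable.LinkedHashMap.empty[String, String] // stands in for global job parameters

  private def mergeParameters(): Unit =
    globalJobParameters ++= tableConfig

  def translate(modifyOperations: Seq[String]): Seq[String] = {
    mergeParameters() // always merge, even when there is nothing to translate
    if (modifyOperations.isEmpty) {
      return Seq.empty
    }
    modifyOperations.map(op => s"transformation($op)")
  }
}

// Replay of the scenario from the issue against the fixed model
val planner = new FixedToyPlanner
planner.tableConfig("jobparam1") = "value1"
val afterInsert = planner.translate(Seq("insertInto MySink")) // assumed eager translation
planner.tableConfig("jobparam2") = "value2"
val afterExecute = planner.translate(Seq.empty)               // assumed empty call from execute
// globalJobParameters now contains both jobparam1 and jobparam2
```

In this model, re-running the merge on every call is harmless: copying the same entries again just overwrites them with identical values.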

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)