[jira] [Created] (FLINK-14861) parallelism.default in flink-conf.yaml do not work which is a bug imported by[FLINK-14745]



Shang Yuanchun (Jira)
Leonard Xu created FLINK-14861:
----------------------------------

             Summary: parallelism.default in flink-conf.yaml do not work which is a bug imported by[FLINK-14745]
                 Key: FLINK-14861
                 URL: https://issues.apache.org/jira/browse/FLINK-14861
             Project: Flink
          Issue Type: Bug
          Components: Client / Job Submission
    Affects Versions: 1.10.0
            Reporter: Leonard Xu
             Fix For: 1.10.0


I set the parameter "parallelism.default" in flink-conf.yaml, but it no longer takes effect after I rebased my branch onto master. While debugging, I found that this is a bug introduced by [FLINK-14745](https://issues.apache.org/jira/browse/FLINK-14745).

Detail: 
{code:java}
// ExecutionConfigAccessor#fromProgramOptions
public static ExecutionConfigAccessor fromProgramOptions(final ProgramOptions options, final List<URL> jobJars) {
   checkNotNull(options);
   checkNotNull(jobJars);

   final Configuration configuration = new Configuration();

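   // DEFAULT_PARALLELISM is only written when the user passed a parallelism (-p);
   // otherwise the key stays absent from this Configuration.
   if (options.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT) {
      configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, options.getParallelism());
   }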

   configuration.setBoolean(DeploymentOptions.ATTACHED, !options.getDetachedMode());
   configuration.setBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED, options.isShutdownOnAttachedExit());

   ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, options.getClasspaths(), URL::toString);
   ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, jobJars, URL::toString);

   SavepointRestoreSettings.toConfiguration(options.getSavepointRestoreSettings(), configuration);

   return new ExecutionConfigAccessor(configuration);
}{code}
 
[1]. executionConfigAccessor.getParallelism() returns 1 rather than -1 when options.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT. In that case CoreOptions.DEFAULT_PARALLELISM is never written to the configuration, so getParallelism() falls back to the option's default value, which is 1.

 
{code:java}
// ExecutionConfigAccessor.java
public int getParallelism() {
   // Returns the option's default value (1) when the key is absent
   return configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
}

// Configuration.java
public int getInteger(ConfigOption<Integer> configOption) {
   return getOptional(configOption)
         .orElseGet(configOption::defaultValue);
}{code}
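The fallback described in [1] can be reproduced without Flink. The sketch below is a simplified, hypothetical model: a plain Map stands in for Configuration, and the constants mirror ExecutionConfig.PARALLELISM_DEFAULT (-1) and the default value of CoreOptions.DEFAULT_PARALLELISM (1). The class and method names are illustrative only, not Flink API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of the lookup in [1]; not Flink's actual classes.
public class ParallelismFallbackDemo {

    // Mirrors ExecutionConfig.PARALLELISM_DEFAULT ("not set by the user").
    public static final int PARALLELISM_DEFAULT = -1;
    // Mirrors the default value of CoreOptions.DEFAULT_PARALLELISM.
    public static final int OPTION_DEFAULT_VALUE = 1;
    public static final String KEY = "parallelism.default";

    // Like Configuration#getInteger: fall back to the option's default when the key is absent.
    public static int getInteger(Map<String, Integer> conf) {
        return Optional.ofNullable(conf.get(KEY)).orElseGet(() -> OPTION_DEFAULT_VALUE);
    }

    // Like ExecutionConfigAccessor#fromProgramOptions: only write the key if the user set -p.
    public static Map<String, Integer> fromProgramOptions(int cliParallelism) {
        Map<String, Integer> conf = new HashMap<>();
        if (cliParallelism != PARALLELISM_DEFAULT) {
            conf.put(KEY, cliParallelism);
        }
        return conf;
    }

    public static void main(String[] args) {
        // No -p on the command line: the key is never written, so 1 comes back, not -1.
        System.out.println(getInteger(fromProgramOptions(PARALLELISM_DEFAULT))); // 1
        // Explicit -p 1: also 1, so the two cases are indistinguishable.
        System.out.println(getInteger(fromProgramOptions(1))); // 1
    }
}
```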
 

executionConfigAccessor.getParallelism() also returns 1 when options.getParallelism() == 1, so an unset parallelism and an explicit parallelism of 1 cannot be told apart.

As a result, the defaultParallelism branch of the following code in CliFrontend.java is never taken when the user does not set a parallelism on the flink run command line.
{code:java}
// CliFrontend.java
int parallelism = executionParameters.getParallelism() == -1 ? defaultParallelism : executionParameters.getParallelism();{code}
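A small, self-contained sketch of why this ternary never picks defaultParallelism, assuming the same kind of simplified Map-based model (hypothetical names, not Flink code). getParallelismPreservingUnset is one possible way to keep -1 for an unset value; it is an assumption for illustration, not necessarily how the issue was resolved.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch; names mirror the issue but this is not Flink code.
public class CliParallelismDemo {
    public static final int PARALLELISM_DEFAULT = -1; // "not set by the user"
    public static final int OPTION_DEFAULT_VALUE = 1; // default of parallelism.default
    public static final String KEY = "parallelism.default";

    // Current behaviour: an absent key yields the option default (1).
    public static int getParallelismWithFallback(Map<String, Integer> conf) {
        return Optional.ofNullable(conf.get(KEY)).orElse(OPTION_DEFAULT_VALUE);
    }

    // Assumed alternative: an absent key yields -1, so "unset" survives the lookup.
    public static int getParallelismPreservingUnset(Map<String, Integer> conf) {
        return Optional.ofNullable(conf.get(KEY)).orElse(PARALLELISM_DEFAULT);
    }

    // The CliFrontend resolution quoted above.
    public static int resolve(int executionParallelism, int defaultParallelism) {
        return executionParallelism == -1 ? defaultParallelism : executionParallelism;
    }

    public static void main(String[] args) {
        Map<String, Integer> noCliValue = new HashMap<>(); // user did not pass -p
        int defaultParallelism = 4; // e.g. parallelism.default: 4 in flink-conf.yaml
        System.out.println(resolve(getParallelismWithFallback(noCliValue), defaultParallelism));    // 1
        System.out.println(resolve(getParallelismPreservingUnset(noCliValue), defaultParallelism)); // 4
    }
}
```

With no -p and an assumed parallelism.default of 4, the current lookup resolves to 1, while the unset-preserving lookup lets the ternary fall through to 4.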
[2]. In another place, I think we should restore the lines that were deleted in FLINK-14745 (marked below).
{code:java}
int userParallelism = executionParameters.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);

// Restore the following lines, which were deleted in FLINK-14745:
if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
   userParallelism = defaultParallelism;
}

executeProgram(program, client, userParallelism, executionParameters.getDetachedMode());
{code}
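The restored check from [2] amounts to the following, shown as a hypothetical standalone method rather than Flink's CliFrontend; defaultParallelism = 4 is an assumed value from flink-conf.yaml.

```java
// Hypothetical sketch of the restored check from [2]; plain Java, not Flink's CliFrontend.
public class RestoredCheckDemo {
    public static final int PARALLELISM_DEFAULT = -1; // mirrors ExecutionConfig.PARALLELISM_DEFAULT

    // Replace the "unset" marker with the configured default before executing the program.
    public static int userParallelism(int executionParallelism, int defaultParallelism) {
        int userParallelism = executionParallelism;
        if (PARALLELISM_DEFAULT == userParallelism) {
            userParallelism = defaultParallelism;
        }
        return userParallelism;
    }

    public static void main(String[] args) {
        int defaultParallelism = 4; // assumed value of parallelism.default in flink-conf.yaml
        System.out.println(userParallelism(-1, defaultParallelism)); // 4: no -p given
        System.out.println(userParallelism(8, defaultParallelism));  // 8: explicit -p 8 wins
        System.out.println(userParallelism(1, defaultParallelism));  // 1: kept as-is
    }
}
```

This check only helps if executionParameters.getParallelism() actually returns -1 for an unset value, so it depends on [1] being fixed as well; otherwise the fallback value 1 passes through unchanged.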
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)