Leonard Xu created FLINK-14861:
---------------------------------- Summary: parallelism.default in flink-conf.yaml do not work which is a bug imported by[FLINK-14745] Key: FLINK-14861 URL: https://issues.apache.org/jira/browse/FLINK-14861 Project: Flink Issue Type: Bug Components: Client / Job Submission Affects Versions: 1.10.0 Reporter: Leonard Xu Fix For: 1.10.0 I set parameter "parallelism.default" in flink-conf.yaml, but it's do not work any more when I rebased my branch to master. I debug and find it's a bug imported by [FLINK-14745](https://issues.apache.org/jira/browse/FLINK-14745). Detail: {code:java} // ExecutionConfigAccessor#fromProgramOptions public static ExecutionConfigAccessor fromProgramOptions(final ProgramOptions options, final List<URL> jobJars) { checkNotNull(options); checkNotNull(jobJars); final Configuration configuration = new Configuration(); if (options.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT) { configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, options.getParallelism()); } configuration.setBoolean(DeploymentOptions.ATTACHED, !options.getDetachedMode()); configuration.setBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED, options.isShutdownOnAttachedExit()); ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, options.getClasspaths(), URL::toString); ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, jobJars, URL::toString); SavepointRestoreSettings.toConfiguration(options.getSavepointRestoreSettings(), configuration); return new ExecutionConfigAccessor(configuration); }{code} [1]. function executionConfigAccessor.getParallelism() will return 1 rather than -1 when options.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT because when getParallelism() function will return the defaultValue of CoreOptions.DEFAULT_PARALLELISM. {code:java} // ExecutionConfigAccessor.java public int getParallelism() { return configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM); } // Configuration.java public int getInteger(ConfigOption<Integer> configOption) { return getOptional(configOption) .orElseGet(configOption::defaultValue); }{code} And function executionConfigAccessor.getParallelism() still return 1 when options.getParallelism() == 1. So, the following code in CliFrontend.java will never reach if user not set parallelism in flink run command line. {code:java} // CliFrontend.java int parallelism = executionParameters.getParallelism() == -1 ? defaultParallelism : executionParameters.getParallelism();{code} [2]and another position, I think we should keep two line which deleted in FLINK-14745--. {code:java} // int userParallelism = executionParameters.getParallelism(); LOG.debug("User parallelism is set to {}", userParallelism); {code} *if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) \{ userParallelism = defaultParallelism; }* {code:java} executeProgram(program, client, userParallelism, executionParameters.getDetachedMode()); {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |