Libin Qin created FLINK-18803:
---------------------------------

Summary: JobGraph cannot be GC when submit via RemoteStreamEnvironment in attach mode
Key: FLINK-18803
URL: https://issues.apache.org/jira/browse/FLINK-18803
Project: Flink
Issue Type: Bug
Components: Client / Job Submission
Affects Versions: 1.7.2
Reporter: Libin Qin
Attachments: image-2020-08-03-18-01-56-062.png, image-2020-08-03-18-02-22-519.png, image-2020-08-03-18-03-29-748.png, image-2020-08-03-18-08-50-811.png, image-2020-08-03-18-10-08-353.png, image-2020-08-03-18-12-29-467.png

When a job is submitted via RemoteStreamEnvironment in attached mode, the client submission thread blocks on "jobResultFuture.get()" in the "submitJob" method of RestClusterClient.java. While it waits, that method still holds the local variable "jobGraph". If the job is complex, with many vertices and edges, or if the client submits many jobs, the retained JobGraph objects become large and the client may run out of memory (OOM). I don't think the client needs to keep the JobGraph once the submission has succeeded.

The largest objects on the client heap are shown below; this job has more than 408 tasks.

!image-2020-08-03-18-03-29-748.png!
!image-2020-08-03-18-08-50-811.png!
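One way to avoid holding a large object while a thread blocks, sketched here outside Flink, is to confine the reference to a helper method so that only the small job ID survives into the blocking wait. This is a minimal illustration under stated assumptions, not Flink code: FakeJobGraph, submit, requestResult, submitGraph and runAttached are hypothetical stand-ins, and the futures complete locally instead of talking to a cluster.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class ScopedSubmission {

    // Hypothetical stand-in for Flink's JobGraph: large, but only its ID
    // is needed once the cluster has accepted it.
    static final class FakeJobGraph {
        final String jobId;
        final byte[] payload; // simulates many vertices and edges

        FakeJobGraph(String jobId, int payloadBytes) {
            this.jobId = jobId;
            this.payload = new byte[payloadBytes];
        }
    }

    // Hypothetical submission call: completes once the graph is "accepted".
    static CompletableFuture<String> submit(FakeJobGraph graph) {
        return CompletableFuture.completedFuture(graph.jobId);
    }

    // Hypothetical long-running wait for the job result (attached mode).
    static CompletableFuture<String> requestResult(String jobId) {
        return CompletableFuture.supplyAsync(() -> "FINISHED:" + jobId);
    }

    // The graph is referenced only from this method's frame, so once the
    // method returns it is unreachable (unless something else still holds it).
    static String submitGraph(Supplier<FakeJobGraph> graphSupplier) {
        FakeJobGraph graph = graphSupplier.get();
        return submit(graph).join();
    }

    static String runAttached(Supplier<FakeJobGraph> graphSupplier) {
        String jobId = submitGraph(graphSupplier);
        // Blocking here keeps only the small job ID alive, not the graph.
        return requestResult(jobId).join();
    }

    public static void main(String[] args) {
        // Prints "FINISHED:job-1"
        System.out.println(runAttached(() -> new FakeJobGraph("job-1", 1 << 20)));
    }
}
```

Scoping the reference to a method makes the release structural rather than relying on a later assignment; nulling the local variable, as proposed in the issue, achieves the same effect inside a single method.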
Perhaps we can null out the local variable "jobGraph" once the submission has succeeded:

{code:java}
public JobSubmissionResult run(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
		ClassLoader classLoader, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException {
	return submitJob(() -> getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, savepointSettings), classLoader);
}

public JobSubmissionResult submitJob(Supplier<JobGraph> jobGraphSupplier, ClassLoader classLoader)
		throws ProgramInvocationException {
	JobGraph jobGraph = jobGraphSupplier.get();
	JobID jobID = jobGraph.getJobID();
	log.info("Submitting job {} (detached: {}).", jobID, isDetached());

	final CompletableFuture<JobSubmissionResult> jobSubmissionFuture = submitJob(jobGraph);

	JobSubmissionResult result;
	try {
		result = jobSubmissionFuture.get();
		// Help GC: drop the reference so the JobGraph is not kept alive
		// while this thread blocks on the job result below.
		jobGraph = null;
	} catch (Exception e) {
		throw new ProgramInvocationException("Could not submit job", jobID,
			ExceptionUtils.stripExecutionException(e));
	}

	if (isDetached()) {
		return result;
	} else {
		// Attached mode: block until the job finishes.
		final CompletableFuture<JobResult> jobResultFuture = requestJobResult(jobID);
		final JobResult jobResult;
		try {
			jobResult = jobResultFuture.get();
		} catch (Exception e) {
			throw new ProgramInvocationException("Could not retrieve the execution result.", jobID,
				ExceptionUtils.stripExecutionException(e));
		}

		try {
			this.lastJobExecutionResult = jobResult.toJobExecutionResult(classLoader);
			return lastJobExecutionResult;
		} catch (JobExecutionException | IOException | ClassNotFoundException e) {
			throw new ProgramInvocationException("Job failed.", jobID, e);
		}
	}
}
{code}

With this change, the heap dump shows that the JobGraph has been garbage-collected:

!image-2020-08-03-18-10-08-353.png!

--
This message was sent by Atlassian Jira
(v8.3.4#803005)