Husky Zeng created FLINK-19154:
----------------------------------
Summary: Always clean up HA data when application completion
Key: FLINK-19154
URL: https://issues.apache.org/jira/browse/FLINK-19154
Project: Flink
Issue Type: Bug
Components: Client / Job Submission
Affects Versions: 1.11.1
Environment: Run a stand-alone cluster that runs a single job (if you are familiar with the way Ververica Platform runs Flink jobs, we use a very similar approach). It runs Flink 1.11.1 straight from the official docker image.
Reporter: Husky Zeng

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-metadata-deleted-by-Flink-after-ZK-connection-issues-td37937.html

As this mail describes, when the application completes with an unknown throwable, the program catches and ignores it, which ultimately leads to the HA data being cleaned up.

```java
// org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#runApplicationAndShutdownClusterAsync
CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(
        final DispatcherGateway dispatcher,
        final ScheduledExecutor scheduledExecutor) {

    applicationCompletionFuture = fixJobIdAndRunApplicationAsync(dispatcher, scheduledExecutor);

    return applicationCompletionFuture
            .handle((r, t) -> {
                final ApplicationStatus applicationStatus;
                if (t != null) {

                    final Optional<JobCancellationException> cancellationException =
                            ExceptionUtils.findThrowable(t, JobCancellationException.class);

                    if (cancellationException.isPresent()) {
                        // this means the Flink Job was cancelled
                        applicationStatus = ApplicationStatus.CANCELED;
                    } else if (t instanceof CancellationException) {
                        // this means that the future was cancelled
                        applicationStatus = ApplicationStatus.UNKNOWN;
                    } else {
                        applicationStatus = ApplicationStatus.FAILED;
                    }

                    LOG.warn("Application {}: ", applicationStatus, t);
                } else {
                    applicationStatus = ApplicationStatus.SUCCEEDED;
                    LOG.info("Application completed SUCCESSFULLY");
                }
                // note: whatever the throwable is, it is swallowed here and only a status is passed on
                return dispatcher.shutDownCluster(applicationStatus);
            })
            .thenCompose(Function.identity());
}

// org.apache.flink.runtime.dispatcher.Dispatcher#shutDownCluster(org.apache.flink.runtime.clusterframework.ApplicationStatus)
@Override
public CompletableFuture<Acknowledge> shutDownCluster(final ApplicationStatus applicationStatus) {
    // only complete(), never completeExceptionally()
    shutDownFuture.complete(applicationStatus);
    return CompletableFuture.completedFuture(Acknowledge.get());
}

// org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runCluster
private void runCluster(Configuration configuration, PluginManager pluginManager) throws Exception {
    synchronized (lock) {

        initializeServices(configuration, pluginManager);

        // write host information into configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory =
                createDispatcherResourceManagerComponentFactory(configuration);

        clusterComponent = dispatcherResourceManagerComponentFactory.create(
                configuration,
                ioExecutor,
                commonRpcService,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                archivedExecutionGraphStore,
                new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
                this);

        // the throwable is always null here, because shutDownFuture is never completed exceptionally
        clusterComponent.getShutDownFuture().whenComplete(
                (ApplicationStatus applicationStatus, Throwable throwable) -> {
                    if (throwable != null) {
                        shutDownAsync(
                                ApplicationStatus.UNKNOWN,
                                ExceptionUtils.stringifyException(throwable),
                                false);
                    } else {
                        // This is the general shutdown path. If a separate more specific shutdown was
                        // already triggered, this will do nothing
                        shutDownAsync(
                                applicationStatus,
                                null,
                                true);
                    }
                });
    }
}
```

So, if we change the code to call, for example, `shutDownFuture.completeExceptionally(t)` when there is an unknown error, the `throwable != null` branch in `runCluster` would run and `shutDownAsync` would be called with `false` as its last argument (the HA data cleanup flag), so the HA data would not be cleaned up on such failures.
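The interaction described above can be reduced to a small, self-contained model built only on `java.util.concurrent.CompletableFuture`. This is a sketch under stated assumptions, not Flink code: `Status`, `Entrypoint`, `currentShutDown`, `proposedShutDown` and `run` are hypothetical stand-ins for `ApplicationStatus`, `ClusterEntrypoint`, the `handle(...)` block in `ApplicationDispatcherBootstrap` and the suggested change. It also simplifies the status classification (no cancellation cases), treats every non-null throwable as an "unknown" error, and uses `Throwable#toString` in place of Flink's `ExceptionUtils.stringifyException`.

```java
import java.util.concurrent.CompletableFuture;

public class ShutdownFlowSketch {

    // Hypothetical stand-in for Flink's ApplicationStatus (simplified).
    enum Status { SUCCEEDED, FAILED, UNKNOWN }

    // Hypothetical stand-in for ClusterEntrypoint: it only records the cleanupHaData flag
    // passed to the first shutdown request.
    static final class Entrypoint {
        Boolean cleanupHaData; // null until a shutdown is triggered

        void shutDownAsync(Status status, String diagnostics, boolean cleanupHaData) {
            if (this.cleanupHaData == null) { // later shutdown requests are ignored
                this.cleanupHaData = cleanupHaData;
            }
        }

        // Mirrors the whenComplete callback that runCluster registers on the shutdown future.
        void watch(CompletableFuture<Status> shutDownFuture) {
            shutDownFuture.whenComplete((status, throwable) -> {
                if (throwable != null) {
                    shutDownAsync(Status.UNKNOWN, throwable.toString(), false);
                } else {
                    shutDownAsync(status, null, true);
                }
            });
        }
    }

    // Current behaviour: the throwable is mapped to a status and the future completes normally.
    static void currentShutDown(CompletableFuture<Status> shutDownFuture, Throwable t) {
        shutDownFuture.complete(t == null ? Status.SUCCEEDED : Status.FAILED);
    }

    // Proposed behaviour (sketch): an unknown error completes the future exceptionally.
    static void proposedShutDown(CompletableFuture<Status> shutDownFuture, Throwable t) {
        if (t == null) {
            shutDownFuture.complete(Status.SUCCEEDED);
        } else {
            shutDownFuture.completeExceptionally(t);
        }
    }

    // Returns the cleanupHaData flag the model ends up using for the given outcome.
    static boolean run(boolean proposed, Throwable failure) {
        Entrypoint entrypoint = new Entrypoint();
        CompletableFuture<Status> shutDownFuture = new CompletableFuture<>();
        // Registered before completion, so the callback runs on the completing thread.
        entrypoint.watch(shutDownFuture);
        if (proposed) {
            proposedShutDown(shutDownFuture, failure);
        } else {
            currentShutDown(shutDownFuture, failure);
        }
        return entrypoint.cleanupHaData;
    }

    public static void main(String[] args) {
        Throwable zkError = new RuntimeException("simulated ZooKeeper connection loss");
        System.out.println("current,  failure -> cleanupHaData=" + run(false, zkError)); // true
        System.out.println("proposed, failure -> cleanupHaData=" + run(true, zkError));  // false
        System.out.println("proposed, success -> cleanupHaData=" + run(true, null));     // true
    }
}
```

Because `whenComplete` only sees a non-null `throwable` when the future is completed exceptionally, the `cleanupHaData = false` branch is reachable only through `completeExceptionally`; with a plain `complete(status)` call every outcome, including a failure, takes the cleanup path.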
By the way, this is the first time I have submitted an issue; if I have made any mistakes, please tell me. I am very glad to contribute to the community.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)