[jira] [Created] (FLINK-19154) Always clean up HA data when application completion


Shang Yuanchun (Jira)
Husky Zeng created FLINK-19154:
----------------------------------

             Summary: Always clean up HA data when application completion
                 Key: FLINK-19154
                 URL: https://issues.apache.org/jira/browse/FLINK-19154
             Project: Flink
          Issue Type: Bug
          Components: Client / Job Submission
    Affects Versions: 1.11.1
         Environment: Run a stand-alone cluster that runs a single job (if you are familiar with the way Ververica Platform runs Flink jobs, we use a very similar approach). It runs Flink 1.11.1 straight from the official docker image.
            Reporter: Husky Zeng


http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-metadata-deleted-by-Flink-after-ZK-connection-issues-td37937.html

As the mail thread above describes, when the application completes with an unknown throwable, the program catches it and effectively ignores it, which ultimately leads to the HA data being cleaned up.

``
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#runApplicationAndShutdownClusterAsync

        CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(
                        final DispatcherGateway dispatcher,
                        final ScheduledExecutor scheduledExecutor) {

                applicationCompletionFuture = fixJobIdAndRunApplicationAsync(dispatcher, scheduledExecutor);

                return applicationCompletionFuture
                                .handle((r, t) -> {
                                        final ApplicationStatus applicationStatus;
                                        if (t != null) {

                                                final Optional<JobCancellationException> cancellationException =
                                                                ExceptionUtils.findThrowable(t, JobCancellationException.class);

                                                if (cancellationException.isPresent()) {
                                                        // this means the Flink Job was cancelled
                                                        applicationStatus = ApplicationStatus.CANCELED;
                                                } else if (t instanceof CancellationException) {
                                                        // this means that the future was cancelled
                                                        applicationStatus = ApplicationStatus.UNKNOWN;
                                                } else {
                                                        applicationStatus = ApplicationStatus.FAILED;
                                                }

                                                LOG.warn("Application {}: ", applicationStatus, t);
                                        } else {
                                                applicationStatus = ApplicationStatus.SUCCEEDED;
                                                LOG.info("Application completed SUCCESSFULLY");
                                        }
                                        // note: whatever the throwable is, it is ignored here
                                        return dispatcher.shutDownCluster(applicationStatus);
                                })
                                .thenCompose(Function.identity());
        }

org.apache.flink.runtime.dispatcher.Dispatcher#shutDownCluster(org.apache.flink.runtime.clusterframework.ApplicationStatus)


        @Override
        public CompletableFuture<Acknowledge> shutDownCluster(final ApplicationStatus applicationStatus) {
                // only complete() is called here, never completeExceptionally()
                shutDownFuture.complete(applicationStatus);
                return CompletableFuture.completedFuture(Acknowledge.get());
        }


org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runCluster


        private void runCluster(Configuration configuration, PluginManager pluginManager) throws Exception {
                synchronized (lock) {

                        initializeServices(configuration, pluginManager);

                        // write host information into configuration
                        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
                        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

                        final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

                        clusterComponent = dispatcherResourceManagerComponentFactory.create(
                                configuration,
                                ioExecutor,
                                commonRpcService,
                                haServices,
                                blobServer,
                                heartbeatServices,
                                metricRegistry,
                                archivedExecutionGraphStore,
                                new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
                                this);

                        // the throwable will always be null here
                        clusterComponent.getShutDownFuture().whenComplete(
                                (ApplicationStatus applicationStatus, Throwable throwable) -> {
                                        if (throwable != null) {
                                                shutDownAsync(
                                                        ApplicationStatus.UNKNOWN,
                                                        ExceptionUtils.stringifyException(throwable),
                                                        false);
                                        } else {
                                                // This is the general shutdown path. If a separate more specific shutdown was
                                                // already triggered, this will do nothing
                                                shutDownAsync(
                                                        applicationStatus,
                                                        null,
                                                        true);
                                        }
                                });
                }
        }
``


So, if we change the code, for example by calling shutDownFuture.completeExceptionally(t) when there is an unknown error, the HA data will not be cleaned up when the application fails.
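
To make the difference concrete, here is a minimal, self-contained sketch that uses only java.util.concurrent.CompletableFuture. It is not Flink code: the class ShutdownFutureSketch, the Status enum, and the methods entrypointDecision, currentBehavior and proposedBehavior are hypothetical stand-ins. It also assumes that the boolean third argument of shutDownAsync in the snippet above is the flag that controls HA data cleanup.

```java
import java.util.concurrent.CompletableFuture;

public class ShutdownFutureSketch {

    /** Hypothetical stand-in for Flink's ApplicationStatus. */
    enum Status { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    /**
     * Mimics the whenComplete callback in ClusterEntrypoint#runCluster.
     * Assumption: the boolean passed to shutDownAsync decides whether HA data is cleaned up.
     * The future passed in is already completed, so the callback runs synchronously.
     */
    static String entrypointDecision(CompletableFuture<Status> shutDownFuture) {
        StringBuilder decision = new StringBuilder();
        shutDownFuture.whenComplete((status, throwable) -> {
            if (throwable != null) {
                // exceptional path: status UNKNOWN, HA data is kept
                decision.append("status=UNKNOWN cleanupHaData=false");
            } else {
                // general shutdown path: HA data is cleaned up
                decision.append("status=").append(status).append(" cleanupHaData=true");
            }
        });
        return decision.toString();
    }

    /** Current behavior: the future is always completed normally, even on failure. */
    static String currentBehavior(Throwable failure) {
        CompletableFuture<Status> shutDownFuture = new CompletableFuture<>();
        shutDownFuture.complete(failure == null ? Status.SUCCEEDED : Status.FAILED);
        return entrypointDecision(shutDownFuture);
    }

    /** Proposed behavior (sketch): an unknown failure completes the future exceptionally. */
    static String proposedBehavior(Throwable failure) {
        CompletableFuture<Status> shutDownFuture = new CompletableFuture<>();
        if (failure == null) {
            shutDownFuture.complete(Status.SUCCEEDED);
        } else {
            shutDownFuture.completeExceptionally(failure);
        }
        return entrypointDecision(shutDownFuture);
    }

    public static void main(String[] args) {
        Throwable unknown = new RuntimeException("unexpected error");
        System.out.println("current:  " + currentBehavior(unknown));
        System.out.println("proposed: " + proposedBehavior(unknown));
    }
}
```

With the current pattern the callback never sees a throwable, so it always takes the branch that cleans up HA data. With completeExceptionally the throwable reaches the callback and the other branch is taken. A real change would still have to decide which failures count as "unknown", and cancellation would presumably keep the normal path.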

By the way, this is the first time I have submitted an issue, so if anything is wrong, please tell me. I am very glad to do something for the community.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)