[jira] [Created] (FLINK-15669) SQL client can't cancel flink job

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-15669) SQL client can't cancel flink job

Shang Yuanchun (Jira)
godfrey he created FLINK-15669:
----------------------------------

             Summary: SQL client can't cancel flink job
                 Key: FLINK-15669
                 URL: https://issues.apache.org/jira/browse/FLINK-15669
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Client
    Affects Versions: 1.10.0
            Reporter: godfrey he
             Fix For: 1.10.0


in sql client, CLI client do cancel query through {{void cancelQuery(String sessionId, String resultId)}} method in {{Executor}}. However, the {{resultId}} is a random UUID, is not the job id. So CLI client can't cancel a running job.


{code:java}
        private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext<C> context, String query) {
                ......

                // store the result with a unique id
                final String resultId = UUID.randomUUID().toString();
                resultStore.storeResult(resultId, result);

               ......

                // create execution
                final ProgramDeployer deployer = new ProgramDeployer(
                                configuration, jobName, pipeline);

                // start result retrieval
                result.startRetrieval(deployer);

                return new ResultDescriptor(
                                resultId,
                                removeTimeAttributes(table.getSchema()),
                                result.isMaterialized());
        }

private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
                ......

                // stop Flink job
                try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
                        ClusterClient<T> clusterClient = null;
                        try {
                                // retrieve existing cluster
                                clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
                                try {
                                        clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
                                } catch (Throwable t) {
                                        // the job might has finished earlier
                                }
                        } catch (Exception e) {
                                throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
                        } finally {
                                try {
                                        if (clusterClient != null) {
                                                clusterClient.close();
                                        }
                                } catch (Exception e) {
                                        // ignore
                                }
                        }
                } catch (SqlExecutionException e) {
                        throw e;
                } catch (Exception e) {
                        throw new SqlExecutionException("Could not locate a cluster.", e);
                }
        }
{code}







--
This message was sent by Atlassian Jira
(v8.3.4#803005)