[jira] [Created] (FLINK-11389) Incorrectly use job information when call getSerializedTaskInformation in class TaskDeploymentDescriptor

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-11389) Incorrectly use job information when call getSerializedTaskInformation in class TaskDeploymentDescriptor

Shang Yuanchun (Jira)
yuqi created FLINK-11389:
----------------------------

             Summary: Incorrectly use job information when call getSerializedTaskInformation in class TaskDeploymentDescriptor
                 Key: FLINK-11389
                 URL: https://issues.apache.org/jira/browse/FLINK-11389
             Project: Flink
          Issue Type: Bug
            Reporter: yuqi
             Fix For: 1.7.2


See TaskDeploymentDescriptor


{code:java}
@Nullable
        public SerializedValue<TaskInformation> getSerializedTaskInformation() {
                if (serializedJobInformation instanceof NonOffloaded) {
                        NonOffloaded<TaskInformation> jobInformation =
                                (NonOffloaded<TaskInformation>) serializedTaskInformation;
                        return jobInformation.serializedValue;
                } else {
                        throw new IllegalStateException(
                                "Trying to work with offloaded serialized job information.");
                }
        }
{code}

the condition serializedJobInformation instanceof NonOffloaded is not correctly,
as serializedJobInformation and serializedTaskInformation are passed from ExecutionVertex#createDeploymentDescriptor


{code:java}

                if (jobInformationOrBlobKey.isLeft()) {
                        serializedJobInformation = new TaskDeploymentDescriptor.NonOffloaded<>(jobInformationOrBlobKey.left());
                } else {
                        serializedJobInformation = new TaskDeploymentDescriptor.Offloaded<>(jobInformationOrBlobKey.right());
                }

                final Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey;

                try {
                        taskInformationOrBlobKey = jobVertex.getTaskInformationOrBlobKey();
                } catch (IOException e) {
                        throw new ExecutionGraphException(
                                "Could not create a serialized JobVertexInformation for " +
                                        jobVertex.getJobVertexId(), e);
                }

                final TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> serializedTaskInformation;

                if (taskInformationOrBlobKey.isLeft()) {
                        serializedTaskInformation = new TaskDeploymentDescriptor.NonOffloaded<>(taskInformationOrBlobKey.left());
                } else {
                        serializedTaskInformation = new TaskDeploymentDescriptor.Offloaded<>(taskInformationOrBlobKey.right());
                }
{code}





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)