[jira] [Created] (FLINK-12195) Incorrect resource time setting causes flink to fail to submit

Shang Yuanchun (Jira)
tangshangwen created FLINK-12195:
------------------------------------

             Summary: Incorrect resource time setting causes flink to fail to submit
                 Key: FLINK-12195
                 URL: https://issues.apache.org/jira/browse/FLINK-12195
             Project: Flink
          Issue Type: Bug
          Components: Deployment / YARN
    Affects Versions: 1.6.3
            Reporter: tangshangwen


We use Tencent COS as the defaultFS. When we submitted a job, YARN's check of the resource modification time failed with a mismatch, which prevented the job from being submitted:

 

{{2019-04-15 14:45:47,683 DEBUG org.apache.hadoop.security.UserGroupInformation: PrivilegedActionException as:hadoop (auth:SIMPLE) cause:java.io.IOException: Resource cosn://xxx-xxx/user/hadoop/.flink/application_1555078596113_0014/logback.xml changed on src filesystem (expected 1555259286000, was 1555310742000}}

 

 

I found that Flink registers the resource with the lastModified time of the local file. Why does it not use the modification time reported by the remote file system?
{code:java}
LOG.debug("Copying from {} to {}", localSrcPath, dst);

fs.copyFromLocalFile(false, true, localSrcPath, dst);

// Note: If we used registerLocalResource(FileSystem, Path) here, we would access the remote
// file once again which has problems with eventually consistent read-after-write file
// systems. Instead, we decide to preserve the modification time at the remote
// location because this and the size of the resource will be checked by YARN based on
// the values we provide to #registerLocalResource() below.
fs.setTimes(dst, localFile.lastModified(), -1);

// now create the resource instance
LocalResource resource = registerLocalResource(dst, localFile.length(), localFile.lastModified());

return Tuple2.of(dst, resource);{code}
Maybe the resource should be registered with the remote file's modification time instead:
{code:java}
// now create the resource instance
LocalResource resource = registerLocalResource(dst, localFile.length(), fs.getFileStatus(dst).getModificationTime());{code}
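To illustrate the mismatch outside of Flink and YARN, here is a minimal stand-alone sketch using only the JDK. It assumes that the remote store keeps its own upload time and does not apply the modification time we ask for, which is what the log above suggests for cosn but is not confirmed. The class and all method names (ResourceTimestampSketch, createLocalFile, upload, lastModified, timestampMatches) are hypothetical and exist in neither Flink nor Hadoop; a temp directory stands in for the remote file system.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;

// Hypothetical sketch: none of these names exist in Flink or Hadoop.
class ResourceTimestampSketch {

    // Creates a local file whose modification time is forced to the given value.
    public static Path createLocalFile(long lastModifiedMillis) {
        try {
            Path local = Files.createTempFile("logback", ".xml");
            Files.write(local, "<configuration/>".getBytes(StandardCharsets.UTF_8));
            Files.setLastModifiedTime(local, FileTime.fromMillis(lastModifiedMillis));
            return local;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Simulates an upload to a store that does not preserve modification times
    // (assumption): the bytes are rewritten, so the copy gets a fresh timestamp.
    public static Path upload(Path local) {
        try {
            Path remoteDir = Files.createTempDirectory("remote");
            Path remote = remoteDir.resolve("logback.xml");
            return Files.write(remote, Files.readAllBytes(local));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the modification time in milliseconds since the epoch.
    public static long lastModified(Path file) {
        try {
            return Files.getLastModifiedTime(file).toMillis();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Mimics the comparison behind the "changed on src filesystem" error:
    // the registered timestamp must equal the one the store reports.
    public static boolean timestampMatches(Path remote, long registeredMillis) {
        return lastModified(remote) == registeredMillis;
    }

    public static void main(String[] args) {
        // "expected" value taken from the log above
        Path local = createLocalFile(1555259286000L);
        Path remote = upload(local);

        System.out.println("registered with local time:  "
                + timestampMatches(remote, lastModified(local)));
        System.out.println("registered with remote time: "
                + timestampMatches(remote, lastModified(remote)));
    }
}
```

Under this assumption, registering the local time only works if the store honours the equivalent of fs.setTimes, while registering the time read back from the store always agrees with what the check later sees. The trade-off is the extra remote read that the comment in the Flink code wants to avoid on eventually consistent file systems.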



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)