Dawid Wysakowicz created FLINK-10370:
----------------------------------------

             Summary: DistributedCache does not work in job cluster mode
                 Key: FLINK-10370
                 URL: https://issues.apache.org/jira/browse/FLINK-10370
             Project: Flink
          Issue Type: Bug
          Components: Job-Submission
    Affects Versions: 1.6.0
            Reporter: Dawid Wysakowicz

When using job cluster mode, the client does not follow the standard submission path, during which {{DistributedCacheEntries}} are written into the {{Configuration}}. Therefore the files cannot be accessed in the job.

How to reproduce: a simple job that uses {{DistributedCache}}:

{code}
public class DistributedCacheViaDfsTestProgram {

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);

        final String inputFile = "hdfs://172.17.0.2:8020/home/hadoop-user/in";
        final String outputFile = "/tmp/out";

        final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.registerCachedFile(inputFile, "test_data", false);

        env.fromElements(1)
            .map(new TestMapFunction())
            .writeAsText(outputFile, FileSystem.WriteMode.OVERWRITE);

        env.execute("Distributed Cache Via Blob Test Program");
    }

    static class TestMapFunction extends RichMapFunction<Integer, String> {

        @Override
        public String map(Integer value) throws Exception {
            final Path testFile = getRuntimeContext()
                .getDistributedCache()
                .getFile("test_data")
                .toPath();
            return Files.readAllLines(testFile)
                .stream()
                .collect(Collectors.joining("\n"));
        }
    }
}
{code}

If one runs this program, e.g. in yarn job cluster mode, it produces:

{code}
java.lang.IllegalArgumentException: File with name 'test_data' is not available. Did you forget to register the file?
	at org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:110)
	at org.apache.flink.streaming.tests.DistributedCacheViaDfsTestProgram$TestMapFunction.map(DistributedCacheViaDfsTestProgram.java:59)
	at org.apache.flink.streaming.tests.DistributedCacheViaDfsTestProgram$TestMapFunction.map(DistributedCacheViaDfsTestProgram.java:55)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:164)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
{code}

The same job runs fine, however, if it is submitted to a yarn-session cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
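To illustrate the mechanism the report refers to: on the standard submission path, the client serializes every registered cache file into the job {{Configuration}} under indexed keys, and the task side later reads those keys back so that {{getDistributedCache().getFile(name)}} can resolve the entry. The sketch below is a minimal, self-contained illustration of that round trip; the {{Entry}} class, the key names, and a plain {{Map}} standing in for Flink's {{Configuration}} are all hypothetical stand-ins, not Flink's actual API:

```java
import java.util.HashMap;
import java.util.Map;

public class CacheEntrySketch {

    // Hypothetical stand-in for Flink's DistributedCacheEntry.
    static final class Entry {
        final String filePath;
        final boolean isExecutable;

        Entry(String filePath, boolean isExecutable) {
            this.filePath = filePath;
            this.isExecutable = isExecutable;
        }
    }

    // Client side (standard submission path): serialize each registered
    // cache file into the job configuration under indexed keys.
    // Key names here are illustrative, not Flink's real config keys.
    static void writeFileInfoToConfig(int index, String name, Entry e,
                                      Map<String, String> conf) {
        conf.put("cachefile." + index + ".name", name);
        conf.put("cachefile." + index + ".path", e.filePath);
        conf.put("cachefile." + index + ".executable",
                 String.valueOf(e.isExecutable));
    }

    // Task side: read the entry back from the configuration so the
    // runtime DistributedCache can resolve it by name.
    static Entry readFileInfoFromConfig(int index, Map<String, String> conf) {
        String path = conf.get("cachefile." + index + ".path");
        if (path == null) {
            // This is effectively the failure mode of the bug: in job
            // cluster mode the client never wrote the entries, so the
            // task-side lookup finds nothing.
            throw new IllegalArgumentException(
                "File is not available. Did you forget to register the file?");
        }
        return new Entry(path, Boolean.parseBoolean(
            conf.get("cachefile." + index + ".executable")));
    }
}
```

Under this picture, the bug is simply that the write step is skipped on the job-cluster submission path, so the read step fails at runtime.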