[jira] [Created] (FLINK-1438) ClassCastException for Custom InputSplit in local mode

Shang Yuanchun (Jira)
Fabian Hueske created FLINK-1438:
------------------------------------

             Summary: ClassCastException for Custom InputSplit in local mode
                 Key: FLINK-1438
                 URL: https://issues.apache.org/jira/browse/FLINK-1438
             Project: Flink
          Issue Type: Bug
          Components: JobManager
    Affects Versions: 0.8
            Reporter: Fabian Hueske
            Priority: Minor


Jobs with custom InputSplits fail with a ClassCastException such as {{org.apache.flink.examples.java.misc.CustomSplitTestJob$TestFileInputSplit cannot be cast to org.apache.flink.examples.java.misc.CustomSplitTestJob$TestFileInputSplit}} if executed on a local setup.

This issue is probably caused by different ClassLoaders being used when the JobManager generates the InputSplits and when the TaskManager hands them to the InputFormat. Moving the class of the custom InputSplit into the {{./lib}} folder and removing it from the job's JAR file makes the job work.
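
The identical class name on both sides of the error message is what one would expect if the same class file were defined by two different ClassLoaders, since the JVM treats those as unrelated types. The following is a minimal, Flink-independent sketch of that effect. The names {{ClassLoaderCastDemo}}, {{MySplit}} and {{IsolatingLoader}} are hypothetical and only serve the illustration; this is not how Flink loads user code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderCastDemo {

    /** Hypothetical stand-in for a user-defined InputSplit class. */
    public static class MySplit {}

    /** Loader that defines MySplit itself instead of delegating to its parent. */
    static class IsolatingLoader extends ClassLoader {
        IsolatingLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            // Everything except MySplit is delegated to the parent as usual.
            if (!name.equals(MySplit.class.getName())) {
                return super.loadClass(name, resolve);
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    byte[] bytes = readClassBytes(name);
                    c = defineClass(name, bytes, 0, bytes.length);
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        // Reads the compiled class file through the parent loader's resources.
        private byte[] readClassBytes(String name) throws ClassNotFoundException {
            String path = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(path)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                return out.toByteArray();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Returns true if an instance created via a second loader cannot be cast to MySplit. */
    public static boolean castFailsAcrossLoaders() throws Exception {
        ClassLoader appLoader = ClassLoaderCastDemo.class.getClassLoader();
        Class<?> other = new IsolatingLoader(appLoader).loadClass(MySplit.class.getName());
        Object split = other.getDeclaredConstructor().newInstance();
        try {
            MySplit ignored = (MySplit) split; // same name, different defining loader
            return false;
        } catch (ClassCastException e) {
            System.out.println(e.getMessage());
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("cast failed: " + castFailsAcrossLoaders());
    }
}
```

If this is indeed the mechanism behind the bug, it would also explain why the {{./lib}} workaround helps: a class that is only available from one location is defined by a single ClassLoader, so both sides see the same type.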

To reproduce the bug, run the following job on a local setup.

{code}
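// Imports assume the package layout of Flink 0.8.
import java.io.IOException;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

public class CustomSplitTestJob {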

        public static void main(String[] args) throws Exception {

                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

                DataSet<String> x = env.createInput(new TestFileInputFormat());
                x.print();

                env.execute();
        }

        public static class TestFileInputFormat implements InputFormat<String,TestFileInputSplit> {

                @Override
                public void configure(Configuration parameters) {

                }

                @Override
                public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
                        return null;
                }

                @Override
                public TestFileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
                        return new TestFileInputSplit[]{new TestFileInputSplit()};
                }

                @Override
                public InputSplitAssigner getInputSplitAssigner(TestFileInputSplit[] inputSplits) {
                        return new LocatableInputSplitAssigner(inputSplits);
                }

                @Override
                public void open(TestFileInputSplit split) throws IOException {

                }

                @Override
                public boolean reachedEnd() throws IOException {
                        return false;
                }

                @Override
                public String nextRecord(String reuse) throws IOException {
                        return null;
                }

                @Override
                public void close() throws IOException {

                }
        }

        public static class TestFileInputSplit extends FileInputSplit {

        }

}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)