GlobFilePathFilter NotSerializableException


Andrew Psaltis
Hi,
I am trying to use the GlobFilePathFilter with Flink 1.2-SNAPSHOT. I have also
tried the latest 1.3-SNAPSHOT code and get the same error. Basically, using
the GlobFilePathFilter causes a serialization exception because the inner
class sun.nio.fs.UnixFileSystem$3 is not serializable. I have tried various
Kryo registrations, but I must be missing something. I am happy to work on
fixing this, but may need some direction. The code below (which I lifted from
testReadMultiplePatterns() in the FileInputFormatTest class) reproduces the
error; the exception and stack trace follow. FWIW, I am testing this on OS X.

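import java.util.Arrays;
import java.util.Collections;

import org.apache.flink.api.common.io.GlobFilePathFilter;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;

public static void main(String[] args) throws Exception {

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    final TextInputFormat format = new TextInputFormat(new Path("/temp"));

    // First list: include patterns. Second list: exclude patterns.
    format.setFilesFilter(new GlobFilePathFilter(
            Collections.singletonList("**"),
            Arrays.asList("**/another_file.bin", "**/dataFile1.txt")
    ));

    DataSet<String> result = env.readFile(format, "/tmp");
    result.writeAsText("/temp/out");

    // The exception is thrown here, while the job graph is being generated.
    env.execute("GlobFilePathFilter-Test");

}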


Exception in thread "main" org.apache.flink.optimizer.CompilerException: Error translating node 'Data Source "at readFile(ExecutionEnvironment.java:520) (org.apache.flink.api.java.io.TextInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: sun.nio.fs.UnixFileSystem$3
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:381)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:106)
    at org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:86)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:192)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:188)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
    at com.apsaltis.EventDetectionJob.main(EventDetectionJob.java:75)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: sun.nio.fs.UnixFileSystem$3
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:281)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:888)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:281)
    ... 8 more
Caused by: java.io.NotSerializableException: sun.nio.fs.UnixFileSystem$3
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at java.util.ArrayList.writeObject(ArrayList.java:747)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:317)
    at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:254)
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:279)
    ... 10 more
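
The innermost cause in the trace can probably be checked without Flink at all. The following is only a sketch under that assumption: the class name PathMatcherSerializationCheck and the helper isJavaSerializable are made up for illustration and are not part of Flink or the JDK. It asks the default file system for a glob PathMatcher and tries to push it through plain Java serialization, which is what the ObjectOutputStream frames above are doing.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;

// Hypothetical helper class, for illustration only.
public class PathMatcherSerializationCheck {

    /** Returns true if Java serialization accepts the object, false if it rejects it. */
    static boolean isJavaSerializable(Object candidate) {
        try {
            // Write into an in-memory buffer; only success or failure matters here.
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The matcher implementation class is platform specific.
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:**");
        System.out.println(matcher.getClass().getName()
                + " serializable: " + isJavaSerializable(matcher));

        // The pattern string itself serializes without problems.
        System.out.println("pattern string serializable: " + isJavaSerializable("glob:**"));
    }
}
```

The printed class name depends on the platform and JDK, so it will not necessarily be sun.nio.fs.UnixFileSystem$3.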
--


Thanks,
Andrew

Re: GlobFilePathFilter NotSerializableException

Chesnay Schepler-3
Hello,

this appears to be a bug in Flink.

The problem is that the PathMatcher objects in the GlobFilePathFilter
all contain a reference to their enclosing class and are not serializable.

The easiest solution would be to build the PathMatchers inside the
filterPath method, after the filter has been shipped. Since this code is in
flink-core, we unfortunately can't use the ClosureCleaner.
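
A minimal sketch of that idea, written against plain java.nio rather than the Flink classes: LazyGlobFilter, its fields, and the roundTrip helper are hypothetical names for illustration, not the actual GlobFilePathFilter code or the eventual fix. Only the pattern strings are serialized; the matchers are transient and are built on the first call to filterPath. The return convention (true means "ignore this path") is assumed to mirror Flink's file path filters.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a glob-based filter; not the Flink implementation.
public class LazyGlobFilter implements Serializable {
    private static final long serialVersionUID = 1L;

    // Only the pattern strings travel with the serialized object.
    private final List<String> includePatterns;
    private final List<String> excludePatterns;

    // transient: skipped by Java serialization, rebuilt on first use.
    private transient List<PathMatcher> includeMatchers;
    private transient List<PathMatcher> excludeMatchers;

    public LazyGlobFilter(List<String> includePatterns, List<String> excludePatterns) {
        this.includePatterns = new ArrayList<>(includePatterns);
        this.excludePatterns = new ArrayList<>(excludePatterns);
    }

    private static List<PathMatcher> compile(List<String> patterns) {
        FileSystem fs = FileSystems.getDefault();
        List<PathMatcher> matchers = new ArrayList<>();
        for (String pattern : patterns) {
            matchers.add(fs.getPathMatcher("glob:" + pattern));
        }
        return matchers;
    }

    /** Returns true if the path should be ignored (assumed convention). */
    public boolean filterPath(String filePath) {
        if (includeMatchers == null) {
            // First call on this instance, e.g. after deserialization on a worker.
            includeMatchers = compile(includePatterns);
            excludeMatchers = compile(excludePatterns);
        }
        Path path = Paths.get(filePath);
        boolean included = false;
        for (PathMatcher matcher : includeMatchers) {
            if (matcher.matches(path)) {
                included = true;
                break;
            }
        }
        if (!included) {
            return true;
        }
        for (PathMatcher matcher : excludeMatchers) {
            if (matcher.matches(path)) {
                return true;
            }
        }
        return false;
    }

    /** Serializes and deserializes the filter, imitating shipping it to a worker. */
    static LazyGlobFilter roundTrip(LazyGlobFilter filter) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(filter);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyGlobFilter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

The lazy initialization here is not synchronized, so a real implementation would need to consider whether filterPath can be called from several threads.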

I have created a JIRA for this:
https://issues.apache.org/jira/browse/FLINK-5612

Regards,
Chesnay

On 23.01.2017 00:07, Andrew Psaltis wrote:

> [full quote of the original message trimmed]