Intermittent No FileSystem found exception


Intermittent No FileSystem found exception

Maulik Soneji
Hi everyone,

We are running a batch job on Flink that reads data from GCS and does some
aggregation on it.
We are intermittently hitting the error: `No filesystem found for scheme gs`

We are running Beam version 2.15.0 with the FlinkRunner, on Flink version 1.6.4.

While remote debugging the task managers, we found that on a few of them the
*GcsFileSystemRegistrar is not added to the list of FileSystem schemes*. The
error occurs on exactly those task managers.

The collection *SCHEME_TO_FILESYSTEM* is only modified by the
*setDefaultPipelineOptions* call in the org.apache.beam.sdk.io.FileSystems
class. On these task managers that function is never called, so the
GcsFileSystemRegistrar is never added to *SCHEME_TO_FILESYSTEM*.
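
For reference, here is a minimal sketch of how we understand the registration
to work. It relies only on Beam's public FileSystemRegistrar/ServiceLoader
contract (it is not the internal code), and the class and bucket names below
are placeholders:

import java.util.ServiceLoader;
import org.apache.beam.sdk.io.FileSystemRegistrar;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FileSystemRegistrarCheck {
  public static void main(String[] args) {
    // setDefaultPipelineOptions is what populates SCHEME_TO_FILESYSTEM: it loads
    // every FileSystemRegistrar on the classpath via ServiceLoader and registers
    // the FileSystem each registrar provides under its scheme.
    FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

    // List the registrars visible on this classpath; GcsFileSystemRegistrar
    // should show up here when the GCP extensions jar is present.
    for (FileSystemRegistrar registrar : ServiceLoader.load(FileSystemRegistrar.class)) {
      System.out.println(registrar.getClass().getName());
    }

    // If "gs" was registered, this resolves; otherwise it throws
    // IllegalArgumentException: No filesystem found for scheme gs
    FileSystems.matchNewResource("gs://example-bucket/example-object", false);
  }
}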

*Detailed stacktrace:*


java.lang.IllegalArgumentException: No filesystem found for scheme gs
        at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
        at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
        at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
        at org.apache.beam.sdk.io.fs.MetadataCoder.decodeBuilder(MetadataCoder.java:62)
        at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:58)
        at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:36)
        at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
        at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
        at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:583)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:92)
        at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
        at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
        at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
        at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
        at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)

In order to resolve this issue, we tried calling the following in our
PTransform's expand function:

FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

This call is meant to ensure that the GcsFileSystemRegistrar is added to the
list, but it hasn't solved the issue.
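
For context, this is roughly where the call sits; the transform name and
element types below are hypothetical placeholders for our actual job:

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical transform showing where we placed the registration call.
class AggregateGcsRecords extends PTransform<PCollection<String>, PCollection<String>> {
  @Override
  public PCollection<String> expand(PCollection<String> input) {
    // Attempted workaround: force FileSystems to load all FileSystemRegistrars
    // (including GcsFileSystemRegistrar) before the rest of the transform is built.
    // Note that expand() runs at pipeline-construction time, not on the task managers.
    FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

    // ... actual aggregation steps omitted ...
    return input;
  }
}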

Can someone please help us understand why this might be happening and what
can be done to resolve it?

Thanks and Regards,
Maulik
Re: Intermittent No FileSystem found exception

Congxian Qiu
Hi Maulik,
Have you tried Flink 1.9? Is the problem still there?

Best,
Congxian

