I'm trying to use a BucketingSink to write files to S3 in my Flink job.
I have the Hadoop dependencies I need packaged in my user application jar. However, on running the job I get the following error (from the taskmanager):

java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1125)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
	... 9 common frames omitted
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
	at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
	... 13 common frames omitted

What's the right way to do this? |
Hi,
You do not just need the Hadoop dependencies in the jar; you also need a Hadoop file system available to your machine/cluster.

Regards

On 14 March 2018 at 18:38, [hidden email] <[hidden email]> wrote:
> I'm trying to use a BucketingSink to write files to S3 in my Flink job. [...]
> What's the right way to do this?
|
Hi,
I am running this on a Hadoop-free cluster (i.e. no YARN etc.). I have the following dependencies packaged in my user application JAR:

aws-java-sdk 1.7.4
flink-hadoop-fs 1.4.0
flink-shaded-hadoop2 1.4.0
flink-connector-filesystem_2.11 1.4.0
hadoop-common 2.7.4
hadoop-aws 2.7.4

I have also tried the following conf:

classloader.resolve-order: parent-first
fs.hdfs.hadoopconf: /srv/hadoop/hadoop-2.7.5/etc/hadoop

But no luck. Anything else I could be missing?

On 2018/03/14 18:57:47, Francesco Ciuci <[hidden email]> wrote:
> Hi,
>
> You do not just need the hadoop dependencies in the jar but you need to have the hadoop file system running in your machine/cluster. [...]
|
Hi,
I believe that for FileSystems to be picked up correctly, they have to be in the lib/ folder of Flink. Stephan (cc'ed), please correct me if I'm wrong here; you probably know that one best.

Aljoscha

> On 14. Mar 2018, at 18:26, [hidden email] wrote:
>
> Hi,
>
> I am running this on a Hadoop-free cluster (i.e. no YARN etc.). I have the following dependencies packaged in my user application JAR: [...]
|
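For reference, the Flink 1.4 distribution also ships self-contained, shaded S3 filesystem jars under opt/ that sidestep the "Hadoop is not in the classpath" problem entirely; placing one in lib/ registers the s3:// and s3a:// schemes on every JobManager and TaskManager. A hedged sketch of that setup, using a temporary directory to stand in for a real Flink install (jar name matches the 1.4.0 distribution; the s3.access-key / s3.secret-key settings are the flink-conf.yaml keys documented for the shaded filesystems):

```shell
# Stand-in for a real Flink 1.4.0 install directory.
FLINK_HOME="$(mktemp -d)"
mkdir -p "$FLINK_HOME/opt" "$FLINK_HOME/lib" "$FLINK_HOME/conf"
# In a real distribution this jar is already present in opt/.
touch "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.4.0.jar"

# The actual step: copy the shaded S3 filesystem onto Flink's classpath.
# On a real cluster, do this on every node and restart Flink afterwards.
cp "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.4.0.jar" "$FLINK_HOME/lib/"

# Credentials go in flink-conf.yaml (placeholder values shown).
cat >> "$FLINK_HOME/conf/flink-conf.yaml" <<'EOF'
s3.access-key: YOUR_ACCESS_KEY
s3.secret-key: YOUR_SECRET_KEY
EOF

ls "$FLINK_HOME/lib/"
```

With this approach the hadoop-* and aws-java-sdk dependencies can be dropped from the user jar, since the shaded filesystem bundles everything it needs.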
Hi,
You probably need to set up core-site.xml and point Flink at the Hadoop conf path in flink-conf.yaml.

core-site.xml:

<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <!-- Comma-separated list of local directories used to buffer
       large results prior to transmitting them to S3. -->
  <property>
    <name>fs.s3.buffer.dir</name>
    <value>/tmp</value>
  </property>
</configuration>

I've had a similar issue when I tried to upgrade to Flink 1.4.2.

On Thu, Mar 15, 2018 at 9:39 AM Aljoscha Krettek <[hidden email]> wrote:
> Hi,
>
> I believe for FileSystems to be correctly be picked up they have to be in the lib/ folder of Flink. [...]
|
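Building on the core-site.xml suggestion above: when going through hadoop-aws 2.7.x directly (rather than a shaded filesystem), the s3a implementation class and credentials are usually supplied in the same file. A sketch with placeholder values; the fs.s3a.* key names below are the ones defined by hadoop-aws 2.7, and the credential entries can be omitted if an IAM instance role is used instead:

```
<!-- core-site.xml: hedged sketch for hadoop-aws 2.7.x.
     Replace the placeholder credentials with your own. -->
<configuration>
  <property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>YOUR_ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>YOUR_SECRET_KEY</value>
  </property>
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>
</configuration>
```

Flink is then pointed at the directory containing this file via fs.hdfs.hadoopconf in flink-conf.yaml, as already attempted earlier in the thread.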