Correct way to reference Hadoop dependencies in Flink 1.4.0

Correct way to reference Hadoop dependencies in Flink 1.4.0

lrao@lyft.com
I'm trying to use a BucketingSink to write files to S3 in my Flink job.

I have the Hadoop dependencies I need packaged in my user application jar. However, when I run the job I get the following error (from the TaskManager):

java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1125)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
        ... 9 common frames omitted
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
        at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
        ... 13 common frames omitted

What's the right way to do this?

Re: Correct way to reference Hadoop dependencies in Flink 1.4.0

Francesco Ciuci
Hi,

You do not just need the Hadoop dependencies in the jar; the Hadoop file
system support must also be available on the machine/cluster where Flink runs.

Regards

Re: Correct way to reference Hadoop dependencies in Flink 1.4.0

lrao@lyft.com
Hi,

I am running this on a Hadoop-free cluster (i.e. no YARN etc.). I have the following dependencies packaged in my user application JAR:

aws-java-sdk 1.7.4
flink-hadoop-fs 1.4.0
flink-shaded-hadoop2 1.4.0
flink-connector-filesystem_2.11 1.4.0
hadoop-common 2.7.4
hadoop-aws 2.7.4

I have also tried the following conf:
classloader.resolve-order: parent-first
fs.hdfs.hadoopconf: /srv/hadoop/hadoop-2.7.5/etc/hadoop

But no luck. Anything else I could be missing?
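
(For reference, those dependencies correspond to Maven coordinates roughly like the following; the group IDs are assumed from Maven Central, not stated above:)

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-filesystem_2.11</artifactId>
  <version>1.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-aws</artifactId>
  <version>2.7.4</version>
</dependency>
<!-- plus org.apache.flink:flink-hadoop-fs:1.4.0,
     org.apache.flink:flink-shaded-hadoop2:1.4.0,
     org.apache.hadoop:hadoop-common:2.7.4,
     com.amazonaws:aws-java-sdk:1.7.4 -->
```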

Re: Correct way to reference Hadoop dependencies in Flink 1.4.0

Aljoscha Krettek-2
Hi,

I believe that for FileSystems to be picked up correctly, they have to be in the lib/ folder of Flink. Stephan (cc'ed), please correct me if I'm wrong here; you probably know that one best.

Aljoscha
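
In practice that would look something like the following (a sketch, assuming the Flink 1.4.0 binary distribution, which ships a shaded S3 filesystem jar under opt/; the FLINK_HOME path is a placeholder):

```shell
# Assumes FLINK_HOME points at a Flink 1.4.0 binary distribution.
# The shaded S3 filesystem jar ships in opt/; copying it into lib/
# puts it on the classpath of every JobManager and TaskManager.
FLINK_HOME="${FLINK_HOME:-/opt/flink-1.4.0}"
cp "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.4.0.jar" "$FLINK_HOME/lib/"

# Restart the cluster so the daemons pick up the new jar.
"$FLINK_HOME/bin/stop-cluster.sh"
"$FLINK_HOME/bin/start-cluster.sh"
```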

Re: Correct way to reference Hadoop dependencies in Flink 1.4.0

Deepak Jha
Hi,
You probably need to create a core-site.xml and point Flink at the Hadoop
conf path in flink-conf.yaml.

core-site.xml:

<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <!-- Comma-separated list of local directories used to buffer
       large results prior to transmitting them to S3. -->
  <property>
    <name>fs.s3.buffer.dir</name>
    <value>/tmp</value>
  </property>
</configuration>


I had a similar issue when I tried to upgrade to Flink 1.4.2.
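
For reference, Flink is then pointed at the directory containing that core-site.xml via flink-conf.yaml (the path below is illustrative, not from the original post):

```yaml
# Directory containing core-site.xml (illustrative path)
fs.hdfs.hadoopconf: /etc/hadoop/conf
```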
