Flink's Checking and uploading JAR files Issue


Flink's Checking and uploading JAR files Issue

hanan meyer
Hello All

I use Flink to filter data from HDFS and write it back as CSV.

I keep getting the "Checking and uploading JAR files" step on every DataSet
filtering action or ExecutionEnvironment execution.

I use ExecutionEnvironment.createRemoteEnvironment(ip+jars..) because I
launch Flink from a J2EE application server.

The JAR serialization and transfer takes a huge part of the execution time.
Is there a way to force Flink to pass the JARs only once?

Please advise

Thanks,

Hanan Meyer

Re: Flink's Checking and uploading JAR files Issue

Stephan Ewen
I think there is no such mechanism yet, but I agree it would be a good
addition.

Between JobManager and TaskManagers, the JARs are cached. The TaskManagers
receive hashes of the JARs only, and only load them if they do not already
have them. The same mechanism should be used for the Client to upload JARs
to the JobManager - that way, they would be transferred only once.
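
That hash-based mechanism can be sketched as follows. This is only an illustrative sketch: `JarCache`, `hashOf`, `contains`, and `put` are hypothetical names, not Flink's actual internals, but the idea is the same, cache JARs keyed by a content hash, so a JAR whose hash is already known is never transferred or stored a second time.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of content-addressed JAR caching on the receiving
// side: JARs are keyed by a hash of their bytes, so a re-submission of
// the same JAR can be detected without transferring the bytes again.
public class JarCache {
    private final Map<String, byte[]> cacheByHash = new HashMap<>();

    // Hex-encoded SHA-256 of the JAR's bytes; the client would send this
    // hash first, instead of the full JAR.
    public static String hashOf(byte[] jarBytes) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest(jarBytes)) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // True if a JAR with this content hash is already cached, i.e. the
    // upload can be skipped entirely.
    public boolean contains(String hash) {
        return cacheByHash.containsKey(hash);
    }

    // Stores the JAR only if its hash is new; returns whether an actual
    // store happened (false means the bytes were already present).
    public boolean put(byte[] jarBytes) {
        return cacheByHash.putIfAbsent(hashOf(jarBytes), jarBytes) == null;
    }
}
```

With this shape, a second submission of byte-identical JARs costs only a hash comparison, which is what makes re-using the mechanism for client-to-JobManager uploads attractive.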

For now, a workaround is to put the user JARs directly into the "lib"
directory of the Flink installation. That way they are available to every
worker without any uploads per job. Your RemoteExecutionEnvironment would
then not reference any JARs at all.

Would the workaround work for you for now?

Greetings,
Stephan


On Thu, Sep 24, 2015 at 1:31 PM, Hanan Meyer <[hidden email]> wrote:


Re: Flink's Checking and uploading JAR files Issue

till.rohrmann
In reply to this post by hanan meyer
Hi Hanan,

you're right that currently, every time you submit a job to the Flink
cluster, all user-code JARs are uploaded, overwriting any existing files.
This is not really necessary if they haven't changed. Maybe we should add a
check so that files already present on the JobManager are not uploaded
again by the JobClient. That should improve the performance for your use
case.

The corresponding JIRA issue is
https://issues.apache.org/jira/browse/FLINK-2760.
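
Sketched with hypothetical names (the real change tracked in FLINK-2760 would live in Flink's JobClient, not in a standalone class like this), such a client-side check might look like: hash each JAR, ask the JobManager which hashes it already holds, and upload only the missing ones.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of a client-side "upload only if missing" check.
// knownHashes would come from a (hypothetical) pre-submission round-trip
// asking the JobManager which JAR hashes it already holds.
public class UploadPlanner {

    static String sha256Hex(byte[] bytes) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest(bytes)) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns only the JARs that actually need to be sent with this job
    // submission; unchanged JARs (already known by hash) are skipped.
    public static List<byte[]> jarsToUpload(List<byte[]> jars,
                                            Set<String> knownHashes) {
        List<byte[]> missing = new ArrayList<>();
        for (byte[] jar : jars) {
            if (!knownHashes.contains(sha256Hex(jar))) {
                missing.add(jar);
            }
        }
        return missing;
    }
}
```

For a J2EE server submitting many jobs with the same dependency JARs, this would reduce the per-submission transfer to the (usually small) application JAR that actually changed.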

Cheers,
Till


Re: Flink's Checking and uploading JAR files Issue

hanan meyer
Hi,
Thanks for the fast response.
I have tried the workaround by excluding the JARs from the
RemoteEnvironment's init line:
ExecutionEnvironment env =
ExecutionEnvironment.createRemoteEnvironment(FLINK_URL, FLINK_PORT);
instead of:
ExecutionEnvironment env =
ExecutionEnvironment.createRemoteEnvironment(FLINK_URL, FLINK_PORT, list
of Jars ......);
I copied the JARs to Flink's lib folder, and when I submit my job I get
the following exception, caused by Flink not finding my JARs and types:
org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Cannot initialize task 'CHAIN DataSource
(at createInput(ExecutionEnvironment.java:502)
(org.apache.flink.api.java.io.AvroInputFormat)) ->
Filter (Filter at generateCsv(FlinkCSVProducer.java:51)) -> FlatMap
(FlatMap at generateCsv(FlinkCSVProducer.java:78))':
Deserializing the InputFormat (File Input
(hdfs://localhost:9000/data/kpi/38fbbdef-d822-4e13-9031-faff907469df))
failed:
Could not read the user code wrapper: com.scalabill.it.pa.event.Event
    at org.apache.flink.client.program.Client.run(Client.java:413)
    at org.apache.flink.client.program.Client.run(Client.java:356)
    at org.apache.flink.client.program.Client.run(Client.java:349)
    at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
    at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
    at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:71)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at org.apache.flink.api.java.DataSet.count(DataSet.java:391)
    at com.scalabill.it.pa.core.FlinkCSVProducer.generateCsv(FlinkCSVProducer.java:70)
    at com.scalabill.it.pa.core.FlinkDriver.generateChannelsCSVsforThisBackendServer(FlinkDriver.java:94)
Have I applied the workaround correctly?
Can you try to reproduce it in your environment?
Thanks for your attention!
Hanan Meyer

On Thu, Sep 24, 2015 at 4:58 PM, Till Rohrmann <[hidden email]>
wrote:


Re: Flink's Checking and uploading JAR files Issue

Stephan Ewen
My first guess would be that you did not put all jars into the lib folder.

To help us understand this, do you start the cluster manually, or via YARN?

On Thu, Sep 24, 2015 at 4:59 PM, Hanan Meyer <[hidden email]> wrote:


Re: Flink's Checking and uploading JAR files Issue

hanan meyer
Hi

I rechecked that I put all my JARs in the lib folder.
I have also noticed that it fails while loading my first POJO class.

I start the cluster via YARN, using Flink 0.9.1.

Thanks ,

Mr Hanan Meyer


On Thu, Sep 24, 2015 at 6:08 PM, Stephan Ewen <[hidden email]> wrote:


Re: Flink's Checking and uploading JAR files Issue

mxm
Hi Hanan,

Could you please, by any chance, run your program on a local cluster with
your dependencies in the lib folder? You can use "./bin/start-local.sh" and
try submitting your program to localhost. That would help us find out
whether it is a YARN issue.

Thanks,
Max

On Fri, Sep 25, 2015 at 8:39 AM, Hanan Meyer <[hidden email]> wrote:
