Hello All
I use Flink to filter data from HDFS and write it back as CSV. I keep getting the "Checking and uploading JAR files" step on every DataSet filtering action or ExecutionEnvironment execution.

I use ExecutionEnvironment.createRemoteEnvironment(ip, jars...) because I launch Flink from a J2EE application server. The serialization and transport of the JARs takes a huge part of the execution time. Is there a way to force Flink to pass the JARs only once?

Please advise.

Thanks,

Hanan Meyer
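For reference, here is a minimal sketch of the submission pattern described above (host, port, paths, and the filter logic are illustrative placeholders, not the actual application code):

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class FilterToCsvJob {
        public static void main(String[] args) throws Exception {
            // Remote environment created from inside the application server.
            // Every execution re-runs the "Checking and uploading JAR files"
            // step for the jars listed here.
            ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                    "jobmanager-host", 6123, "/path/to/user-code.jar");

            DataSet<String> lines = env.readTextFile("hdfs:///data/input");
            lines.filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String line) {
                    return !line.isEmpty();
                }
            }).writeAsText("hdfs:///data/output"); // CSV formatting elided

            env.execute("filter HDFS data to CSV");
        }
    }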
I think there is not yet any such mechanism, but it would be a good addition, I agree.

Between the JobManager and the TaskManagers, the JARs are cached: the TaskManagers receive only hashes of the JARs and load them only if they do not already have them. The same mechanism should be used when the client uploads JARs to the JobManager; that way, they would be transferred only once.

For now, a workaround is to put the user JARs directly into the "lib" directory of the Flink installation. That way they are available to every worker, without any uploads per job. Your RemoteExecutionEnvironment would then not reference any JARs at all.

Would the workaround work for you for now?

Greetings,

Stephan
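In code, the workaround Stephan describes amounts to the following sketch (host and port are placeholders; it assumes the user JARs were copied into the "lib" directory on all machines before the cluster was started):

    // The user JARs already live in <flink-dir>/lib on the JobManager and
    // all TaskManagers, so the remote environment is created without any
    // jar arguments and nothing is uploaded per job:
    ExecutionEnvironment env =
            ExecutionEnvironment.createRemoteEnvironment("jobmanager-host", 6123);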
Hi Hanan,
you're right that currently, every time you submit a job to the Flink cluster, all user-code JARs are uploaded and overwrite possibly existing files. This is not really necessary if they don't change. Maybe we should add a check so that files that already exist on the JobManager are not uploaded again by the JobClient. This should improve the performance for your use case.

The corresponding JIRA issue is https://issues.apache.org/jira/browse/FLINK-2760.

Cheers,

Till
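As a rough illustration only, the check Till proposes might look like the sketch below; the helper methods and the hash lookup are hypothetical, not existing Flink API:

    // Hypothetical sketch of the proposed check (FLINK-2760): upload a jar
    // only if the JobManager does not already hold a file with the same
    // content hash.
    for (File jar : jarFiles) {
        String contentHash = sha1Of(jar);         // hypothetical helper
        if (!jobManagerHasFile(contentHash)) {    // hypothetical lookup RPC
            uploadToJobManager(jar);              // upload only when missing
        }
    }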
Hi
Thanks for the fast response.

I have tried the workaround by excluding the JARs from the RemoteEnvironment's init line:

    ExecutionEnvironment env =
            ExecutionEnvironment.createRemoteEnvironment(FLINK_URL, FLINK_PORT);

instead of:

    ExecutionEnvironment env =
            ExecutionEnvironment.createRemoteEnvironment(FLINK_URL, FLINK_PORT, list of Jars ...);

I copied the JARs to Flink's lib folder, and when I submit my job I get the following exception, which is caused by Flink not finding my JARs and types:

    org.apache.flink.client.program.ProgramInvocationException: The program
    execution failed: Cannot initialize task 'CHAIN DataSource
    (at createInput(ExecutionEnvironment.java:502)
    (org.apache.flink.api.java.io.AvroInputFormat)) ->
    Filter (Filter at generateCsv(FlinkCSVProducer.java:51)) -> FlatMap
    (FlatMap at generateCsv(FlinkCSVProducer.java:78))':
    Deserializing the InputFormat (File Input
    (hdfs://localhost:9000/data/kpi/38fbbdef-d822-4e13-9031-faff907469df))
    failed: Could not read the user code wrapper: com.scalabill.it.pa.event.Event
        at org.apache.flink.client.program.Client.run(Client.java:413)
        at org.apache.flink.client.program.Client.run(Client.java:356)
        at org.apache.flink.client.program.Client.run(Client.java:349)
        at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
        at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
        at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:71)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
        at org.apache.flink.api.java.DataSet.count(DataSet.java:391)
        at com.scalabill.it.pa.core.FlinkCSVProducer.generateCsv(FlinkCSVProducer.java:70)
        at com.scalabill.it.pa.core.FlinkDriver.generateChannelsCSVsforThisBackendServer(FlinkDriver.java:94)

Have I applied the workaround correctly? Can you try to reproduce it in your environment?

Thanks for your attention!

Hanan Meyer
My first guess would be that you did not put all jars into the lib folder.
To help us understand this, do you start the cluster manually, or via YARN?
Hi
I rechecked that I put all my JARs in the lib folder. I have also noticed that it fails while loading my first POJO class.

I start the cluster via YARN, using Flink 0.9.1.

Thanks,

Mr Hanan Meyer
Hi Hanan,
Could you please, by any chance, run your program in a local cluster with your dependencies in the lib folder? You can use "./bin/start-local.sh" and try submitting your program to localhost. That would help us to find out if it is a YARN issue.

Thanks,

Max
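For reference, the local test Max suggests could look like this (assuming the default local JobManager port 6123, and again with the user JARs in the local "lib" folder):

    // After running "./bin/start-local.sh" (with the user JARs copied into
    // the local lib folder), point the remote environment at localhost:
    ExecutionEnvironment env =
            ExecutionEnvironment.createRemoteEnvironment("localhost", 6123);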