Issue deploying a topology to flink with a java api


Re: Issue deploying a topology to flink with a java api

Matthias J. Sax-2
For the fix, you need to use the current development version of Flink,
i.e., change your maven dependency from <version>1.0</version> to
<version>1.1-SNAPSHOT</version>

One question: what is FlinkGitService.class? It only shows up when
you get the ClassLoader:

> ClassLoader loader = URLClassLoader.newInstance(new URL[] { new URL(path) }, FlinkGitService.class.getClassLoader());

Is it the class that contains the methods deploy() and getFlinkTopology()?

-Matthias
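
A minimal sketch of the reflection-based loading being discussed (assuming the flink-storm 1.x API; FlinkGitService, the jar URL, the class name, and the method name are placeholders taken from this thread, not a confirmed implementation):

import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

import org.apache.flink.storm.api.FlinkTopology;

public class FlinkGitService {

    // Loads the topology class from the user jar and invokes the static method
    // (e.g. buildTopology()) that returns the FlinkTopology to be submitted.
    public FlinkTopology getFlinkTopology(String jarUrl, String topologyMainClass,
            String methodName) throws Exception {
        ClassLoader loader = URLClassLoader.newInstance(
                new URL[] { new URL(jarUrl) },
                FlinkGitService.class.getClassLoader());
        Class<?> topologyClass = Class.forName(topologyMainClass, true, loader);
        Method builder = topologyClass.getMethod(methodName);
        // buildTopology() is static, so invoke() needs no receiver object.
        return (FlinkTopology) builder.invoke(null);
    }
}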

On 04/14/2016 05:20 AM, star jlong wrote:

> What I'm trying to say is that, to submit the Flink topology to Flink, I had to invoke the mainMethod (which contains the actual topology) of my topology via java.lang.reflect.Method. That is, if you take a look at the following topology, the mainMethod is buildTopology():
>
> public class WordCountTopology {
>     public static void main(String[] args) throws Exception {
>
>     Config conf = new Config();
>     conf.setDebug(true);
>     if (args != null && args.length > 0) {
>
>         conf.setNumWorkers(1);
>         conf.setMaxTaskParallelism(1);
>         FlinkSubmitter.submitTopology(args[0], conf, buildTopology());
>
>     }
>     // Otherwise, we are running locally
>     else {
>         conf.setMaxTaskParallelism(1);
>         FlinkLocalCluster cluster = new FlinkLocalCluster();
>         cluster.submitTopology("word-count", conf, buildTopology());
>         Thread.sleep(10000);
>     }
> }
>
> public static FlinkTopology buildTopology() {
>
>     TopologyBuilder builder = new TopologyBuilder();
>
>     builder.setSpout("spout", new RandomSentenceSpout(), 1);
>     builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout");
>     builder.setBolt("count", new WordCount(), 1).fieldsGrouping("split", new Fields("word"));
>
>     builder.setBolt("writeIntoFile", new BoltFileSink("/home/username/wordcount.txt", new OutputFormatter() {
>         private static final long serialVersionUID = 1L;
>
>         @Override
>         public String format(Tuple tuple) {
>             return tuple.toString();
>         }
>     }), 1).shuffleGrouping("count");
>
>     return FlinkTopology.createTopology(builder);
>
> }
> }
>
> That is the method that I want to invoke from my jar so that I can submit the topology without any problem, i.e.:
>
> final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);
> cluster.submitTopology(topologyId, uploadedJarLocation,
>         getFlinkTopology(String.format("file://%s", jarPath),
>                 properties.getProperty("topologyMainClass"),
>                 properties.getProperty("methodName")));
> where getFlinkTopology() returns the actual topology.
>
> But while doing that reflection I had an exception.
>
> Another question, please: how do I make use of Till's hotfix?
>
>     On Thursday, April 14, 2016 at 00:19, Matthias J. Sax <[hidden email]> wrote:
>
>
> I cannot completely follow your last step, where you fail. What do you
> mean by "I'm stuck at the level when I want to copy that from the jar to
> submit it to flink"?
>
> Btw: I copied the code from the SO question and it works for me on the
> current master (which includes Till's hotfix).
>
> -Matthias
>
>
> On 04/13/2016 09:39 PM, star jlong wrote:
>> Thanks Matthias for the reply.
>> Maybe I should explain better what I want to do. My objective is to deploy a Flink topology on Flink using Java, but in production mode. Here are the steps that I have taken:
>> 1- Convert a sample word-count Storm topology to a Flink topology, as indicated here: https://flink.apache.org/news/2015/12/11/storm-compatibility.html
>> 2- Run the topology in local mode (with my IDE, Eclipse) and in production mode, by assembling everything with mvn clean install and then submitting the jar to Flink on the command line with
>> ./bin/flink run -c stormWorldCount.WordCountTopology /home/raymond/testFlink/target/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar myFlinkTopology
>> At this level everything went well.
>>
>> Then I wanted to submit the same jar to Flink in production mode using a Java program. So I decided to create a mainMethod in my topology that returns the FlinkTopology, which I wanted to submit to Flink using the FlinkClient. But I'm stuck at the level when I want to copy that from the jar to submit it to flink.
>>
>> I know that is possible because I have used the same procedure with a Storm topology and it works perfectly well.
>> What am I missing, please?
>>   jstar
>>
>>     On Wednesday, April 13, 2016 at 19:23, Matthias J. Sax <[hidden email]> wrote:
>>
>>
>>   Hi jstar,
>>
>> I need to have a close look. But I am wondering why you use reflection
>> in the first place? Is there any specific reason for that?
>>
>> Furthermore, the example provided in the maven-example project also covers
>> the case of submitting a topology to Flink via Java. Have a look at
>> org.apache.flink.storm.wordcount.WordCountRemoteBySubmitter
>>
>> It contains a main() method and you can just run it as a regular Java
>> program in your IDE.
>>
>> The SO question example should also work; it also contains a main()
>> method, so you should be able to run it.
>>
>> Btw: If you use the Storm compatibility API, there is no reason to get an
>> ExecutionEnvironment in your code. This happens automatically with
>> FlinkClient/FlinkSubmitter.
>>
>> Furthermore, I would recommend using FlinkSubmitter instead of
>> FlinkClient, as it is somewhat simpler to use.
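
A minimal sketch of that FlinkSubmitter route, modeled on the WordCountRemoteBySubmitter pattern (assuming the flink-storm 1.x API; the Config.NIMBUS_HOST key and the "localhost" address are assumptions, not taken from this thread):

import backtype.storm.Config;

import org.apache.flink.storm.api.FlinkSubmitter;
import org.apache.flink.storm.api.FlinkTopology;

public class SubmitBySubmitter {

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        // Point the Storm-style config at the Flink JobManager; the exact key is
        // an assumption borrowed from the Storm API (check WordCountRemoteBySubmitter).
        conf.put(Config.NIMBUS_HOST, "localhost");

        // buildTopology() is the method from the WordCountTopology quoted earlier
        // in this thread; it must be on the classpath of this program.
        FlinkTopology topology = WordCountTopology.buildTopology();
        FlinkSubmitter.submitTopology("word-count", conf, topology);
    }
}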
>>
>> About the SO question: I guess the problem is the jar assembly. The user says
>>
>> "Since I'using maven to handle my dependencies, I do a Mvn clean install
>> to obtain the jar."
>>
>> I guess this is not sufficient to bundle a correct jar. Have a look into the
>> pom.xml from storm-examples. It uses maven plug-ins to assemble the jar
>> correctly. (Regular maven artifacts do not work for job submission...)
>>
>> Will have a close look and follow up... Hope this helps already.
>>
>> -Matthias
>>
>> On 04/13/2016 06:23 PM, star jlong wrote:
>>> Thanks for the reply.
>>> @Stephan, I tried using RemoteEnvironment to submit my topology to Flink.
>>> Here is what I tried:
>>> RemoteEnvironment remote = new RemoteEnvironment(ipJobManager, 6123, jarPath);
>>> remote.execute();
>>> While running the program, this is the exception that I got:
>>> java.lang.RuntimeException: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it.
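
For what it's worth, that "No data sinks" error is about the submitted program rather than the submission itself: an ExecutionEnvironment needs at least one sink before execute() is called. A minimal sketch (the JobManager host, port, and jar path are placeholders):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class RemoteEnvironmentSketch {

    public static void main(String[] args) throws Exception {
        // Placeholder JobManager address and user jar.
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 6123, "/path/to/job.jar");

        // Without a sink such as writeAsText(), env.execute() fails with
        // "No data sinks have been created yet".
        DataSet<String> words = env.fromElements("to", "be", "or", "not", "to", "be");
        words.writeAsText("/tmp/words.txt");

        env.execute("remote-environment-sketch");
    }
}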
>>>  
>>>
>>>     On Wednesday, April 13, 2016 at 16:54, Till Rohrmann <[hidden email]> wrote:
>>>
>>>
>>>   I think this is not the problem here since the problem is still happening
>>> on the client side when the FlinkTopology tries to copy the registered
>>> spouts. This happens before the job is submitted to the cluster. Maybe
>>> Matthias could chime in here.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Apr 13, 2016 at 5:39 PM, Stephan Ewen <[hidden email]> wrote:
>>>
>>>> Hi!
>>>>
>>>> For flink standalone programs, you would use a "RemoteEnvironment"
>>>>
>>>> For Storm, I would use the "FlinkClient" in "org.apache.flink.storm.api".
>>>> That one should deal with jars, classloaders, etc for you.
>>>>
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Apr 13, 2016 at 3:43 PM, star jlong <[hidden email]>
>>>> wrote:
>>>>
>>>>> Thanks for the suggestion. Sure, those examples are interesting and I have
>>>>> deployed them successfully on Flink. The deployment is done on the command
>>>>> line, with something like
>>>>> bin/flink run example.jar
>>>>> But what I want is to submit the topology to Flink using a Java program.
>>>>>
>>>>> Thanks.
>>>>>
>>>>>     On Wednesday, April 13, 2016 at 14:12, Chesnay Schepler <[hidden email]> wrote:
>>>>>
>>>>>
>>>>>   you can find examples here:
>>>>>
>>>>>
>>>> https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples
>>>>>
>>>>> we haven't established yet that it is an API issue; it could very well
>>>>> be caused by the reflection magic you're using...
>>>>>
>>>>> On 13.04.2016 14:57, star jlong wrote:
>>>>>> Ok, it seems like there is an issue with the API. So please, does anybody
>>>>>> have a working example of deploying a topology using the Flink dependency
>>>>>> flink-storm_2.11? Any other example will be welcome.
>>>>>>
>>>>>> Thanks,
>>>>>> jstar
>>>>>>
>>>>>>      On Wednesday, April 13, 2016 at 13:44, star jlong <[hidden email]> wrote:
>>>>>>
>>>>>>
>>>>>>   Hi Schepler,
>>>>>>
>>>>>> Thanks for the concern. Yes, I'm actually having the same issue as
>>>>>> indicated in that post, because I'm the one who posted it.
>>>>>>
>>>>>>      On Wednesday, April 13, 2016 at 13:35, Chesnay Schepler <[hidden email]> wrote:
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>> http://stackoverflow.com/questions/36584784/issues-while-submitting-a-topology-to-apache-flink-using-the-flink-api
>>>>>>
>>>>>> On 13.04.2016 14:28, Till Rohrmann wrote:
>>>>>>> Hi jstar,
>>>>>>>
>>>>>>> what's exactly the problem you're observing?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Wed, Apr 13, 2016 at 2:23 PM, star jlong <[hidden email]> wrote:
>>>>>>>
>>>>>>>> Hi there,
>>>>>>>>
>>>>>>>> I'm jstar. I have been playing around with Flink. I'm very much interested
>>>>>>>> in submitting a topology to Flink using its API. As indicated
>>>>>>>> on Stack Overflow, that is what I have tried. But I got stuck with
>>>>>>>> some exception. Any help will be welcome.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>> jstar



Re: Issue deploying a topology to flink with a java api

star jlong
Yes it is.

    On Thursday, April 14, 2016 at 10:39, Matthias J. Sax <[hidden email]> wrote:




Re: Issue deploying a topology to flink with a java api

star jlong
One question: which dependency of Flink are you using? I'm using

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-storm-examples_2.11</artifactId>
    <version>1.0.0</version>
</dependency>

And once I change the version to the SNAPSHOT version, the pom.xml complains that it cannot resolve the given dependency.

    On Thursday, April 14, 2016 at 10:45, star jlong <[hidden email]> wrote:






Re: Issue deploying a topology to flink with a java api

Matthias J. Sax-2
change the version to 1.1-SNAPSHOT

On 04/14/2016 11:52 AM, star jlong wrote:




Re: Issue deploying a topology to flink with a java api

star jlong
Hi Matthias,

I changed the version as you suggested, but when I do that I have a compilation error on the classes
org.apache.flink.storm.util.BoltFileSink and org.apache.flink.storm.util.OutputFormatter.

Btw, this dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-storm-examples_2.11</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>

could not be resolved, but this one could:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-storm_2.10</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>

    On Thursday, April 14, 2016 at 13:05, Matthias J. Sax <[hidden email]> wrote:




Re: Issue deploying a topology to flink with a java api

Matthias J. Sax-2
What is the compile error? Or could you already resolve it? There should
not be a difference between 1.0 and 1.1-SNAPSHOT for those two classes...

And yes, the _2.11 suffix is only available for releases, not for the current
development branch. But it is the same code (no need to worry about it).

-Matthias

On 04/14/2016 07:38 PM, star jlong wrote:


