For the fix, you need to use the current development version of Flink,
ie, change your maven dependency from <version>1.0</version> to <version>1.1-SNAPSHOT</version> One question: what is FlinkGitService.class? It does only show up when you get the ClassLoader: > ClassLoader loader = URLClassLoader.newInstance(new URL[] { new URL(path) }, FlinkGitService.class.getClassLoader()); It is the class that contains methods deploy() and getFlinkTopology() ? -Matthias On 04/14/2016 05:20 AM, star jlong wrote: > What I'm trying to say is that to get submit the flink topology to flink, I had to do an invocation of the mainMethod(which contain the actaul topology) of my topology with the class java.lang.reflect.Method.That is if you a take look at the following the topology the mainMethod is buildTopologypublic class WordCountTopology { > public static void main(String[] args) throws Exception { > > Config conf = new Config(); > conf.setDebug(true); > if (args != null && args.length > 0) { > > conf.setNumWorkers(1); > conf.setMaxTaskParallelism(1); > FlinkSubmitter.submitTopology(args[0], conf, buildTopology()); > > } > // Otherwise, we are running locally > else { > conf.setMaxTaskParallelism(1); > FlinkLocalCluster cluster = new FlinkLocalCluster(); > cluster.submitTopology("word-count", conf, buildTopology()); > Thread.sleep(10000); > } > } > > public static FlinkTopology buildTopology() { > > TopologyBuilder builder = new TopologyBuilder(); > > builder.setSpout("spout", new RandomSentenceSpout(), 1); > builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout"); > builder.setBolt("count", new WordCount(), 1).fieldsGrouping("split", new Fields("word")); > > builder.setBolt("writeIntoFile", new BoltFileSink("/home/username/wordcount.txt", new OutputFormatter() { > private static final long serialVersionUID = 1L; > > @Override > public String format(Tuple tuple) { > return tuple.toString(); > } > }), 1).shuffleGrouping("count"); > > return FlinkTopology.createTopology(builder); > > } > }That is the method that I want to invoke from my jar so that I will be able to do the submitting of the topology without any problem ie > > final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);cluster.submitTopology(topologyId, uploadedJarLocation, getFlinkTopogy(String.format("file://%s", jarPath),properties.getProperty("topologyMainClass"), properties.getProperty("methodName"))); > Where getFlinkTopology() return the contains actually topology > > But while doing that reflection I had an exception. > > Another question please. How do I make used of the hotflix of Till. > > Le Jeudi 14 avril 2016 0h19, Matthias J. Sax <[hidden email]> a écrit : > > > I cannot follow completely in your last step when you fail. What do you > mean by "I'm stuck at the level when I want to copy that from the jar to > submit it to flink"? > > Btw: I copied the code from the SO question and it works for me on the > current master (which includes Till's hotfix). > > -Matthias > > > On 04/13/2016 09:39 PM, star jlong wrote: >> Thanks Matthias for the reply. >> Maybe I should explain what I want to do better.My objective is to deploy a flink topology on flink using java but in the production mode. For that here are the step that I have taken. >> 1-Convert a sample wordcount storm topology to a flink topology as indicated here https://flink.apache.org/news/2015/12/11/storm-compatibility.html2-Run the topology on local mode (with my IDE eclipse) and on production mode by assembling everything with a mvn clean install then submitting the jar to flink on the command line with >> ./bin/flink run -c stormWorldCount.WordCountTopology /home/raymond/testFlink/target/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar myFlinkTopology >> At this level everything went well. >> >> Then I wanted to submit the same jar to flink on the production mode by using a java program. Then I decided to create a mainMethod in my topology that returns the flinkTopology which I wanted to submit to flink using the FlinkClient. But I'm stuck at the level when I want to copy that from the jar to submit it to flink. >> >> I know that is possible because I have used the same procedure with a storm topology that it works perfectly well. >> What I'm missing please? >> jstar >> >> Le Mercredi 13 avril 2016 19h23, Matthias J. Sax <[hidden email]> a écrit : >> >> >> Hi jstar, >> >> I need to have a close look. But I am wondering why you use reflection >> in the first place? Is there any specific reason for that? >> >> Furthermore, the example provided in project maven-example also covers >> the case to submit a topology to Flink via Java. Have a look at >> org.apache.flink.storm.wordcount.WordCountRemoteBySubmitter >> >> It contains a main() method and you can just run it as a regular Java >> program in your IDE. >> >> The SO question example should also work; it also contains a main() >> method, so you should be able to run it. >> >> Btw: If you use Storm-Compatiblitly-API there is no reason the get an >> ExecutuionEnvironment in you code. This happen automatically with >> FlinkClient/FlinkSubmitter. >> >> Furthermore, I would recommend to use FlinkSubmitter instead of >> FlinkClient as it is somewhat simpler to use. >> >> About SO question: I guess the problem is the jar assembling. The user says >> >> "Since I'using maven to handle my dependencies, I do a Mvn clean install >> to obtain the jar." >> >> I guess this is not sufficient to bundle a correct jar. Have a look into >> pom.xml from storm-examples. It uses maven plug-ins in assemble the jar >> correctly. (Regular maven artifact do not work for job submission...) >> >> Will have a close look and follow up... Hope this helps already. >> >> -Matthias >> >> On 04/13/2016 06:23 PM, star jlong wrote: >>> Thanks for the reply. >>> @Stephen, I try using RemoteEnvironment to submit my topology to flink. >>> Here is the try that I did RemoteEnvironment remote = new RemoteEnvironment(ipJobManager, 6123, jarPath); remote.execute(); >>> While running the program, this is the exception that I got. >>> java.lang.RuntimeException: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it. >>> >>> >>> Le Mercredi 13 avril 2016 16h54, Till Rohrmann <[hidden email]> a écrit : >>> >>> >>> I think this is not the problem here since the problem is still happening >>> on the client side when the FlinkTopology tries to copy the registered >>> spouts. This happens before the job is submitted to the cluster. Maybe >>> Mathias could chime in here. >>> >>> Cheers, >>> Till >>> >>> On Wed, Apr 13, 2016 at 5:39 PM, Stephan Ewen <[hidden email]> wrote: >>> >>>> Hi! >>>> >>>> For flink standalone programs, you would use a "RemoteEnvironment" >>>> >>>> For Storm, I would use the "FlinkClient" in "org.apache.flink.storm.api". >>>> That one should deal with jars, classloaders, etc for you. >>>> >>>> Stephan >>>> >>>> >>>> On Wed, Apr 13, 2016 at 3:43 PM, star jlong <[hidden email]> >>>> wrote: >>>> >>>>> Thanks for the suggestion. Sure those examples are interesting and I have >>>>> deploy them successfully on flink. The deployment is done the command >>>> line >>>>> that is doing something like >>>>> bin/flink run example.jarBut what I want is to submit the topology to >>>>> flink using a java program. >>>>> >>>>> Thanks. >>>>> >>>>> Le Mercredi 13 avril 2016 14h12, Chesnay Schepler < >>>> [hidden email]> >>>>> a écrit : >>>>> >>>>> >>>>> you can find examples here: >>>>> >>>>> >>>> https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples >>>>> >>>>> we haven't established yet that it is an API issue; it could very well >>>>> be caused by the reflection magic you're using... >>>>> >>>>> On 13.04.2016 14:57, star jlong wrote: >>>>>> Ok, it seems like there an issue with the api. So please does anybody >>>>> has a working example for deploying a topology using the flink dependency >>>>> flink-storm_2.11 or any other will be welcoming. >>>>>> >>>>>> Thanks, >>>>>> jstar >>>>>> >>>>>> Le Mercredi 13 avril 2016 13h44, star jlong >>>>> <[hidden email]> a écrit : >>>>>> >>>>>> >>>>>> Hi Schepler, >>>>>> >>>>>> Thanks for the concerned. Yes I'm actaully having the same issue as >>>>> indicated on that post because I'm the one that posted that issue. >>>>>> >>>>>> Le Mercredi 13 avril 2016 13h35, Chesnay Schepler < >>>>> [hidden email]> a écrit : >>>>>> >>>>>> >>>>>> >>>>> >>>> http://stackoverflow.com/questions/36584784/issues-while-submitting-a-topology-to-apache-flink-using-the-flink-api >>>>>> >>>>>> On 13.04.2016 14:28, Till Rohrmann wrote: >>>>>>> Hi jstar, >>>>>>> >>>>>>> what's exactly the problem you're observing? >>>>>>> >>>>>>> Cheers, >>>>>>> Till >>>>>>> >>>>>>> On Wed, Apr 13, 2016 at 2:23 PM, star jlong >>>> <[hidden email] >>>>>> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi there, >>>>>>>> >>>>>>>> I'm jstar. I have been playing around with flink. I'm very much >>>>> interested >>>>>>>> in submitting a topoloy to flink using its api. As indicated >>>>>>>> on stackoverflow, that is the try that I have given. But I was stuck >>>>> with >>>>>>>> some exception. Please any help will be welcoming. >>>>>>>> >>>>>>>> >>>>>>>> Thanks. >>>>>>>> jstar >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>> >>> >>> >>> >> >> >> >> > > > > |
Yes it is.
Le Jeudi 14 avril 2016 10h39, Matthias J. Sax <[hidden email]> a écrit : For the fix, you need to use the current development version of Flink, ie, change your maven dependency from <version>1.0</version> to <version>1.1-SNAPSHOT</version> One question: what is FlinkGitService.class? It does only show up when you get the ClassLoader: > ClassLoader loader = URLClassLoader.newInstance(new URL[] { new URL(path) }, FlinkGitService.class.getClassLoader()); It is the class that contains methods deploy() and getFlinkTopology() ? -Matthias On 04/14/2016 05:20 AM, star jlong wrote: > What I'm trying to say is that to get submit the flink topology to flink, I had to do an invocation of the mainMethod(which contain the actaul topology) of my topology with the class java.lang.reflect.Method.That is if you a take look at the following the topology the mainMethod is buildTopologypublic class WordCountTopology { > public static void main(String[] args) throws Exception { > > Config conf = new Config(); > conf.setDebug(true); > if (args != null && args.length > 0) { > > conf.setNumWorkers(1); > conf.setMaxTaskParallelism(1); > FlinkSubmitter.submitTopology(args[0], conf, buildTopology()); > > } > // Otherwise, we are running locally > else { > conf.setMaxTaskParallelism(1); > FlinkLocalCluster cluster = new FlinkLocalCluster(); > cluster.submitTopology("word-count", conf, buildTopology()); > Thread.sleep(10000); > } > } > > public static FlinkTopology buildTopology() { > > TopologyBuilder builder = new TopologyBuilder(); > > builder.setSpout("spout", new RandomSentenceSpout(), 1); > builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout"); > builder.setBolt("count", new WordCount(), 1).fieldsGrouping("split", new Fields("word")); > > builder.setBolt("writeIntoFile", new BoltFileSink("/home/username/wordcount.txt", new OutputFormatter() { > private static final long serialVersionUID = 1L; > > @Override > public String format(Tuple tuple) { > return tuple.toString(); > } > }), 1).shuffleGrouping("count"); > > return FlinkTopology.createTopology(builder); > > } > }That is the method that I want to invoke from my jar so that I will be able to do the submitting of the topology without any problem ie > > final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);cluster.submitTopology(topologyId, uploadedJarLocation, getFlinkTopogy(String.format("file://%s", jarPath),properties.getProperty("topologyMainClass"), properties.getProperty("methodName"))); > Where getFlinkTopology() return the contains actually topology > > But while doing that reflection I had an exception. > > Another question please. How do I make used of the hotflix of Till. > > Le Jeudi 14 avril 2016 0h19, Matthias J. Sax <[hidden email]> a écrit : > > > I cannot follow completely in your last step when you fail. What do you > mean by "I'm stuck at the level when I want to copy that from the jar to > submit it to flink"? > > Btw: I copied the code from the SO question and it works for me on the > current master (which includes Till's hotfix). > > -Matthias > > > On 04/13/2016 09:39 PM, star jlong wrote: >> Thanks Matthias for the reply. >> Maybe I should explain what I want to do better.My objective is to deploy a flink topology on flink using java but in the production mode. For that here are the step that I have taken. >> 1-Convert a sample wordcount storm topology to a flink topology as indicated here https://flink.apache.org/news/2015/12/11/storm-compatibility.html2-Run the topology on local mode (with my IDE eclipse) and on production mode by assembling everything with a mvn clean install then submitting the jar to flink on the command line with >> ./bin/flink run -c stormWorldCount.WordCountTopology /home/raymond/testFlink/target/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar myFlinkTopology >> At this level everything went well. >> >> Then I wanted to submit the same jar to flink on the production mode by using a java program. Then I decided to create a mainMethod in my topology that returns the flinkTopology which I wanted to submit to flink using the FlinkClient. But I'm stuck at the level when I want to copy that from the jar to submit it to flink. >> >> I know that is possible because I have used the same procedure with a storm topology that it works perfectly well. >> What I'm missing please? >> jstar >> >> Le Mercredi 13 avril 2016 19h23, Matthias J. Sax <[hidden email]> a écrit : >> >> >> Hi jstar, >> >> I need to have a close look. But I am wondering why you use reflection >> in the first place? Is there any specific reason for that? >> >> Furthermore, the example provided in project maven-example also covers >> the case to submit a topology to Flink via Java. Have a look at >> org.apache.flink.storm.wordcount.WordCountRemoteBySubmitter >> >> It contains a main() method and you can just run it as a regular Java >> program in your IDE. >> >> The SO question example should also work; it also contains a main() >> method, so you should be able to run it. >> >> Btw: If you use Storm-Compatiblitly-API there is no reason the get an >> ExecutuionEnvironment in you code. This happen automatically with >> FlinkClient/FlinkSubmitter. >> >> Furthermore, I would recommend to use FlinkSubmitter instead of >> FlinkClient as it is somewhat simpler to use. >> >> About SO question: I guess the problem is the jar assembling. The user says >> >> "Since I'using maven to handle my dependencies, I do a Mvn clean install >> to obtain the jar." >> >> I guess this is not sufficient to bundle a correct jar. Have a look into >> pom.xml from storm-examples. It uses maven plug-ins in assemble the jar >> correctly. (Regular maven artifact do not work for job submission...) >> >> Will have a close look and follow up... Hope this helps already. >> >> -Matthias >> >> On 04/13/2016 06:23 PM, star jlong wrote: >>> Thanks for the reply. >>> @Stephen, I try using RemoteEnvironment to submit my topology to flink. >>> Here is the try that I did RemoteEnvironment remote = new RemoteEnvironment(ipJobManager, 6123, jarPath); remote.execute(); >>> While running the program, this is the exception that I got. >>> java.lang.RuntimeException: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it. >>> >>> >>> Le Mercredi 13 avril 2016 16h54, Till Rohrmann <[hidden email]> a écrit : >>> >>> >>> I think this is not the problem here since the problem is still happening >>> on the client side when the FlinkTopology tries to copy the registered >>> spouts. This happens before the job is submitted to the cluster. Maybe >>> Mathias could chime in here. >>> >>> Cheers, >>> Till >>> >>> On Wed, Apr 13, 2016 at 5:39 PM, Stephan Ewen <[hidden email]> wrote: >>> >>>> Hi! >>>> >>>> For flink standalone programs, you would use a "RemoteEnvironment" >>>> >>>> For Storm, I would use the "FlinkClient" in "org.apache.flink.storm.api". >>>> That one should deal with jars, classloaders, etc for you. >>>> >>>> Stephan >>>> >>>> >>>> On Wed, Apr 13, 2016 at 3:43 PM, star jlong <[hidden email]> >>>> wrote: >>>> >>>>> Thanks for the suggestion. Sure those examples are interesting and I have >>>>> deploy them successfully on flink. The deployment is done the command >>>> line >>>>> that is doing something like >>>>> bin/flink run example.jarBut what I want is to submit the topology to >>>>> flink using a java program. >>>>> >>>>> Thanks. >>>>> >>>>> Le Mercredi 13 avril 2016 14h12, Chesnay Schepler < >>>> [hidden email]> >>>>> a écrit : >>>>> >>>>> >>>>> you can find examples here: >>>>> >>>>> >>>> https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples >>>>> >>>>> we haven't established yet that it is an API issue; it could very well >>>>> be caused by the reflection magic you're using... >>>>> >>>>> On 13.04.2016 14:57, star jlong wrote: >>>>>> Ok, it seems like there an issue with the api. So please does anybody >>>>> has a working example for deploying a topology using the flink dependency >>>>> flink-storm_2.11 or any other will be welcoming. >>>>>> >>>>>> Thanks, >>>>>> jstar >>>>>> >>>>>> Le Mercredi 13 avril 2016 13h44, star jlong >>>>> <[hidden email]> a écrit : >>>>>> >>>>>> >>>>>> Hi Schepler, >>>>>> >>>>>> Thanks for the concerned. Yes I'm actaully having the same issue as >>>>> indicated on that post because I'm the one that posted that issue. >>>>>> >>>>>> Le Mercredi 13 avril 2016 13h35, Chesnay Schepler < >>>>> [hidden email]> a écrit : >>>>>> >>>>>> >>>>>> >>>>> >>>> http://stackoverflow.com/questions/36584784/issues-while-submitting-a-topology-to-apache-flink-using-the-flink-api >>>>>> >>>>>> On 13.04.2016 14:28, Till Rohrmann wrote: >>>>>>> Hi jstar, >>>>>>> >>>>>>> what's exactly the problem you're observing? >>>>>>> >>>>>>> Cheers, >>>>>>> Till >>>>>>> >>>>>>> On Wed, Apr 13, 2016 at 2:23 PM, star jlong >>>> <[hidden email] >>>>>> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi there, >>>>>>>> >>>>>>>> I'm jstar. I have been playing around with flink. I'm very much >>>>> interested >>>>>>>> in submitting a topoloy to flink using its api. As indicated >>>>>>>> on stackoverflow, that is the try that I have given. But I was stuck >>>>> with >>>>>>>> some exception. Please any help will be welcoming. >>>>>>>> >>>>>>>> >>>>>>>> Thanks. >>>>>>>> jstar >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>> >>> >>> >>> >> >> >> >> > > > > |
One question which dependency of flink are you using because I'm using
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-storm-examples_2.11</artifactId> <version>1.0.0</version></dependency> And once I change the version to SNAPSHOT version, the pom.xml complains that it could not satisfy the given dependency. Le Jeudi 14 avril 2016 10h45, star jlong <[hidden email]> a écrit : Yes it is. Le Jeudi 14 avril 2016 10h39, Matthias J. Sax <[hidden email]> a écrit : For the fix, you need to use the current development version of Flink, ie, change your maven dependency from <version>1.0</version> to <version>1.1-SNAPSHOT</version> One question: what is FlinkGitService.class? It does only show up when you get the ClassLoader: > ClassLoader loader = URLClassLoader.newInstance(new URL[] { new URL(path) }, FlinkGitService.class.getClassLoader()); It is the class that contains methods deploy() and getFlinkTopology() ? -Matthias On 04/14/2016 05:20 AM, star jlong wrote: > What I'm trying to say is that to get submit the flink topology to flink, I had to do an invocation of the mainMethod(which contain the actaul topology) of my topology with the class java.lang.reflect.Method.That is if you a take look at the following the topology the mainMethod is buildTopologypublic class WordCountTopology { > public static void main(String[] args) throws Exception { > > Config conf = new Config(); > conf.setDebug(true); > if (args != null && args.length > 0) { > > conf.setNumWorkers(1); > conf.setMaxTaskParallelism(1); > FlinkSubmitter.submitTopology(args[0], conf, buildTopology()); > > } > // Otherwise, we are running locally > else { > conf.setMaxTaskParallelism(1); > FlinkLocalCluster cluster = new FlinkLocalCluster(); > cluster.submitTopology("word-count", conf, buildTopology()); > Thread.sleep(10000); > } > } > > public static FlinkTopology buildTopology() { > > TopologyBuilder builder = new TopologyBuilder(); > > builder.setSpout("spout", new RandomSentenceSpout(), 1); > builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout"); > builder.setBolt("count", new WordCount(), 1).fieldsGrouping("split", new Fields("word")); > > builder.setBolt("writeIntoFile", new BoltFileSink("/home/username/wordcount.txt", new OutputFormatter() { > private static final long serialVersionUID = 1L; > > @Override > public String format(Tuple tuple) { > return tuple.toString(); > } > }), 1).shuffleGrouping("count"); > > return FlinkTopology.createTopology(builder); > > } > }That is the method that I want to invoke from my jar so that I will be able to do the submitting of the topology without any problem ie > > final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);cluster.submitTopology(topologyId, uploadedJarLocation, getFlinkTopogy(String.format("file://%s", jarPath),properties.getProperty("topologyMainClass"), properties.getProperty("methodName"))); > Where getFlinkTopology() return the contains actually topology > > But while doing that reflection I had an exception. > > Another question please. How do I make used of the hotflix of Till. > > Le Jeudi 14 avril 2016 0h19, Matthias J. Sax <[hidden email]> a écrit : > > > I cannot follow completely in your last step when you fail. What do you > mean by "I'm stuck at the level when I want to copy that from the jar to > submit it to flink"? > > Btw: I copied the code from the SO question and it works for me on the > current master (which includes Till's hotfix). > > -Matthias > > > On 04/13/2016 09:39 PM, star jlong wrote: >> Thanks Matthias for the reply. >> Maybe I should explain what I want to do better.My objective is to deploy a flink topology on flink using java but in the production mode. For that here are the step that I have taken. >> 1-Convert a sample wordcount storm topology to a flink topology as indicated here https://flink.apache.org/news/2015/12/11/storm-compatibility.html2-Run the topology on local mode (with my IDE eclipse) and on production mode by assembling everything with a mvn clean install then submitting the jar to flink on the command line with >> ./bin/flink run -c stormWorldCount.WordCountTopology /home/raymond/testFlink/target/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar myFlinkTopology >> At this level everything went well. >> >> Then I wanted to submit the same jar to flink on the production mode by using a java program. Then I decided to create a mainMethod in my topology that returns the flinkTopology which I wanted to submit to flink using the FlinkClient. But I'm stuck at the level when I want to copy that from the jar to submit it to flink. >> >> I know that is possible because I have used the same procedure with a storm topology that it works perfectly well. >> What I'm missing please? >> jstar >> >> Le Mercredi 13 avril 2016 19h23, Matthias J. Sax <[hidden email]> a écrit : >> >> >> Hi jstar, >> >> I need to have a close look. But I am wondering why you use reflection >> in the first place? Is there any specific reason for that? >> >> Furthermore, the example provided in project maven-example also covers >> the case to submit a topology to Flink via Java. Have a look at >> org.apache.flink.storm.wordcount.WordCountRemoteBySubmitter >> >> It contains a main() method and you can just run it as a regular Java >> program in your IDE. >> >> The SO question example should also work; it also contains a main() >> method, so you should be able to run it. >> >> Btw: If you use Storm-Compatiblitly-API there is no reason the get an >> ExecutuionEnvironment in you code. This happen automatically with >> FlinkClient/FlinkSubmitter. >> >> Furthermore, I would recommend to use FlinkSubmitter instead of >> FlinkClient as it is somewhat simpler to use. >> >> About SO question: I guess the problem is the jar assembling. The user says >> >> "Since I'using maven to handle my dependencies, I do a Mvn clean install >> to obtain the jar." >> >> I guess this is not sufficient to bundle a correct jar. Have a look into >> pom.xml from storm-examples. It uses maven plug-ins in assemble the jar >> correctly. (Regular maven artifact do not work for job submission...) >> >> Will have a close look and follow up... Hope this helps already. >> >> -Matthias >> >> On 04/13/2016 06:23 PM, star jlong wrote: >>> Thanks for the reply. >>> @Stephen, I try using RemoteEnvironment to submit my topology to flink. >>> Here is the try that I did RemoteEnvironment remote = new RemoteEnvironment(ipJobManager, 6123, jarPath); remote.execute(); >>> While running the program, this is the exception that I got. >>> java.lang.RuntimeException: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it. >>> >>> >>> Le Mercredi 13 avril 2016 16h54, Till Rohrmann <[hidden email]> a écrit : >>> >>> >>> I think this is not the problem here since the problem is still happening >>> on the client side when the FlinkTopology tries to copy the registered >>> spouts. This happens before the job is submitted to the cluster. Maybe >>> Mathias could chime in here. >>> >>> Cheers, >>> Till >>> >>> On Wed, Apr 13, 2016 at 5:39 PM, Stephan Ewen <[hidden email]> wrote: >>> >>>> Hi! >>>> >>>> For flink standalone programs, you would use a "RemoteEnvironment" >>>> >>>> For Storm, I would use the "FlinkClient" in "org.apache.flink.storm.api". >>>> That one should deal with jars, classloaders, etc for you. >>>> >>>> Stephan >>>> >>>> >>>> On Wed, Apr 13, 2016 at 3:43 PM, star jlong <[hidden email]> >>>> wrote: >>>> >>>>> Thanks for the suggestion. Sure those examples are interesting and I have >>>>> deploy them successfully on flink. The deployment is done the command >>>> line >>>>> that is doing something like >>>>> bin/flink run example.jarBut what I want is to submit the topology to >>>>> flink using a java program. >>>>> >>>>> Thanks. >>>>> >>>>> Le Mercredi 13 avril 2016 14h12, Chesnay Schepler < >>>> [hidden email]> >>>>> a écrit : >>>>> >>>>> >>>>> you can find examples here: >>>>> >>>>> >>>> https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples >>>>> >>>>> we haven't established yet that it is an API issue; it could very well >>>>> be caused by the reflection magic you're using... >>>>> >>>>> On 13.04.2016 14:57, star jlong wrote: >>>>>> Ok, it seems like there an issue with the api. So please does anybody >>>>> has a working example for deploying a topology using the flink dependency >>>>> flink-storm_2.11 or any other will be welcoming. >>>>>> >>>>>> Thanks, >>>>>> jstar >>>>>> >>>>>> Le Mercredi 13 avril 2016 13h44, star jlong >>>>> <[hidden email]> a écrit : >>>>>> >>>>>> >>>>>> Hi Schepler, >>>>>> >>>>>> Thanks for the concerned. Yes I'm actaully having the same issue as >>>>> indicated on that post because I'm the one that posted that issue. >>>>>> >>>>>> Le Mercredi 13 avril 2016 13h35, Chesnay Schepler < >>>>> [hidden email]> a écrit : >>>>>> >>>>>> >>>>>> >>>>> >>>> http://stackoverflow.com/questions/36584784/issues-while-submitting-a-topology-to-apache-flink-using-the-flink-api >>>>>> >>>>>> On 13.04.2016 14:28, Till Rohrmann wrote: >>>>>>> Hi jstar, >>>>>>> >>>>>>> what's exactly the problem you're observing? >>>>>>> >>>>>>> Cheers, >>>>>>> Till >>>>>>> >>>>>>> On Wed, Apr 13, 2016 at 2:23 PM, star jlong >>>> <[hidden email] >>>>>> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi there, >>>>>>>> >>>>>>>> I'm jstar. I have been playing around with flink. I'm very much >>>>> interested >>>>>>>> in submitting a topoloy to flink using its api. As indicated >>>>>>>> on stackoverflow, that is the try that I have given. But I was stuck >>>>> with >>>>>>>> some exception. Please any help will be welcoming. >>>>>>>> >>>>>>>> >>>>>>>> Thanks. >>>>>>>> jstar >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>> >>> >>> >>> >> >> >> >> > > > > |
change the version to 1.1-SNAPSHOT
On 04/14/2016 11:52 AM, star jlong wrote: > One question which dependency of flink are you using because I'm using > <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-storm-examples_2.11</artifactId> <version>1.0.0</version></dependency> > And once I change the version to SNAPSHOT version, the pom.xml complains that it could not satisfy the given dependency. > > Le Jeudi 14 avril 2016 10h45, star jlong <[hidden email]> a écrit : > > > Yes it is. > > Le Jeudi 14 avril 2016 10h39, Matthias J. Sax <[hidden email]> a écrit : > > > For the fix, you need to use the current development version of Flink, > ie, change your maven dependency from <version>1.0</version> to > <version>1.1-SNAPSHOT</version> > > One question: what is FlinkGitService.class? It does only show up when > you get the ClassLoader: > >> ClassLoader loader = URLClassLoader.newInstance(new URL[] { new URL(path) }, FlinkGitService.class.getClassLoader()); > > It is the class that contains methods deploy() and getFlinkTopology() ? > > -Matthias > > On 04/14/2016 05:20 AM, star jlong wrote: >> What I'm trying to say is that to get submit the flink topology to flink, I had to do an invocation of the mainMethod(which contain the actaul topology) of my topology with the class java.lang.reflect.Method.That is if you a take look at the following the topology the mainMethod is buildTopologypublic class WordCountTopology { >> public static void main(String[] args) throws Exception { >> >> Config conf = new Config(); >> conf.setDebug(true); >> if (args != null && args.length > 0) { >> >> conf.setNumWorkers(1); >> conf.setMaxTaskParallelism(1); >> FlinkSubmitter.submitTopology(args[0], conf, buildTopology()); >> >> } >> // Otherwise, we are running locally >> else { >> conf.setMaxTaskParallelism(1); >> FlinkLocalCluster cluster = new FlinkLocalCluster(); >> cluster.submitTopology("word-count", conf, buildTopology()); >> Thread.sleep(10000); >> } >> } >> >> public static FlinkTopology buildTopology() { >> >> TopologyBuilder builder = new TopologyBuilder(); >> >> builder.setSpout("spout", new RandomSentenceSpout(), 1); >> builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout"); >> builder.setBolt("count", new WordCount(), 1).fieldsGrouping("split", new Fields("word")); >> >> builder.setBolt("writeIntoFile", new BoltFileSink("/home/username/wordcount.txt", new OutputFormatter() { >> private static final long serialVersionUID = 1L; >> >> @Override >> public String format(Tuple tuple) { >> return tuple.toString(); >> } >> }), 1).shuffleGrouping("count"); >> >> return FlinkTopology.createTopology(builder); >> >> } >> }That is the method that I want to invoke from my jar so that I will be able to do the submitting of the topology without any problem ie >> >> final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);cluster.submitTopology(topologyId, uploadedJarLocation, getFlinkTopogy(String.format("file://%s", jarPath),properties.getProperty("topologyMainClass"), properties.getProperty("methodName"))); >> Where getFlinkTopology() return the contains actually topology >> >> But while doing that reflection I had an exception. >> >> Another question please. How do I make used of the hotflix of Till. >> >> Le Jeudi 14 avril 2016 0h19, Matthias J. Sax <[hidden email]> a écrit : >> >> >> I cannot follow completely in your last step when you fail. What do you >> mean by "I'm stuck at the level when I want to copy that from the jar to >> submit it to flink"? >> >> Btw: I copied the code from the SO question and it works for me on the >> current master (which includes Till's hotfix). >> >> -Matthias >> >> >> On 04/13/2016 09:39 PM, star jlong wrote: >>> Thanks Matthias for the reply. >>> Maybe I should explain what I want to do better.My objective is to deploy a flink topology on flink using java but in the production mode. For that here are the step that I have taken. >>> 1-Convert a sample wordcount storm topology to a flink topology as indicated here https://flink.apache.org/news/2015/12/11/storm-compatibility.html2-Run the topology on local mode (with my IDE eclipse) and on production mode by assembling everything with a mvn clean install then submitting the jar to flink on the command line with >>> ./bin/flink run -c stormWorldCount.WordCountTopology /home/raymond/testFlink/target/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar myFlinkTopology >>> At this level everything went well. >>> >>> Then I wanted to submit the same jar to flink on the production mode by using a java program. Then I decided to create a mainMethod in my topology that returns the flinkTopology which I wanted to submit to flink using the FlinkClient. But I'm stuck at the level when I want to copy that from the jar to submit it to flink. >>> >>> I know that is possible because I have used the same procedure with a storm topology that it works perfectly well. >>> What I'm missing please? >>> jstar >>> >>> Le Mercredi 13 avril 2016 19h23, Matthias J. Sax <[hidden email]> a écrit : >>> >>> >>> Hi jstar, >>> >>> I need to have a close look. But I am wondering why you use reflection >>> in the first place? Is there any specific reason for that? >>> >>> Furthermore, the example provided in project maven-example also covers >>> the case to submit a topology to Flink via Java. Have a look at >>> org.apache.flink.storm.wordcount.WordCountRemoteBySubmitter >>> >>> It contains a main() method and you can just run it as a regular Java >>> program in your IDE. >>> >>> The SO question example should also work; it also contains a main() >>> method, so you should be able to run it. >>> >>> Btw: If you use Storm-Compatiblitly-API there is no reason the get an >>> ExecutuionEnvironment in you code. This happen automatically with >>> FlinkClient/FlinkSubmitter. >>> >>> Furthermore, I would recommend to use FlinkSubmitter instead of >>> FlinkClient as it is somewhat simpler to use. >>> >>> About SO question: I guess the problem is the jar assembling. The user says >>> >>> "Since I'using maven to handle my dependencies, I do a Mvn clean install >>> to obtain the jar." >>> >>> I guess this is not sufficient to bundle a correct jar. Have a look into >>> pom.xml from storm-examples. It uses maven plug-ins in assemble the jar >>> correctly. (Regular maven artifact do not work for job submission...) >>> >>> Will have a close look and follow up... Hope this helps already. >>> >>> -Matthias >>> >>> On 04/13/2016 06:23 PM, star jlong wrote: >>>> Thanks for the reply. >>>> @Stephen, I try using RemoteEnvironment to submit my topology to flink. >>>> Here is the try that I did RemoteEnvironment remote = new RemoteEnvironment(ipJobManager, 6123, jarPath); remote.execute(); >>>> While running the program, this is the exception that I got. >>>> java.lang.RuntimeException: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it. >>>> >>>> >>>> Le Mercredi 13 avril 2016 16h54, Till Rohrmann <[hidden email]> a écrit : >>>> >>>> >>>> I think this is not the problem here since the problem is still happening >>>> on the client side when the FlinkTopology tries to copy the registered >>>> spouts. This happens before the job is submitted to the cluster. Maybe >>>> Mathias could chime in here. >>>> >>>> Cheers, >>>> Till >>>> >>>> On Wed, Apr 13, 2016 at 5:39 PM, Stephan Ewen <[hidden email]> wrote: >>>> >>>>> Hi! >>>>> >>>>> For flink standalone programs, you would use a "RemoteEnvironment" >>>>> >>>>> For Storm, I would use the "FlinkClient" in "org.apache.flink.storm.api". >>>>> That one should deal with jars, classloaders, etc for you. >>>>> >>>>> Stephan >>>>> >>>>> >>>>> On Wed, Apr 13, 2016 at 3:43 PM, star jlong <[hidden email]> >>>>> wrote: >>>>> >>>>>> Thanks for the suggestion. Sure those examples are interesting and I have >>>>>> deploy them successfully on flink. The deployment is done the command >>>>> line >>>>>> that is doing something like >>>>>> bin/flink run example.jarBut what I want is to submit the topology to >>>>>> flink using a java program. >>>>>> >>>>>> Thanks. >>>>>> >>>>>> Le Mercredi 13 avril 2016 14h12, Chesnay Schepler < >>>>> [hidden email]> >>>>>> a écrit : >>>>>> >>>>>> >>>>>> you can find examples here: >>>>>> >>>>>> >>>>> https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples >>>>>> >>>>>> we haven't established yet that it is an API issue; it could very well >>>>>> be caused by the reflection magic you're using... >>>>>> >>>>>> On 13.04.2016 14:57, star jlong wrote: >>>>>>> Ok, it seems like there an issue with the api. So please does anybody >>>>>> has a working example for deploying a topology using the flink dependency >>>>>> flink-storm_2.11 or any other will be welcoming. >>>>>>> >>>>>>> Thanks, >>>>>>> jstar >>>>>>> >>>>>>> Le Mercredi 13 avril 2016 13h44, star jlong >>>>>> <[hidden email]> a écrit : >>>>>>> >>>>>>> >>>>>>> Hi Schepler, >>>>>>> >>>>>>> Thanks for the concerned. Yes I'm actaully having the same issue as >>>>>> indicated on that post because I'm the one that posted that issue. >>>>>>> >>>>>>> Le Mercredi 13 avril 2016 13h35, Chesnay Schepler < >>>>>> [hidden email]> a écrit : >>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>> http://stackoverflow.com/questions/36584784/issues-while-submitting-a-topology-to-apache-flink-using-the-flink-api >>>>>>> >>>>>>> On 13.04.2016 14:28, Till Rohrmann wrote: >>>>>>>> Hi jstar, >>>>>>>> >>>>>>>> what's exactly the problem you're observing? >>>>>>>> >>>>>>>> Cheers, >>>>>>>> Till >>>>>>>> >>>>>>>> On Wed, Apr 13, 2016 at 2:23 PM, star jlong >>>>> <[hidden email] >>>>>>> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Hi there, >>>>>>>>> >>>>>>>>> I'm jstar. I have been playing around with flink. I'm very much >>>>>> interested >>>>>>>>> in submitting a topoloy to flink using its api. As indicated >>>>>>>>> on stackoverflow, that is the try that I have given. But I was stuck >>>>>> with >>>>>>>>> some exception. Please any help will be welcoming. >>>>>>>>> >>>>>>>>> >>>>>>>>> Thanks. >>>>>>>>> jstar >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>> >>>> >>>> >>>> >>> >>> >>> >>> >> >> >> >> > > > > > > |
Hi Matthias,
I change the version as per your requirement but when I do that I have a compilation error at the level of the classes org.apache.flink.storm.util.BoltFileSink and org.apache.flink.storm.util.OutputFormatter Btw, the dependency <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-storm-examples_2.11</artifactId> <version>1.1-SNAPSHOT</version> </dependency> Could not be satisfy, but this one could be <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-storm_2.10</artifactId> <version>1.1-SNAPSHOT</version> </dependency> Le Jeudi 14 avril 2016 13h05, Matthias J. Sax <[hidden email]> a écrit : change the version to 1.1-SNAPSHOT On 04/14/2016 11:52 AM, star jlong wrote: > One question which dependency of flink are you using because I'm using > <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-storm-examples_2.11</artifactId> <version>1.0.0</version></dependency> > And once I change the version to SNAPSHOT version, the pom.xml complains that it could not satisfy the given dependency. > > Le Jeudi 14 avril 2016 10h45, star jlong <[hidden email]> a écrit : > > > Yes it is. > > Le Jeudi 14 avril 2016 10h39, Matthias J. Sax <[hidden email]> a écrit : > > > For the fix, you need to use the current development version of Flink, > ie, change your maven dependency from <version>1.0</version> to > <version>1.1-SNAPSHOT</version> > > One question: what is FlinkGitService.class? It does only show up when > you get the ClassLoader: > >> ClassLoader loader = URLClassLoader.newInstance(new URL[] { new URL(path) }, FlinkGitService.class.getClassLoader()); > > It is the class that contains methods deploy() and getFlinkTopology() ? > > -Matthias > > On 04/14/2016 05:20 AM, star jlong wrote: >> What I'm trying to say is that to get submit the flink topology to flink, I had to do an invocation of the mainMethod(which contain the actaul topology) of my topology with the class java.lang.reflect.Method.That is if you a take look at the following the topology the mainMethod is buildTopologypublic class WordCountTopology { >> public static void main(String[] args) throws Exception { >> >> Config conf = new Config(); >> conf.setDebug(true); >> if (args != null && args.length > 0) { >> >> conf.setNumWorkers(1); >> conf.setMaxTaskParallelism(1); >> FlinkSubmitter.submitTopology(args[0], conf, buildTopology()); >> >> } >> // Otherwise, we are running locally >> else { >> conf.setMaxTaskParallelism(1); >> FlinkLocalCluster cluster = new FlinkLocalCluster(); >> cluster.submitTopology("word-count", conf, buildTopology()); >> Thread.sleep(10000); >> } >> } >> >> public static FlinkTopology buildTopology() { >> >> TopologyBuilder builder = new TopologyBuilder(); >> >> builder.setSpout("spout", new RandomSentenceSpout(), 1); >> builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout"); >> builder.setBolt("count", new WordCount(), 1).fieldsGrouping("split", new Fields("word")); >> >> builder.setBolt("writeIntoFile", new BoltFileSink("/home/username/wordcount.txt", new OutputFormatter() { >> private static final long serialVersionUID = 1L; >> >> @Override >> public String format(Tuple tuple) { >> return tuple.toString(); >> } >> }), 1).shuffleGrouping("count"); >> >> return FlinkTopology.createTopology(builder); >> >> } >> }That is the method that I want to invoke from my jar so that I will be able to do the submitting of the topology without any problem ie >> >> final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);cluster.submitTopology(topologyId, uploadedJarLocation, getFlinkTopogy(String.format("file://%s", jarPath),properties.getProperty("topologyMainClass"), properties.getProperty("methodName"))); >> Where getFlinkTopology() return the contains actually topology >> >> But while doing that reflection I had an exception. >> >> Another question please. How do I make used of the hotflix of Till. >> >> Le Jeudi 14 avril 2016 0h19, Matthias J. Sax <[hidden email]> a écrit : >> >> >> I cannot follow completely in your last step when you fail. What do you >> mean by "I'm stuck at the level when I want to copy that from the jar to >> submit it to flink"? >> >> Btw: I copied the code from the SO question and it works for me on the >> current master (which includes Till's hotfix). >> >> -Matthias >> >> >> On 04/13/2016 09:39 PM, star jlong wrote: >>> Thanks Matthias for the reply. >>> Maybe I should explain what I want to do better.My objective is to deploy a flink topology on flink using java but in the production mode. For that here are the step that I have taken. >>> 1-Convert a sample wordcount storm topology to a flink topology as indicated here https://flink.apache.org/news/2015/12/11/storm-compatibility.html2-Run the topology on local mode (with my IDE eclipse) and on production mode by assembling everything with a mvn clean install then submitting the jar to flink on the command line with >>> ./bin/flink run -c stormWorldCount.WordCountTopology /home/raymond/testFlink/target/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar myFlinkTopology >>> At this level everything went well. >>> >>> Then I wanted to submit the same jar to flink on the production mode by using a java program. Then I decided to create a mainMethod in my topology that returns the flinkTopology which I wanted to submit to flink using the FlinkClient. But I'm stuck at the level when I want to copy that from the jar to submit it to flink. >>> >>> I know that is possible because I have used the same procedure with a storm topology that it works perfectly well. >>> What I'm missing please? >>> jstar >>> >>> Le Mercredi 13 avril 2016 19h23, Matthias J. Sax <[hidden email]> a écrit : >>> >>> >>> Hi jstar, >>> >>> I need to have a close look. But I am wondering why you use reflection >>> in the first place? Is there any specific reason for that? >>> >>> Furthermore, the example provided in project maven-example also covers >>> the case to submit a topology to Flink via Java. Have a look at >>> org.apache.flink.storm.wordcount.WordCountRemoteBySubmitter >>> >>> It contains a main() method and you can just run it as a regular Java >>> program in your IDE. >>> >>> The SO question example should also work; it also contains a main() >>> method, so you should be able to run it. >>> >>> Btw: If you use Storm-Compatiblitly-API there is no reason the get an >>> ExecutuionEnvironment in you code. This happen automatically with >>> FlinkClient/FlinkSubmitter. >>> >>> Furthermore, I would recommend to use FlinkSubmitter instead of >>> FlinkClient as it is somewhat simpler to use. >>> >>> About SO question: I guess the problem is the jar assembling. The user says >>> >>> "Since I'using maven to handle my dependencies, I do a Mvn clean install >>> to obtain the jar." >>> >>> I guess this is not sufficient to bundle a correct jar. Have a look into >>> pom.xml from storm-examples. It uses maven plug-ins in assemble the jar >>> correctly. (Regular maven artifact do not work for job submission...) >>> >>> Will have a close look and follow up... Hope this helps already. >>> >>> -Matthias >>> >>> On 04/13/2016 06:23 PM, star jlong wrote: >>>> Thanks for the reply. >>>> @Stephen, I try using RemoteEnvironment to submit my topology to flink. >>>> Here is the try that I did RemoteEnvironment remote = new RemoteEnvironment(ipJobManager, 6123, jarPath); remote.execute(); >>>> While running the program, this is the exception that I got. >>>> java.lang.RuntimeException: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it. >>>> >>>> >>>> Le Mercredi 13 avril 2016 16h54, Till Rohrmann <[hidden email]> a écrit : >>>> >>>> >>>> I think this is not the problem here since the problem is still happening >>>> on the client side when the FlinkTopology tries to copy the registered >>>> spouts. This happens before the job is submitted to the cluster. Maybe >>>> Mathias could chime in here. >>>> >>>> Cheers, >>>> Till >>>> >>>> On Wed, Apr 13, 2016 at 5:39 PM, Stephan Ewen <[hidden email]> wrote: >>>> >>>>> Hi! >>>>> >>>>> For flink standalone programs, you would use a "RemoteEnvironment" >>>>> >>>>> For Storm, I would use the "FlinkClient" in "org.apache.flink.storm.api". >>>>> That one should deal with jars, classloaders, etc for you. >>>>> >>>>> Stephan >>>>> >>>>> >>>>> On Wed, Apr 13, 2016 at 3:43 PM, star jlong <[hidden email]> >>>>> wrote: >>>>> >>>>>> Thanks for the suggestion. Sure those examples are interesting and I have >>>>>> deploy them successfully on flink. The deployment is done the command >>>>> line >>>>>> that is doing something like >>>>>> bin/flink run example.jarBut what I want is to submit the topology to >>>>>> flink using a java program. >>>>>> >>>>>> Thanks. >>>>>> >>>>>> Le Mercredi 13 avril 2016 14h12, Chesnay Schepler < >>>>> [hidden email]> >>>>>> a écrit : >>>>>> >>>>>> >>>>>> you can find examples here: >>>>>> >>>>>> >>>>> https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples >>>>>> >>>>>> we haven't established yet that it is an API issue; it could very well >>>>>> be caused by the reflection magic you're using... >>>>>> >>>>>> On 13.04.2016 14:57, star jlong wrote: >>>>>>> Ok, it seems like there an issue with the api. So please does anybody >>>>>> has a working example for deploying a topology using the flink dependency >>>>>> flink-storm_2.11 or any other will be welcoming. >>>>>>> >>>>>>> Thanks, >>>>>>> jstar >>>>>>> >>>>>>> Le Mercredi 13 avril 2016 13h44, star jlong >>>>>> <[hidden email]> a écrit : >>>>>>> >>>>>>> >>>>>>> Hi Schepler, >>>>>>> >>>>>>> Thanks for the concerned. Yes I'm actaully having the same issue as >>>>>> indicated on that post because I'm the one that posted that issue. >>>>>>> >>>>>>> Le Mercredi 13 avril 2016 13h35, Chesnay Schepler < >>>>>> [hidden email]> a écrit : >>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>> http://stackoverflow.com/questions/36584784/issues-while-submitting-a-topology-to-apache-flink-using-the-flink-api >>>>>>> >>>>>>> On 13.04.2016 14:28, Till Rohrmann wrote: >>>>>>>> Hi jstar, >>>>>>>> >>>>>>>> what's exactly the problem you're observing? >>>>>>>> >>>>>>>> Cheers, >>>>>>>> Till >>>>>>>> >>>>>>>> On Wed, Apr 13, 2016 at 2:23 PM, star jlong >>>>> <[hidden email] >>>>>>> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Hi there, >>>>>>>>> >>>>>>>>> I'm jstar. I have been playing around with flink. I'm very much >>>>>> interested >>>>>>>>> in submitting a topoloy to flink using its api. As indicated >>>>>>>>> on stackoverflow, that is the try that I have given. But I was stuck >>>>>> with >>>>>>>>> some exception. Please any help will be welcoming. >>>>>>>>> >>>>>>>>> >>>>>>>>> Thanks. >>>>>>>>> jstar >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>> >>>> >>>> >>>> >>> >>> >>> >>> >> >> >> >> > > > > > > |
What is the compile error? Or could you already resolve it? there should
not be a difference from 1.0 to 1.1-SNAPSHOT for both classes... And yes, prefix _2.11 is only available for released but not for current development branch. But it is the same code (no need to worry about it). -Matthias On 04/14/2016 07:38 PM, star jlong wrote: > Hi Matthias, > > I change the version as per your requirement but when I do that I have a compilation error at the level of the classes > org.apache.flink.storm.util.BoltFileSink and org.apache.flink.storm.util.OutputFormatter > > Btw, the dependency > <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-storm-examples_2.11</artifactId> <version>1.1-SNAPSHOT</version> </dependency> > > Could not be satisfy, but this one could be > <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-storm_2.10</artifactId> <version>1.1-SNAPSHOT</version> </dependency> > > Le Jeudi 14 avril 2016 13h05, Matthias J. Sax <[hidden email]> a écrit : > > > change the version to 1.1-SNAPSHOT > > On 04/14/2016 11:52 AM, star jlong wrote: >> One question which dependency of flink are you using because I'm using >> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-storm-examples_2.11</artifactId> <version>1.0.0</version></dependency> >> And once I change the version to SNAPSHOT version, the pom.xml complains that it could not satisfy the given dependency. >> >> Le Jeudi 14 avril 2016 10h45, star jlong <[hidden email]> a écrit : >> >> >> Yes it is. >> >> Le Jeudi 14 avril 2016 10h39, Matthias J. Sax <[hidden email]> a écrit : >> >> >> For the fix, you need to use the current development version of Flink, >> ie, change your maven dependency from <version>1.0</version> to >> <version>1.1-SNAPSHOT</version> >> >> One question: what is FlinkGitService.class? It does only show up when >> you get the ClassLoader: >> >>> ClassLoader loader = URLClassLoader.newInstance(new URL[] { new URL(path) }, FlinkGitService.class.getClassLoader()); >> >> It is the class that contains methods deploy() and getFlinkTopology() ? >> >> -Matthias >> >> On 04/14/2016 05:20 AM, star jlong wrote: >>> What I'm trying to say is that to get submit the flink topology to flink, I had to do an invocation of the mainMethod(which contain the actaul topology) of my topology with the class java.lang.reflect.Method.That is if you a take look at the following the topology the mainMethod is buildTopologypublic class WordCountTopology { >>> public static void main(String[] args) throws Exception { >>> >>> Config conf = new Config(); >>> conf.setDebug(true); >>> if (args != null && args.length > 0) { >>> >>> conf.setNumWorkers(1); >>> conf.setMaxTaskParallelism(1); >>> FlinkSubmitter.submitTopology(args[0], conf, buildTopology()); >>> >>> } >>> // Otherwise, we are running locally >>> else { >>> conf.setMaxTaskParallelism(1); >>> FlinkLocalCluster cluster = new FlinkLocalCluster(); >>> cluster.submitTopology("word-count", conf, buildTopology()); >>> Thread.sleep(10000); >>> } >>> } >>> >>> public static FlinkTopology buildTopology() { >>> >>> TopologyBuilder builder = new TopologyBuilder(); >>> >>> builder.setSpout("spout", new RandomSentenceSpout(), 1); >>> builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout"); >>> builder.setBolt("count", new WordCount(), 1).fieldsGrouping("split", new Fields("word")); >>> >>> builder.setBolt("writeIntoFile", new BoltFileSink("/home/username/wordcount.txt", new OutputFormatter() { >>> private static final long serialVersionUID = 1L; >>> >>> @Override >>> public String format(Tuple tuple) { >>> return tuple.toString(); >>> } >>> }), 1).shuffleGrouping("count"); >>> >>> return FlinkTopology.createTopology(builder); >>> >>> } >>> }That is the method that I want to invoke from my jar so that I will be able to do the submitting of the topology without any problem ie >>> >>> final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);cluster.submitTopology(topologyId, uploadedJarLocation, getFlinkTopogy(String.format("file://%s", jarPath),properties.getProperty("topologyMainClass"), properties.getProperty("methodName"))); >>> Where getFlinkTopology() return the contains actually topology >>> >>> But while doing that reflection I had an exception. >>> >>> Another question please. How do I make used of the hotflix of Till. >>> >>> Le Jeudi 14 avril 2016 0h19, Matthias J. Sax <[hidden email]> a écrit : >>> >>> >>> I cannot follow completely in your last step when you fail. What do you >>> mean by "I'm stuck at the level when I want to copy that from the jar to >>> submit it to flink"? >>> >>> Btw: I copied the code from the SO question and it works for me on the >>> current master (which includes Till's hotfix). >>> >>> -Matthias >>> >>> >>> On 04/13/2016 09:39 PM, star jlong wrote: >>>> Thanks Matthias for the reply. >>>> Maybe I should explain what I want to do better.My objective is to deploy a flink topology on flink using java but in the production mode. For that here are the step that I have taken. >>>> 1-Convert a sample wordcount storm topology to a flink topology as indicated here https://flink.apache.org/news/2015/12/11/storm-compatibility.html2-Run the topology on local mode (with my IDE eclipse) and on production mode by assembling everything with a mvn clean install then submitting the jar to flink on the command line with >>>> ./bin/flink run -c stormWorldCount.WordCountTopology /home/raymond/testFlink/target/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar myFlinkTopology >>>> At this level everything went well. >>>> >>>> Then I wanted to submit the same jar to flink on the production mode by using a java program. Then I decided to create a mainMethod in my topology that returns the flinkTopology which I wanted to submit to flink using the FlinkClient. But I'm stuck at the level when I want to copy that from the jar to submit it to flink. >>>> >>>> I know that is possible because I have used the same procedure with a storm topology that it works perfectly well. >>>> What I'm missing please? >>>> jstar >>>> >>>> Le Mercredi 13 avril 2016 19h23, Matthias J. Sax <[hidden email]> a écrit : >>>> >>>> >>>> Hi jstar, >>>> >>>> I need to have a close look. But I am wondering why you use reflection >>>> in the first place? Is there any specific reason for that? >>>> >>>> Furthermore, the example provided in project maven-example also covers >>>> the case to submit a topology to Flink via Java. Have a look at >>>> org.apache.flink.storm.wordcount.WordCountRemoteBySubmitter >>>> >>>> It contains a main() method and you can just run it as a regular Java >>>> program in your IDE. >>>> >>>> The SO question example should also work; it also contains a main() >>>> method, so you should be able to run it. >>>> >>>> Btw: If you use Storm-Compatiblitly-API there is no reason the get an >>>> ExecutuionEnvironment in you code. This happen automatically with >>>> FlinkClient/FlinkSubmitter. >>>> >>>> Furthermore, I would recommend to use FlinkSubmitter instead of >>>> FlinkClient as it is somewhat simpler to use. >>>> >>>> About SO question: I guess the problem is the jar assembling. The user says >>>> >>>> "Since I'using maven to handle my dependencies, I do a Mvn clean install >>>> to obtain the jar." >>>> >>>> I guess this is not sufficient to bundle a correct jar. Have a look into >>>> pom.xml from storm-examples. It uses maven plug-ins in assemble the jar >>>> correctly. (Regular maven artifact do not work for job submission...) >>>> >>>> Will have a close look and follow up... Hope this helps already. >>>> >>>> -Matthias >>>> >>>> On 04/13/2016 06:23 PM, star jlong wrote: >>>>> Thanks for the reply. >>>>> @Stephen, I try using RemoteEnvironment to submit my topology to flink. >>>>> Here is the try that I did RemoteEnvironment remote = new RemoteEnvironment(ipJobManager, 6123, jarPath); remote.execute(); >>>>> While running the program, this is the exception that I got. >>>>> java.lang.RuntimeException: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it. >>>>> >>>>> >>>>> Le Mercredi 13 avril 2016 16h54, Till Rohrmann <[hidden email]> a écrit : >>>>> >>>>> >>>>> I think this is not the problem here since the problem is still happening >>>>> on the client side when the FlinkTopology tries to copy the registered >>>>> spouts. This happens before the job is submitted to the cluster. Maybe >>>>> Mathias could chime in here. >>>>> >>>>> Cheers, >>>>> Till >>>>> >>>>> On Wed, Apr 13, 2016 at 5:39 PM, Stephan Ewen <[hidden email]> wrote: >>>>> >>>>>> Hi! >>>>>> >>>>>> For flink standalone programs, you would use a "RemoteEnvironment" >>>>>> >>>>>> For Storm, I would use the "FlinkClient" in "org.apache.flink.storm.api". >>>>>> That one should deal with jars, classloaders, etc for you. >>>>>> >>>>>> Stephan >>>>>> >>>>>> >>>>>> On Wed, Apr 13, 2016 at 3:43 PM, star jlong <[hidden email]> >>>>>> wrote: >>>>>> >>>>>>> Thanks for the suggestion. Sure those examples are interesting and I have >>>>>>> deploy them successfully on flink. The deployment is done the command >>>>>> line >>>>>>> that is doing something like >>>>>>> bin/flink run example.jarBut what I want is to submit the topology to >>>>>>> flink using a java program. >>>>>>> >>>>>>> Thanks. >>>>>>> >>>>>>> Le Mercredi 13 avril 2016 14h12, Chesnay Schepler < >>>>>> [hidden email]> >>>>>>> a écrit : >>>>>>> >>>>>>> >>>>>>> you can find examples here: >>>>>>> >>>>>>> >>>>>> https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples >>>>>>> >>>>>>> we haven't established yet that it is an API issue; it could very well >>>>>>> be caused by the reflection magic you're using... >>>>>>> >>>>>>> On 13.04.2016 14:57, star jlong wrote: >>>>>>>> Ok, it seems like there an issue with the api. So please does anybody >>>>>>> has a working example for deploying a topology using the flink dependency >>>>>>> flink-storm_2.11 or any other will be welcoming. >>>>>>>> >>>>>>>> Thanks, >>>>>>>> jstar >>>>>>>> >>>>>>>> Le Mercredi 13 avril 2016 13h44, star jlong >>>>>>> <[hidden email]> a écrit : >>>>>>>> >>>>>>>> >>>>>>>> Hi Schepler, >>>>>>>> >>>>>>>> Thanks for the concerned. Yes I'm actaully having the same issue as >>>>>>> indicated on that post because I'm the one that posted that issue. >>>>>>>> >>>>>>>> Le Mercredi 13 avril 2016 13h35, Chesnay Schepler < >>>>>>> [hidden email]> a écrit : >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>> >>>>>> http://stackoverflow.com/questions/36584784/issues-while-submitting-a-topology-to-apache-flink-using-the-flink-api >>>>>>>> >>>>>>>> On 13.04.2016 14:28, Till Rohrmann wrote: >>>>>>>>> Hi jstar, >>>>>>>>> >>>>>>>>> what's exactly the problem you're observing? >>>>>>>>> >>>>>>>>> Cheers, >>>>>>>>> Till >>>>>>>>> >>>>>>>>> On Wed, Apr 13, 2016 at 2:23 PM, star jlong >>>>>> <[hidden email] >>>>>>>> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi there, >>>>>>>>>> >>>>>>>>>> I'm jstar. I have been playing around with flink. I'm very much >>>>>>> interested >>>>>>>>>> in submitting a topoloy to flink using its api. As indicated >>>>>>>>>> on stackoverflow, that is the try that I have given. But I was stuck >>>>>>> with >>>>>>>>>> some exception. Please any help will be welcoming. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Thanks. >>>>>>>>>> jstar >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> >>>> >>>> >>>> >>>> >>> >>> >>> >>> >> >> >> >> >> >> > > > > |
Free forum by Nabble | Edit this page |