Issue deploying a topology to flink with a java api


star jlong
Hi there,

I'm jstar. I have been playing around with Flink, and I'm very interested in submitting a topology to Flink using its API. I tried the approach described on Stack Overflow, but I got stuck on an exception. Any help would be welcome.


Thanks.
jstar

Re: Issue deploying a topology to flink with a java api

till.rohrmann
Hi jstar,

What exactly is the problem you're observing?

Cheers,
Till


Re: Issue deploying a topology to flink with a java api

star jlong
Hi Till,

Thanks for the quick reply. I'm unable to retrieve the topology returned by my mainMethod using the instruction

(FlinkTopology) method.invoke(null, new Object[] {});

where method is a variable of type java.lang.reflect.Method.
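
For readers following along, here is a minimal sketch of what such a reflective call looks like in plain Java, with no Flink dependency. DemoTopology and its two methods are hypothetical stand-ins, not classes from this thread. The sketch also shows that an exception thrown inside the invoked method arrives wrapped in an InvocationTargetException, so the real failure has to be read from getCause().

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class ReflectiveInvokeSketch {

    // Hypothetical stand-in for a topology class that exposes static factory methods.
    public static class DemoTopology {
        public static String buildTopology() {
            return "demo-topology";
        }

        public static String brokenTopology() {
            throw new IllegalStateException("spout could not be copied");
        }
    }

    /** Invokes a public static no-argument method by name and returns its result. */
    public static Object invokeStatic(Class<?> owner, String methodName) {
        try {
            Method method = owner.getMethod(methodName);
            // For a static method the receiver argument is ignored, so null is fine.
            return method.invoke(null);
        } catch (InvocationTargetException e) {
            // The invoked method itself threw: rethrow with the real cause attached.
            throw new RuntimeException("Invoked method failed: " + e.getCause(), e.getCause());
        } catch (ReflectiveOperationException e) {
            // Lookup or access problem (no such method, not accessible, ...).
            throw new RuntimeException("Could not invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(invokeStatic(DemoTopology.class, "buildTopology"));
        try {
            invokeStatic(DemoTopology.class, "brokenTopology");
        } catch (RuntimeException e) {
            System.out.println("underlying cause: " + e.getCause());
        }
    }
}
```

When a reflective call fails, printing the full chain of causes is usually the quickest way to see whether the problem lies in the reflection itself or in the code being invoked.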


Re: Issue deploying a topology to flink with a java api

Chesnay Schepler-3
In reply to this post by till.rohrmann
http://stackoverflow.com/questions/36584784/issues-while-submitting-a-topology-to-apache-flink-using-the-flink-api


Re: Issue deploying a topology to flink with a java api

Chesnay Schepler-3
I think the following is the interesting part of the stack-trace:

Caused by: java.lang.RuntimeException: Failed to copy object.
    at org.apache.flink.storm.api.FlinkTopology.copyObject(FlinkTopology.java:145)
    at org.apache.flink.storm.api.FlinkTopology.getPrivateField(FlinkTopology.java:132)
    at org.apache.flink.storm.api.FlinkTopology.<init>(FlinkTopology.java:89)
    at org.apache.flink.storm.api.FlinkTopology.createTopology(FlinkTopology.java:105)
    at stormWorldCount.WordCountTopology.buildTopology(WordCountTopology.java:96)

Relevant method:

private <T> T copyObject(T object) {
    try {
        return InstantiationUtil.deserializeObject(
                InstantiationUtil.serializeObject(object),
                getClass().getClassLoader()
                );
    } catch (IOException | ClassNotFoundException e) {
        throw new RuntimeException("Failed to copy object.");
    }
}

Sadly, this is another case where we just swallow the exception's cause.
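
To illustrate the point about the swallowed cause, here is a minimal, self-contained sketch of a copy-by-serialization helper that passes the original exception along. It uses plain java.io streams instead of Flink's InstantiationUtil, so it is an analogy under that assumption and not the actual Flink code or fix.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class CopyObjectSketch {

    /** Deep-copies an object by serializing it to bytes and reading it back. */
    @SuppressWarnings("unchecked")
    public static <T> T copyObject(T object) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            // Passing e as the cause keeps the real failure visible in the stack trace.
            throw new RuntimeException("Failed to copy object.", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(copyObject("hello"));
        try {
            copyObject(new Object()); // java.lang.Object is not Serializable
        } catch (RuntimeException e) {
            System.out.println("cause: " + e.getCause());
        }
    }
}
```

With the cause attached, the stack trace gains a second "Caused by:" section that names the class that could not be serialized or loaded.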


Re: Issue deploying a topology to flink with a java api

star jlong
In reply to this post by Chesnay Schepler-3
Hi Schepler,

Thanks for your concern. Yes, I'm actually having the same issue as described in that post, because I'm the one who posted it.


Re: Issue deploying a topology to flink with a java api

Till Rohrmann
Yes, that is true. I'll commit a hotfix for that.

My suspicion is that we use the wrong class loader in the
FlinkTopology.copyObject method to load the RandomSentenceSpout class. We
will be able to confirm that once I have removed the exception swallowing in the current master.
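
As a rough illustration of how the choice of class loader can make deserialization fail, the sketch below resolves classes through an explicitly supplied loader. DemoSpout and ClassLoaderObjectInputStream are hypothetical names written for this example and use only java.io. Whether this is what actually happens inside FlinkTopology.copyObject is still only a suspicion at this point in the thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class ClassLoaderAwareCopySketch {

    // Hypothetical user-defined spout; only the application class loader knows this class.
    public static class DemoSpout implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String name;

        public DemoSpout(String name) {
            this.name = name;
        }
    }

    /** ObjectInputStream that resolves classes only through the supplied class loader. */
    static class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            // No fallback on purpose: a loader that cannot see the class fails loudly.
            return Class.forName(desc.getName(), false, classLoader);
        }
    }

    /** Copies an object via serialization, loading its classes with the given loader. */
    public static Object copyWith(Serializable object, ClassLoader classLoader) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in = new ClassLoaderObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()), classLoader)) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Failed to copy object.", e);
        }
    }

    public static void main(String[] args) {
        DemoSpout spout = new DemoSpout("sentences");
        // The loader that defined DemoSpout can resolve it again.
        DemoSpout copy = (DemoSpout) copyWith(spout, DemoSpout.class.getClassLoader());
        System.out.println("copied: " + copy.name);
        try {
            // The parent of the system loader normally cannot see application classes.
            copyWith(spout, ClassLoader.getSystemClassLoader().getParent());
        } catch (RuntimeException e) {
            System.out.println("cause: " + e.getCause());
        }
    }
}
```

The second call is the analogue of the suspected situation: the bytes are valid, but the loader used to read them back does not know the user's class, so a ClassNotFoundException surfaces as the cause.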


Re: Issue deploying a topology to flink with a java api

Till Rohrmann
I've updated the master. Could you check it out and run your program with
the latest master? I would expect to see a ClassNotFoundException.


Re: Issue deploying a topology to flink with a java api

star jlong
In reply to this post by star jlong
OK, it seems there is an issue with the API. Does anybody have a working example of deploying a topology using the Flink dependency flink-storm_2.11? Any other working example would also be welcome.

Thanks,
jstar


Re: Issue deploying a topology to flink with a java api

Chesnay Schepler-3
You can find examples here:
https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples

We haven't established yet that it is an API issue; it could very well
be caused by the reflection magic you're using...


Re: Issue deploying a topology to flink with a java api

star jlong
Thanks for the suggestion. Those examples are interesting, and I have deployed them successfully on Flink. The deployment is done from the command line, with something like

bin/flink run example.jar

But what I want is to submit the topology to Flink from a Java program.

Thanks.


Re: Issue deploying a topology to flink with a java api

Stephan Ewen
Hi!

For Flink standalone programs, you would use a "RemoteEnvironment".

For Storm, I would use the "FlinkClient" in "org.apache.flink.storm.api".
That one should deal with jars, class loaders, etc. for you.

Stephan



Re: Issue deploying a topology to flink with a java api

Till Rohrmann
I think this is not the problem here, since the failure still happens
on the client side, when the FlinkTopology tries to copy the registered
spouts. This happens before the job is submitted to the cluster. Maybe
Matthias could chime in here.

Cheers,
Till


Re: Issue deploying a topology to flink with a java api

star jlong
Thanks for the reply.
@Stephan, I tried using RemoteEnvironment to submit my topology to Flink.
Here is what I tried:

RemoteEnvironment remote = new RemoteEnvironment(ipJobManager, 6123, jarPath);
remote.execute();

While running the program, I got this exception:

java.lang.RuntimeException: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it.
 


Re: Issue deploying a topology to flink with a java api

Matthias J. Sax-2
Hi jstar,

I need to have a closer look. But I am wondering why you use reflection
in the first place. Is there any specific reason for that?

Furthermore, the example provided in the maven-example project also covers
the case of submitting a topology to Flink via Java. Have a look at
org.apache.flink.storm.wordcount.WordCountRemoteBySubmitter

It contains a main() method, and you can just run it as a regular Java
program in your IDE.

The SO question example should also work; it also contains a main()
method, so you should be able to run it.

Btw: If you use the Storm compatibility API, there is no reason to get an
ExecutionEnvironment in your code. This happens automatically with
FlinkClient/FlinkSubmitter.

Furthermore, I would recommend using FlinkSubmitter instead of
FlinkClient, as it is somewhat simpler to use.

About the SO question: I guess the problem is the jar assembly. The user says

"Since I'using maven to handle my dependencies, I do a Mvn clean install
to obtain the jar."

I guess this is not sufficient to bundle a correct jar. Have a look at the
pom.xml from storm-examples. It uses Maven plug-ins to assemble the jar
correctly. (Regular Maven artifacts do not work for job submission...)

I will have a closer look and follow up... Hope this helps already.

-Matthias


Re: Issue deploying a topology to flink with a java api

Stephan Ewen
In reply to this post by star jlong
Hi!

For a Storm program, you would need a "RemoteStreamEnvironment" - the
"RemoteEnvironment" is for batch programs.

Stephan


Re: Issue deploying a topology to flink with a java api

star jlong
In reply to this post by Matthias J. Sax-2
Thanks, Matthias, for the reply.
Maybe I should explain better what I want to do. My objective is to deploy a Flink topology on Flink using Java, but in production mode. Here are the steps I have taken:
1. Convert a sample word-count Storm topology to a Flink topology, as described at https://flink.apache.org/news/2015/12/11/storm-compatibility.html
2. Run the topology in local mode (from my IDE, Eclipse) and in production mode, by assembling everything with mvn clean install and then submitting the jar to Flink on the command line with
./bin/flink run -c stormWorldCount.WordCountTopology /home/raymond/testFlink/target/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar myFlinkTopology
Up to this point everything went well.

Then I wanted to submit the same jar to Flink in production mode from a Java program. So I created a method in my topology class (the "mainMethod") that returns the FlinkTopology, which I wanted to submit to Flink using the FlinkClient. But I'm stuck at the level when I want to copy that from the jar to submit it to flink.

I know this is possible because I have used the same procedure with a Storm topology, and it works perfectly well.
What am I missing, please?
jstar
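
Since the command line above already works, one possible way to drive the same submission from a Java program is to launch that command with java.lang.ProcessBuilder. This is only a sketch under assumptions and not something proposed in this thread: the class FlinkCliLauncher, its method names, and the /opt/flink install directory are hypothetical, while the main class, jar path, and topology name are taken from the command above.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: wraps the "bin/flink run" command that already works on the shell.
class FlinkCliLauncher {

    // Builds the argument list for:
    //   <flinkHome>/bin/flink run -c <mainClass> <jarPath> <programArgs...>
    static List<String> buildRunCommand(String flinkHome, String mainClass,
                                        String jarPath, String... programArgs) {
        List<String> command = new ArrayList<>();
        command.add(new File(flinkHome, "bin/flink").getPath());
        command.add("run");
        command.add("-c");
        command.add(mainClass);
        command.add(jarPath);
        command.addAll(Arrays.asList(programArgs));
        return command;
    }

    // Starts the CLI as a child process, waits for it, and returns its exit code.
    static int submit(List<String> command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command)
                .inheritIO() // forward the CLI's stdout/stderr to this JVM's console
                .start();
        return process.waitFor();
    }

    public static void main(String[] args) {
        // "/opt/flink" is an assumed install directory; adjust it to the real one.
        List<String> command = buildRunCommand(
                "/opt/flink",
                "stormWorldCount.WordCountTopology",
                "/home/raymond/testFlink/target/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar",
                "myFlinkTopology");
        System.out.println(String.join(" ", command));
        // submit(command) would actually launch the CLI; it is not called here
        // so that the sketch runs without a Flink installation.
    }
}
```

This keeps the jar handling inside the Flink CLI, at the cost of starting an extra process and only getting an exit code back.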

On Wednesday 13 April 2016 at 19:23, Matthias J. Sax <[hidden email]> wrote:

Hi jstar,

I need to have a closer look. But I am wondering why you use reflection
in the first place. Is there any specific reason for that?

Furthermore, the example provided in project maven-example also covers
the case of submitting a topology to Flink via Java. Have a look at
org.apache.flink.storm.wordcount.WordCountRemoteBySubmitter

It contains a main() method, and you can just run it as a regular Java
program in your IDE.

The SO question example should also work; it also contains a main()
method, so you should be able to run it.

Btw: If you use the Storm compatibility API, there is no reason to get an
ExecutionEnvironment in your code. This happens automatically with
FlinkClient/FlinkSubmitter.

Furthermore, I would recommend using FlinkSubmitter instead of
FlinkClient, as it is somewhat simpler to use.

About the SO question: I guess the problem is the jar assembly. The user says

"Since I'using maven to handle my dependencies, I do a Mvn clean install
to obtain the jar."

I guess this is not sufficient to bundle a correct jar. Have a look at the
pom.xml from storm-examples. It uses Maven plug-ins to assemble the jar
correctly. (Regular Maven artifacts do not work for job submission...)

I will have a closer look and follow up... Hope this helps already.

-Matthias

On 04/13/2016 06:23 PM, star jlong wrote:

> Thanks for the reply.
> @Stephan, I tried using RemoteEnvironment to submit my topology to Flink.
> Here is what I tried:
>     RemoteEnvironment remote = new RemoteEnvironment(ipJobManager, 6123, jarPath);
>     remote.execute();
> While running the program, this is the exception that I got:
> java.lang.RuntimeException: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it.
>
> On Wednesday 13 April 2016 at 16:54, Till Rohrmann <[hidden email]> wrote:
>
> I think this is not the problem here, since the problem is still happening
> on the client side when the FlinkTopology tries to copy the registered
> spouts. This happens before the job is submitted to the cluster. Maybe
> Matthias could chime in here.
>
> Cheers,
> Till
>
> On Wed, Apr 13, 2016 at 5:39 PM, Stephan Ewen <[hidden email]> wrote:
>
>> Hi!
>>
>> For Flink standalone programs, you would use a "RemoteEnvironment".
>>
>> For Storm, I would use the "FlinkClient" in "org.apache.flink.storm.api".
>> That one should deal with jars, classloaders, etc. for you.
>>
>> Stephan
>>
>>
>> On Wed, Apr 13, 2016 at 3:43 PM, star jlong <[hidden email]> wrote:
>>
>>> Thanks for the suggestion. Sure, those examples are interesting and I have
>>> deployed them successfully on Flink. The deployment is done from the
>>> command line, with something like
>>> bin/flink run example.jar
>>> But what I want is to submit the topology to Flink using a Java program.
>>>
>>> Thanks.
>>>
>>> On Wednesday 13 April 2016 at 14:12, Chesnay Schepler <[hidden email]> wrote:
>>>
>>> you can find examples here:
>>>
>>> https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples
>>>
>>> we haven't established yet that it is an API issue; it could very well
>>> be caused by the reflection magic you're using...
>>>
>>> On 13.04.2016 14:57, star jlong wrote:
>>>> OK, it seems like there is an issue with the API. So please, does anybody
>>>> have a working example for deploying a topology using the Flink dependency
>>>> flink-storm_2.11 or any other? That would be welcome.
>>>>
>>>> Thanks,
>>>> jstar
>>>>
>>>> On Wednesday 13 April 2016 at 13:44, star jlong <[hidden email]> wrote:
>>>>
>>>> Hi Schepler,
>>>>
>>>> Thanks for your concern. Yes, I'm actually having the same issue as
>>>> indicated in that post, because I'm the one who posted it.

Re: Issue deploying a topology to flink with a java api

star jlong
@Stephan,

I have tried using RemoteStreamEnvironment, but I get another exception:
java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.


 


Re: Issue deploying a topology to flink with a java api

Matthias J. Sax-2
In reply to this post by star jlong
I cannot completely follow your last step, the one where you fail. What do you
mean by "I'm stuck at the level when I want to copy that from the jar to
submit it to flink"?

Btw: I copied the code from the SO question and it works for me on the
current master (which includes Till's hotfix).

-Matthias



Re: Issue deploying a topology to flink with a java api

star jlong
What I'm trying to say is that, to submit the Flink topology to Flink, I have to invoke the mainMethod of my topology (the method that contains the actual topology) using the class java.lang.reflect.Method. If you take a look at the following topology, the mainMethod is buildTopology:

public class WordCountTopology {

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            // A topology name was passed: submit to the cluster
            conf.setNumWorkers(1);
            conf.setMaxTaskParallelism(1);
            FlinkSubmitter.submitTopology(args[0], conf, buildTopology());
        } else {
            // Otherwise, we are running locally
            conf.setMaxTaskParallelism(1);
            FlinkLocalCluster cluster = new FlinkLocalCluster();
            cluster.submitTopology("word-count", conf, buildTopology());
            Thread.sleep(10000);
        }
    }

    public static FlinkTopology buildTopology() {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new RandomSentenceSpout(), 1);
        builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), 1).fieldsGrouping("split", new Fields("word"));

        // Write each counted tuple to a file as a plain string
        builder.setBolt("writeIntoFile", new BoltFileSink("/home/username/wordcount.txt", new OutputFormatter() {
            private static final long serialVersionUID = 1L;

            @Override
            public String format(Tuple tuple) {
                return tuple.toString();
            }
        }), 1).shuffleGrouping("count");

        return FlinkTopology.createTopology(builder);
    }
}

That is the method that I want to invoke from my jar, so that I can submit the topology without any problem, i.e.

final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);
cluster.submitTopology(topologyId, uploadedJarLocation,
        getFlinkTopology(String.format("file://%s", jarPath),
                properties.getProperty("topologyMainClass"),
                properties.getProperty("methodName")));

where getFlinkTopology() returns the actual topology.

But while doing that reflection I got an exception.

Another question, please: how do I make use of Till's hotfix?
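
For reference, the reflective call described above (load a class by name, then invoke a public static no-argument method on it) can be sketched with the JDK alone. This is a minimal illustration under assumptions, not the getFlinkTopology() helper from the post: the names ReflectiveFactoryCall, SampleTopology, and invokeStaticFactory are hypothetical, and the stand-in method returns a String rather than a FlinkTopology so that the sketch has no Flink dependency. It also unwraps InvocationTargetException, because Method.invoke wraps whatever the invoked method throws and the underlying cause is usually the informative part.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical sketch: load a class by name and call a public static no-arg method on it.
class ReflectiveFactoryCall {

    // Stand-in for a topology class such as WordCountTopology; it returns a String
    // instead of a FlinkTopology so the sketch needs nothing beyond the JDK.
    public static class SampleTopology {
        public static String buildTopology() {
            return "sample-topology";
        }
    }

    // Loads className through the given loader and invokes the static method methodName.
    static Object invokeStaticFactory(ClassLoader loader, String className, String methodName)
            throws Exception {
        Class<?> clazz = Class.forName(className, true, loader);
        Method method = clazz.getMethod(methodName); // public method without parameters
        try {
            return method.invoke(null); // null receiver because the method is static
        } catch (InvocationTargetException e) {
            // The invoked method threw; rethrow its own exception when it is an Exception.
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        // With a real jar, the URL array would hold the jar's URL,
        // for example new File(jarPath).toURI().toURL().
        // The current loader is passed as parent so that classes visible to both
        // the caller and the jar are resolved by the parent first.
        try (URLClassLoader loader = new URLClassLoader(
                new URL[0], ReflectiveFactoryCall.class.getClassLoader())) {
            Object result = invokeStaticFactory(
                    loader, SampleTopology.class.getName(), "buildTopology");
            System.out.println(result);
        }
    }
}
```

One general pitfall with this pattern, independent of the exception discussed in this thread: if the jar's classes are loaded by a class loader that does not delegate to the caller's loader, a cast such as (FlinkTopology) on the returned object can fail with a ClassCastException, because the same class name loaded by two loaders yields two distinct classes.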
