Question about Infinite Streaming Job on Mini Cluster and ITCase


Matthias J. Sax
Hi,

I am trying to run an infinite streaming job (i.e., one that does not
terminate, because it generates output data randomly on the fly). I
kill this job with the .stop() or .shutdown() method of
ForkableFlinkMiniCluster.

I did not find any example using a similar setup. In the provided
examples, each job terminates automatically, because only a finite
input is processed and the source returns after all data is emitted.


I have multiple questions about my setup:

 1) The job never terminates cleanly, i.e., I get some exceptions. Is
this behavior desired?

 2) Is it possible to get a result back, similar to
JobClient.submitJobAndWait(...)?

 3) Is it somehow possible to send a signal to the running job such
that the source can terminate regularly, as if a finite input were
being processed? Right now, I use a while(running) loop and set
'running' to false in the .cancel() method.



Thanks for your help!

-Matthias
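
For illustration, a minimal sketch of the while(running)/cancel()
pattern described in question 3. It assumes the modern
run(SourceContext)/cancel() form of SourceFunction; the exact package
and signature varied across early Flink versions, and RandomSource is
a made-up name:

import java.util.Random;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Hypothetical infinite source: emits random numbers until cancelled.
public class RandomSource implements SourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        Random rnd = new Random();
        while (running) {
            ctx.collect(rnd.nextLong()); // emit random data on the fly
        }
        // falling out of the loop ends the task as if input were finite
    }

    @Override
    public void cancel() {
        running = false; // lets run() return so the task can shut down
    }
}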




Re: Question about Infinite Streaming Job on Mini Cluster and ITCase

Robert Metzger
Hi Matthias,

the streaming folks can probably answer the questions better. But I'll
write something to bring this message back to their attention ;)

1) Which exceptions are you seeing? Flink should be able to shut down
cleanly.
2) As far as I saw, the execute() method (of the Streaming API) got a
JobExecutionResult return type in the latest master. That contains the
accumulator results.
3) I think the cancel() method is there for exactly that purpose. If
the job is shutting down before the cancel method is called, that is
probably a bug.


Robert
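
On point 2, a sketch of how that return value can be used, assuming
the JobExecutionResult API on master at the time (the job name and
accumulator name here are made up):

JobExecutionResult result = env.execute("my-job");
// read back an accumulator that a UDF registered under this name
Long emitted = result.getAccumulatorResult("emitted-records");
System.out.println("records emitted: " + emitted);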




Re: Question about Infinite Streaming Job on Mini Cluster and ITCase

Matthias J. Sax
Hi Robert,

thanks for your answer.

I get an InterruptedException when I call shutdown():

java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1225)
        at java.lang.Thread.join(Thread.java:1278)
        at org.apache.flink.streaming.io.StreamRecordWriter.close(StreamRecordWriter.java:55)
        at org.apache.flink.streaming.api.collector.StreamOutput.close(StreamOutput.java:77)
        at org.apache.flink.streaming.api.streamvertex.OutputHandler.flushOutputs(OutputHandler.java:204)
        at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:195)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
        at java.lang.Thread.run(Thread.java:701)


About the JobExecutionResult:

I added a new method to the API that calls
JobClient.submitJobDetached(...) instead of
JobClient.submitJobAndWait(...). The "detached" version has no return
value, while the blocking one returns a JobExecutionResult that is in
turn returned by execute(). So I cannot get a JobExecutionResult right
now.

It would be nice to get the JobExecutionResult when stopping the
running program via a "stop-execution" call (is there any way to do
this?). Right now, I sleep for a certain time after calling
submitJobDetached(...) and call stop() and shutdown() later on (from
ForkableFlinkMiniCluster). The stop() call does not seem to do
anything... shutdown() works (except for the exception I get, as
described above).


-Matthias



Re: Question about Infinite Streaming Job on Mini Cluster and ITCase

Márton Balassi
Hey Matthias,

Thanks for reporting the exception; we had not prepared for this use
case yet. Gyula and I fixed it, and he is pushing the fix right now:
once that commit is in, you should no longer see the
InterruptedException when the job is cancelled (for example, due to
shutting down the executor underneath). [1]

As for getting the streaming JobExecutionResult back from a detached
job, my current best practice is what you can see in
ProcessFailureRecoveryTestBase and its streaming implementation:
starting an executor in a separate thread and then joining it with the
main one. Would you prefer a more Storm-example-ish solution? [2]

[1] https://github.com/mbalassi/flink/commit/5db06d6d
[2] https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java#L99-104
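
A minimal sketch of that separate-thread pattern (illustrative only;
env stands for a TestStreamEnvironment-style environment, and the job
name is made up):

final JobExecutionResult[] result = new JobExecutionResult[1];
Thread runner = new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            // blocks until the job terminates, then exposes the result
            result[0] = env.execute("infinite-job");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
});
runner.start();
// ... exercise the running job, then make it terminate ...
runner.join(); // afterwards, result[0] holds the JobExecutionResult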


Re: Question about Infinite Streaming Job on Mini Cluster and ITCase

Matthias J. Sax
Hi,

I will pull the fix and try it out.

Thanks for the hint with the extra thread. That should work for me.
And you are actually right; my setup is Storm-inspired. I think it is
a very natural way to deploy and stop an infinite streaming job.
Maybe you want to adopt it.

The ITCase I am writing is based on StreamingProgramTestBase, so I
need the JobExecutionResult because the test fails without it.


-Matthias




Re: Question about Infinite Streaming Job on Mini Cluster and ITCase

Stephan Ewen
As a follow-up: I think it would be a good thing to add a way to
gracefully stop a streaming job.

Something that sends "close" to the sources, so they quit.

We can use this for graceful shutdown when re-partitioning / scaling
in or out, ...
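
To make the stop/cancel distinction concrete, a purely illustrative
extension of the RandomSource sketch above; the stop() hook is
hypothetical, the kind of method such a "close the sources" signal
could invoke:

// Hypothetical graceful-stop hook, in contrast to cancel():
public void stop() {
    running = false; // run() falls out of its loop and returns
                     // normally, so the job terminates as if the
                     // input were finite; cancel() remains the
                     // hard-abort path.
}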


Re: Question about Infinite Streaming Job on Mini Cluster and ITCase

Matthias J. Sax
I agree.

@Marton:
The idea with the extra thread does not work, because the method
JobClient.submitJobAndWait(...) does not return regularly if
ForkableFlinkMiniCluster.shutdown() is called -- instead, an exception
occurs:

> Exception in thread "Thread-8" java.lang.RuntimeException: org.apache.flink.runtime.client.JobTimeoutException: Lost connection to job manager.
> at org.apache.flink.streaming.util.TestStreamEnvironment$1.run(TestStreamEnvironment.java:119)
> Caused by: org.apache.flink.runtime.client.JobTimeoutException: Lost connection to job manager.
> at org.apache.flink.runtime.client.JobClient$.submitJobAndWait(JobClient.scala:228)
> at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.scala)
> at org.apache.flink.streaming.util.TestStreamEnvironment$1.run(TestStreamEnvironment.java:117)
> Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://flink/user/jobclient#-596117797]] had already been terminated.
> at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
> at akka.pattern.AskableActorRef$.$qmark$extension(AskSupport.scala:144)
> at org.apache.flink.runtime.client.JobClient$.submitJobAndWait(JobClient.scala:222)
> ... 2 more

Thus, I cannot get a JobExecutionResult this way, either.
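
One sketch of a workaround, assuming the runner thread from the
earlier sketch and a made-up static volatile flag: because a local
mini cluster runs its tasks in the same JVM as the test, a static
flag can tell the source to finish, so that submitJobAndWait(...)
returns before the cluster is torn down.

RandomSource.SHUTDOWN = true; // hypothetical static volatile flag
                              // read by the source's while loop
runner.join(30000);           // the blocking submit call now returns
cluster.shutdown();           // tear down only after the client
                              // call has returned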


-Matthias

