Hi,
I am trying to run an infinite streaming job (ie, one that does not terminate because it is generating output date randomly on the fly). I kill this job with .stop() or .shutdown() method of ForkableFlinkMiniCluster. I did not find any example using a similar setup. In the provided examples, each job terminate automatically, because only a finite input is processed and the source returns after all data is emitted. I have multiple question about my setup: 1) The job never terminates "clean", ie, I get some exceptions. Is this behavior desired? 2) Is it possible to get a result back? Similar to JobClient.submitJobAndWait(...)? 3) Is it somehow possible, to send a signal to the running job such that the source can terminate regularly as if finite input would be processed? Right now, I use an while(running) loop and set 'running' to false in the .cancel() method. Thanks for your help! -Matthias |
Hi Matthias,
the streaming folks can probably answer the questions better. But I'll write something to bring this message back to their attention ;) 1) Which exceptions are you seeing? Flink should be able to cleanly shut down. 2) As far as I saw it, the execute() method (of the Streaming API) got an JobExecutionResult return type in the latest master. That contains accumulator results. 3) I think the cancel() method is there for exactly that purpose. If the job is shutting down before the cancel method, that probably a bug. Robert On Fri, Mar 27, 2015 at 10:07 PM, Matthias J. Sax < [hidden email]> wrote: > Hi, > > I am trying to run an infinite streaming job (ie, one that does not > terminate because it is generating output date randomly on the fly). I > kill this job with .stop() or .shutdown() method of > ForkableFlinkMiniCluster. > > I did not find any example using a similar setup. In the provided > examples, each job terminate automatically, because only a finite input > is processed and the source returns after all data is emitted. > > > I have multiple question about my setup: > > 1) The job never terminates "clean", ie, I get some exceptions. Is this > behavior desired? > > 2) Is it possible to get a result back? Similar to > JobClient.submitJobAndWait(...)? > > 3) Is it somehow possible, to send a signal to the running job such > that the source can terminate regularly as if finite input would be > processed? Right now, I use an while(running) loop and set 'running' to > false in the .cancel() method. > > > > Thanks for your help! > > -Matthias > > > |
Hi Robert,
thanks for your answer. I get an InterruptedException when I call shutdown(): java.lang.InterruptedException at java.lang.Object.wait(Native Method) at java.lang.Thread.join(Thread.java:1225) at java.lang.Thread.join(Thread.java:1278) at org.apache.flink.streaming.io.StreamRecordWriter.close(StreamRecordWriter.java:55) at org.apache.flink.streaming.api.collector.StreamOutput.close(StreamOutput.java:77) at org.apache.flink.streaming.api.streamvertex.OutputHandler.flushOutputs(OutputHandler.java:204) at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:195) at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217) at java.lang.Thread.run(Thread.java:701) About the JobExecutionResult: I added a new method to the API, that calls JobClient.submitJobDetached(...) instead of JobClient.submitJobAndWait(...). The "detached" version has no return value, while the blocking one returns a JobExecutionResult that is further returned by execute(). So I cannot get a JobExecutionResult right now. It would be nice to get the JobExecutionResult when stopping the running program via a "stop-execution"-call (is there any way to do this?). Right now, I sleep for a certain time after calling submitJobDetached(...) an call stop() and shutdown() later on (from ForkableMiniCluster). The stop() call does not seem to do anything... shutdown() works (except for the Exception I get -- as described above). -Matthias On 03/30/2015 09:08 PM, Robert Metzger wrote: > Hi Matthias, > > the streaming folks can probably answer the questions better. But I'll > write something to bring this message back to their attention ;) > > 1) Which exceptions are you seeing? Flink should be able to cleanly shut > down. > 2) As far as I saw it, the execute() method (of the Streaming API) got an > JobExecutionResult return type in the latest master. That contains > accumulator results. > 3) I think the cancel() method is there for exactly that purpose. If the > job is shutting down before the cancel method, that probably a bug. > > > Robert > > > > On Fri, Mar 27, 2015 at 10:07 PM, Matthias J. Sax < > [hidden email]> wrote: > >> Hi, >> >> I am trying to run an infinite streaming job (ie, one that does not >> terminate because it is generating output date randomly on the fly). I >> kill this job with .stop() or .shutdown() method of >> ForkableFlinkMiniCluster. >> >> I did not find any example using a similar setup. In the provided >> examples, each job terminate automatically, because only a finite input >> is processed and the source returns after all data is emitted. >> >> >> I have multiple question about my setup: >> >> 1) The job never terminates "clean", ie, I get some exceptions. Is this >> behavior desired? >> >> 2) Is it possible to get a result back? Similar to >> JobClient.submitJobAndWait(...)? >> >> 3) Is it somehow possible, to send a signal to the running job such >> that the source can terminate regularly as if finite input would be >> processed? Right now, I use an while(running) loop and set 'running' to >> false in the .cancel() method. >> >> >> >> Thanks for your help! >> >> -Matthias >> >> >> > |
Hey Matthias,
Thanks for reporting the Exception thrown, we were not preparing for this use case yet. We fixed it with Gyula, he is pushing a fix for it right now: When the job is cancelled (for example due to shutting down the executor underneath) you should not see that InterruptedException as soon as this commit is in. [1] As for getting the streaming JobExecutionResult back from a detached job my current best practice is what you can see in the ProcessFailureRecoveryTestBase and its streaming implementation: starting an executor in a separate thread and then joining it with the main one. Would you prefer a more Storm example-ish solution? [2] [1] https://github.com/mbalassi/flink/commit/5db06d6d [2] https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java#L99-104 On Tue, Mar 31, 2015 at 2:54 PM, Matthias J. Sax < [hidden email]> wrote: > Hi Robert, > > thanks for your answer. > > I get an InterruptedException when I call shutdown(): > > java.lang.InterruptedException > at java.lang.Object.wait(Native Method) > at java.lang.Thread.join(Thread.java:1225) > at java.lang.Thread.join(Thread.java:1278) > at > > org.apache.flink.streaming.io.StreamRecordWriter.close(StreamRecordWriter.java:55) > at > > org.apache.flink.streaming.api.collector.StreamOutput.close(StreamOutput.java:77) > at > > org.apache.flink.streaming.api.streamvertex.OutputHandler.flushOutputs(OutputHandler.java:204) > at > > org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:195) > at > > org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217) > at java.lang.Thread.run(Thread.java:701) > > > About the JobExecutionResult: > > I added a new method to the API, that calls > JobClient.submitJobDetached(...) instead of > JobClient.submitJobAndWait(...). The "detached" version has no return > value, while the blocking one returns a JobExecutionResult that is > further returned by execute(). So I cannot get a JobExecutionResult > right now. > > It would be nice to get the JobExecutionResult when stopping the running > program via a "stop-execution"-call (is there any way to do this?). > Right now, I sleep for a certain time after calling > submitJobDetached(...) an call stop() and shutdown() later on (from > ForkableMiniCluster). The stop() call does not seem to do anything... > shutdown() works (except for the Exception I get -- as described above). > > > -Matthias > > > On 03/30/2015 09:08 PM, Robert Metzger wrote: > > Hi Matthias, > > > > the streaming folks can probably answer the questions better. But I'll > > write something to bring this message back to their attention ;) > > > > 1) Which exceptions are you seeing? Flink should be able to cleanly shut > > down. > > 2) As far as I saw it, the execute() method (of the Streaming API) got an > > JobExecutionResult return type in the latest master. That contains > > accumulator results. > > 3) I think the cancel() method is there for exactly that purpose. If the > > job is shutting down before the cancel method, that probably a bug. > > > > > > Robert > > > > > > > > On Fri, Mar 27, 2015 at 10:07 PM, Matthias J. Sax < > > [hidden email]> wrote: > > > >> Hi, > >> > >> I am trying to run an infinite streaming job (ie, one that does not > >> terminate because it is generating output date randomly on the fly). I > >> kill this job with .stop() or .shutdown() method of > >> ForkableFlinkMiniCluster. > >> > >> I did not find any example using a similar setup. In the provided > >> examples, each job terminate automatically, because only a finite input > >> is processed and the source returns after all data is emitted. > >> > >> > >> I have multiple question about my setup: > >> > >> 1) The job never terminates "clean", ie, I get some exceptions. Is this > >> behavior desired? > >> > >> 2) Is it possible to get a result back? Similar to > >> JobClient.submitJobAndWait(...)? > >> > >> 3) Is it somehow possible, to send a signal to the running job such > >> that the source can terminate regularly as if finite input would be > >> processed? Right now, I use an while(running) loop and set 'running' to > >> false in the .cancel() method. > >> > >> > >> > >> Thanks for your help! > >> > >> -Matthias > >> > >> > >> > > > > |
Hi,
I will pull the fix and try it out. Thanks for the hint with the extra Thread. That should work for me. But you are actually right; my setup is Storm inspired. I thinks its a very natural way to deploy and stop and infinite streaming job. Maybe, you want to adopt to it. The ITCase I am writing bases on StreamingProgramTestBase, so I need the JobExecutionResult because the test fails without it. -Matthias On 04/01/2015 11:09 AM, Márton Balassi wrote: > Hey Matthias, > > Thanks for reporting the Exception thrown, we were not preparing for this > use case yet. We fixed it with Gyula, he is pushing a fix for it right now: > When the job is cancelled (for example due to shutting down the executor > underneath) you should not see that InterruptedException as soon as this > commit is in. [1] > > As for getting the streaming JobExecutionResult back from a detached job my > current best practice is what you can see in > the ProcessFailureRecoveryTestBase and its streaming implementation: > starting an executor in a separate thread and then joining it with the main > one. Would you prefer a more Storm example-ish solution? [2] > > [1] https://github.com/mbalassi/flink/commit/5db06d6d > [2] > https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java#L99-104 > > On Tue, Mar 31, 2015 at 2:54 PM, Matthias J. Sax < > [hidden email]> wrote: > >> Hi Robert, >> >> thanks for your answer. >> >> I get an InterruptedException when I call shutdown(): >> >> java.lang.InterruptedException >> at java.lang.Object.wait(Native Method) >> at java.lang.Thread.join(Thread.java:1225) >> at java.lang.Thread.join(Thread.java:1278) >> at >> >> org.apache.flink.streaming.io.StreamRecordWriter.close(StreamRecordWriter.java:55) >> at >> >> org.apache.flink.streaming.api.collector.StreamOutput.close(StreamOutput.java:77) >> at >> >> org.apache.flink.streaming.api.streamvertex.OutputHandler.flushOutputs(OutputHandler.java:204) >> at >> >> org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:195) >> at >> >> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217) >> at java.lang.Thread.run(Thread.java:701) >> >> >> About the JobExecutionResult: >> >> I added a new method to the API, that calls >> JobClient.submitJobDetached(...) instead of >> JobClient.submitJobAndWait(...). The "detached" version has no return >> value, while the blocking one returns a JobExecutionResult that is >> further returned by execute(). So I cannot get a JobExecutionResult >> right now. >> >> It would be nice to get the JobExecutionResult when stopping the running >> program via a "stop-execution"-call (is there any way to do this?). >> Right now, I sleep for a certain time after calling >> submitJobDetached(...) an call stop() and shutdown() later on (from >> ForkableMiniCluster). The stop() call does not seem to do anything... >> shutdown() works (except for the Exception I get -- as described above). >> >> >> -Matthias >> >> >> On 03/30/2015 09:08 PM, Robert Metzger wrote: >>> Hi Matthias, >>> >>> the streaming folks can probably answer the questions better. But I'll >>> write something to bring this message back to their attention ;) >>> >>> 1) Which exceptions are you seeing? Flink should be able to cleanly shut >>> down. >>> 2) As far as I saw it, the execute() method (of the Streaming API) got an >>> JobExecutionResult return type in the latest master. That contains >>> accumulator results. >>> 3) I think the cancel() method is there for exactly that purpose. If the >>> job is shutting down before the cancel method, that probably a bug. >>> >>> >>> Robert >>> >>> >>> >>> On Fri, Mar 27, 2015 at 10:07 PM, Matthias J. Sax < >>> [hidden email]> wrote: >>> >>>> Hi, >>>> >>>> I am trying to run an infinite streaming job (ie, one that does not >>>> terminate because it is generating output date randomly on the fly). I >>>> kill this job with .stop() or .shutdown() method of >>>> ForkableFlinkMiniCluster. >>>> >>>> I did not find any example using a similar setup. In the provided >>>> examples, each job terminate automatically, because only a finite input >>>> is processed and the source returns after all data is emitted. >>>> >>>> >>>> I have multiple question about my setup: >>>> >>>> 1) The job never terminates "clean", ie, I get some exceptions. Is this >>>> behavior desired? >>>> >>>> 2) Is it possible to get a result back? Similar to >>>> JobClient.submitJobAndWait(...)? >>>> >>>> 3) Is it somehow possible, to send a signal to the running job such >>>> that the source can terminate regularly as if finite input would be >>>> processed? Right now, I use an while(running) loop and set 'running' to >>>> false in the .cancel() method. >>>> >>>> >>>> >>>> Thanks for your help! >>>> >>>> -Matthias >>>> >>>> >>>> >>> >> >> > |
As a followup - I think it would be a good thing to add a way to gracefully
stop a streaming job. Something that sends "close" to the sources, and they quit. We can use this for graceful shutdown wen re-partitioninig / scaling in or out, ... On Wed, Apr 1, 2015 at 1:29 PM, Matthias J. Sax < [hidden email]> wrote: > Hi, > > I will pull the fix and try it out. > > Thanks for the hint with the extra Thread. That should work for me. But > you are actually right; my setup is Storm inspired. I thinks its a very > natural way to deploy and stop and infinite streaming job. Maybe, you > want to adopt to it. > > The ITCase I am writing bases on StreamingProgramTestBase, so I need the > JobExecutionResult because the test fails without it. > > > -Matthias > > > > On 04/01/2015 11:09 AM, Márton Balassi wrote: > > Hey Matthias, > > > > Thanks for reporting the Exception thrown, we were not preparing for this > > use case yet. We fixed it with Gyula, he is pushing a fix for it right > now: > > When the job is cancelled (for example due to shutting down the executor > > underneath) you should not see that InterruptedException as soon as this > > commit is in. [1] > > > > As for getting the streaming JobExecutionResult back from a detached job > my > > current best practice is what you can see in > > the ProcessFailureRecoveryTestBase and its streaming implementation: > > starting an executor in a separate thread and then joining it with the > main > > one. Would you prefer a more Storm example-ish solution? [2] > > > > [1] https://github.com/mbalassi/flink/commit/5db06d6d > > [2] > > > https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java#L99-104 > > > > On Tue, Mar 31, 2015 at 2:54 PM, Matthias J. Sax < > > [hidden email]> wrote: > > > >> Hi Robert, > >> > >> thanks for your answer. > >> > >> I get an InterruptedException when I call shutdown(): > >> > >> java.lang.InterruptedException > >> at java.lang.Object.wait(Native Method) > >> at java.lang.Thread.join(Thread.java:1225) > >> at java.lang.Thread.join(Thread.java:1278) > >> at > >> > >> > org.apache.flink.streaming.io.StreamRecordWriter.close(StreamRecordWriter.java:55) > >> at > >> > >> > org.apache.flink.streaming.api.collector.StreamOutput.close(StreamOutput.java:77) > >> at > >> > >> > org.apache.flink.streaming.api.streamvertex.OutputHandler.flushOutputs(OutputHandler.java:204) > >> at > >> > >> > org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:195) > >> at > >> > >> > org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217) > >> at java.lang.Thread.run(Thread.java:701) > >> > >> > >> About the JobExecutionResult: > >> > >> I added a new method to the API, that calls > >> JobClient.submitJobDetached(...) instead of > >> JobClient.submitJobAndWait(...). The "detached" version has no return > >> value, while the blocking one returns a JobExecutionResult that is > >> further returned by execute(). So I cannot get a JobExecutionResult > >> right now. > >> > >> It would be nice to get the JobExecutionResult when stopping the running > >> program via a "stop-execution"-call (is there any way to do this?). > >> Right now, I sleep for a certain time after calling > >> submitJobDetached(...) an call stop() and shutdown() later on (from > >> ForkableMiniCluster). The stop() call does not seem to do anything... > >> shutdown() works (except for the Exception I get -- as described above). > >> > >> > >> -Matthias > >> > >> > >> On 03/30/2015 09:08 PM, Robert Metzger wrote: > >>> Hi Matthias, > >>> > >>> the streaming folks can probably answer the questions better. But I'll > >>> write something to bring this message back to their attention ;) > >>> > >>> 1) Which exceptions are you seeing? Flink should be able to cleanly > shut > >>> down. > >>> 2) As far as I saw it, the execute() method (of the Streaming API) got > an > >>> JobExecutionResult return type in the latest master. That contains > >>> accumulator results. > >>> 3) I think the cancel() method is there for exactly that purpose. If > the > >>> job is shutting down before the cancel method, that probably a bug. > >>> > >>> > >>> Robert > >>> > >>> > >>> > >>> On Fri, Mar 27, 2015 at 10:07 PM, Matthias J. Sax < > >>> [hidden email]> wrote: > >>> > >>>> Hi, > >>>> > >>>> I am trying to run an infinite streaming job (ie, one that does not > >>>> terminate because it is generating output date randomly on the fly). I > >>>> kill this job with .stop() or .shutdown() method of > >>>> ForkableFlinkMiniCluster. > >>>> > >>>> I did not find any example using a similar setup. In the provided > >>>> examples, each job terminate automatically, because only a finite > input > >>>> is processed and the source returns after all data is emitted. > >>>> > >>>> > >>>> I have multiple question about my setup: > >>>> > >>>> 1) The job never terminates "clean", ie, I get some exceptions. Is > this > >>>> behavior desired? > >>>> > >>>> 2) Is it possible to get a result back? Similar to > >>>> JobClient.submitJobAndWait(...)? > >>>> > >>>> 3) Is it somehow possible, to send a signal to the running job such > >>>> that the source can terminate regularly as if finite input would be > >>>> processed? Right now, I use an while(running) loop and set 'running' > to > >>>> false in the .cancel() method. > >>>> > >>>> > >>>> > >>>> Thanks for your help! > >>>> > >>>> -Matthias > >>>> > >>>> > >>>> > >>> > >> > >> > > > > |
I agree.
@Marton: The idea with the extra thread does not work, because the method JobClient.submitJobAndWait(...) does not return regularly if ForkableFlinkMiniCluster.shutdown() is called -- instead an exception occurs: > Exception in thread "Thread-8" java.lang.RuntimeException: org.apache.flink.runtime.client.JobTimeoutException: Lost connection to job manager. > at org.apache.flink.streaming.util.TestStreamEnvironment$1.run(TestStreamEnvironment.java:119) > Caused by: org.apache.flink.runtime.client.JobTimeoutException: Lost connection to job manager. > at org.apache.flink.runtime.client.JobClient$.submitJobAndWait(JobClient.scala:228) > at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.scala) > at org.apache.flink.streaming.util.TestStreamEnvironment$1.run(TestStreamEnvironment.java:117) > Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://flink/user/jobclient#-596117797]] had already been terminated. > at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132) > at akka.pattern.AskableActorRef$.$qmark$extension(AskSupport.scala:144) > at org.apache.flink.runtime.client.JobClient$.submitJobAndWait(JobClient.scala:222) > ... 2 more Thus, I cannot get an JobExecutionResult this way, either. -Matthias On 04/01/2015 02:36 PM, Stephan Ewen wrote: > As a followup - I think it would be a good thing to add a way to gracefully > stop a streaming job. > > Something that sends "close" to the sources, and they quit. > > We can use this for graceful shutdown wen re-partitioninig / scaling in or > out, ... > > On Wed, Apr 1, 2015 at 1:29 PM, Matthias J. Sax < > [hidden email]> wrote: > >> Hi, >> >> I will pull the fix and try it out. >> >> Thanks for the hint with the extra Thread. That should work for me. But >> you are actually right; my setup is Storm inspired. I thinks its a very >> natural way to deploy and stop and infinite streaming job. Maybe, you >> want to adopt to it. >> >> The ITCase I am writing bases on StreamingProgramTestBase, so I need the >> JobExecutionResult because the test fails without it. >> >> >> -Matthias >> >> >> >> On 04/01/2015 11:09 AM, Márton Balassi wrote: >>> Hey Matthias, >>> >>> Thanks for reporting the Exception thrown, we were not preparing for this >>> use case yet. We fixed it with Gyula, he is pushing a fix for it right >> now: >>> When the job is cancelled (for example due to shutting down the executor >>> underneath) you should not see that InterruptedException as soon as this >>> commit is in. [1] >>> >>> As for getting the streaming JobExecutionResult back from a detached job >> my >>> current best practice is what you can see in >>> the ProcessFailureRecoveryTestBase and its streaming implementation: >>> starting an executor in a separate thread and then joining it with the >> main >>> one. Would you prefer a more Storm example-ish solution? [2] >>> >>> [1] https://github.com/mbalassi/flink/commit/5db06d6d >>> [2] >>> >> https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java#L99-104 >>> >>> On Tue, Mar 31, 2015 at 2:54 PM, Matthias J. Sax < >>> [hidden email]> wrote: >>> >>>> Hi Robert, >>>> >>>> thanks for your answer. >>>> >>>> I get an InterruptedException when I call shutdown(): >>>> >>>> java.lang.InterruptedException >>>> at java.lang.Object.wait(Native Method) >>>> at java.lang.Thread.join(Thread.java:1225) >>>> at java.lang.Thread.join(Thread.java:1278) >>>> at >>>> >>>> >> org.apache.flink.streaming.io.StreamRecordWriter.close(StreamRecordWriter.java:55) >>>> at >>>> >>>> >> org.apache.flink.streaming.api.collector.StreamOutput.close(StreamOutput.java:77) >>>> at >>>> >>>> >> org.apache.flink.streaming.api.streamvertex.OutputHandler.flushOutputs(OutputHandler.java:204) >>>> at >>>> >>>> >> org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:195) >>>> at >>>> >>>> >> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217) >>>> at java.lang.Thread.run(Thread.java:701) >>>> >>>> >>>> About the JobExecutionResult: >>>> >>>> I added a new method to the API, that calls >>>> JobClient.submitJobDetached(...) instead of >>>> JobClient.submitJobAndWait(...). The "detached" version has no return >>>> value, while the blocking one returns a JobExecutionResult that is >>>> further returned by execute(). So I cannot get a JobExecutionResult >>>> right now. >>>> >>>> It would be nice to get the JobExecutionResult when stopping the running >>>> program via a "stop-execution"-call (is there any way to do this?). >>>> Right now, I sleep for a certain time after calling >>>> submitJobDetached(...) an call stop() and shutdown() later on (from >>>> ForkableMiniCluster). The stop() call does not seem to do anything... >>>> shutdown() works (except for the Exception I get -- as described above). >>>> >>>> >>>> -Matthias >>>> >>>> >>>> On 03/30/2015 09:08 PM, Robert Metzger wrote: >>>>> Hi Matthias, >>>>> >>>>> the streaming folks can probably answer the questions better. But I'll >>>>> write something to bring this message back to their attention ;) >>>>> >>>>> 1) Which exceptions are you seeing? Flink should be able to cleanly >> shut >>>>> down. >>>>> 2) As far as I saw it, the execute() method (of the Streaming API) got >> an >>>>> JobExecutionResult return type in the latest master. That contains >>>>> accumulator results. >>>>> 3) I think the cancel() method is there for exactly that purpose. If >> the >>>>> job is shutting down before the cancel method, that probably a bug. >>>>> >>>>> >>>>> Robert >>>>> >>>>> >>>>> >>>>> On Fri, Mar 27, 2015 at 10:07 PM, Matthias J. Sax < >>>>> [hidden email]> wrote: >>>>> >>>>>> Hi, >>>>>> >>>>>> I am trying to run an infinite streaming job (ie, one that does not >>>>>> terminate because it is generating output date randomly on the fly). I >>>>>> kill this job with .stop() or .shutdown() method of >>>>>> ForkableFlinkMiniCluster. >>>>>> >>>>>> I did not find any example using a similar setup. In the provided >>>>>> examples, each job terminate automatically, because only a finite >> input >>>>>> is processed and the source returns after all data is emitted. >>>>>> >>>>>> >>>>>> I have multiple question about my setup: >>>>>> >>>>>> 1) The job never terminates "clean", ie, I get some exceptions. Is >> this >>>>>> behavior desired? >>>>>> >>>>>> 2) Is it possible to get a result back? Similar to >>>>>> JobClient.submitJobAndWait(...)? >>>>>> >>>>>> 3) Is it somehow possible, to send a signal to the running job such >>>>>> that the source can terminate regularly as if finite input would be >>>>>> processed? Right now, I use an while(running) loop and set 'running' >> to >>>>>> false in the .cancel() method. >>>>>> >>>>>> >>>>>> >>>>>> Thanks for your help! >>>>>> >>>>>> -Matthias >>>>>> >>>>>> >>>>>> >>>>> >>>> >>>> >>> >> >> > |
Free forum by Nabble | Edit this page |