Hi,
I am able to read from a topic using FlinkKafkaConsumer and return the result. However, when I test this scenario in JUnit, the result gets printed (kafkaStream.print()), but I am not able to exit the job: env.execute() keeps running. I tried returning env.execute() from the method, but that did not work either.

1) Is there any way to end the execution of the job forcefully?
2) How do I test whether the data has come from the topic? One way I can think of is to capture the output of stream.print() in a PrintStream and check the result (but I cannot test this since the job does not exit).

Please help with these issues.

Regards,
Vinay Patil
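A minimal sketch of the PrintStream idea above, assuming a local execution environment so that print() writes to the test JVM's stdout. The record value and class name are made up for illustration, and the stand-in println marks where env.execute() would run:

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;

    import org.junit.Assert;
    import org.junit.Test;

    public class PrintCaptureSketch {

        @Test
        public void capturesPrintedRecords() throws Exception {
            // Swap System.out for a buffer so the printed output can be inspected.
            ByteArrayOutputStream captured = new ByteArrayOutputStream();
            PrintStream originalOut = System.out;
            System.setOut(new PrintStream(captured, true));
            try {
                // env.execute() would run here; with an unbounded Kafka source
                // it never returns, which is exactly the problem in this thread
                System.out.println("expected-record"); // stand-in for kafkaStream.print()
            } finally {
                System.setOut(originalOut); // always restore stdout
            }
            Assert.assertTrue(captured.toString().contains("expected-record"));
        }
    }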
I'm also curious about a solution here. My test code executes the flow from a separate thread. Once I've joined all my producer threads and verified the output, I simply interrupt the flow thread. This spews exceptions, but they all appear to be harmless.

Maybe there's a better way? I think you'd need some "death pill" to send into the stream that signals its termination.
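Roughly what that approach looks like, as a sketch: the topology and job name are placeholders, and the stand-in fromElements source is only there so the snippet compiles on its own.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class InterruptFlowSketch {

        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // ... build the real topology here, e.g. env.addSource(new FlinkKafkaConsumer...) ...
            env.fromElements("a", "b", "c").print(); // stand-in topology

            // Run the job on its own thread so the test thread stays free.
            Thread flowThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        env.execute("test-flow");
                    } catch (Exception e) {
                        // expected once the thread is interrupted; treated as harmless
                    }
                }
            });
            flowThread.start();

            // ... start producer threads, join them, verify the output ...

            flowThread.interrupt(); // tear the job down after verification
            flowThread.join();
        }
    }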
Hi,
what we are doing in most internal tests is to verify in a sink whether the data is correct and then throw a SuccessException. This brings down the job, and we check whether we catch a SuccessException to verify that the test was successful. Look, for example, at the ValidatingSink in EventTimeWindowCheckpointingITCase in the Flink source.

Cheers,
Aljoscha
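A simplified sketch of that pattern, assuming a String stream and parallelism 1. The SuccessException here is a locally defined marker class and the expected count is made up; this is the spirit of the ValidatingSink, not a copy of it:

    import org.apache.flink.streaming.api.functions.sink.SinkFunction;

    // A marker exception; the test later checks the job's failure cause for this type.
    class SuccessException extends Exception {
        private static final long serialVersionUID = 1L;
    }

    // Sink that validates incoming records and deliberately fails the job
    // with SuccessException once the expected data has been seen.
    class ValidatingSink implements SinkFunction<String> {
        private static final long serialVersionUID = 1L;
        private static final int EXPECTED = 10; // hypothetical expected record count
        private int seen = 0;

        @Override
        public void invoke(String value) throws Exception {
            // ... verify that 'value' is correct here ...
            if (++seen >= EXPECTED) {
                throw new SuccessException(); // brings down env.execute() on purpose
            }
        }
    }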
Hi Aljoscha,
Thank you for answering.
Throwing a SuccessException is a good idea. However, when I add the following dependency, no classes get added to the jar:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-tests_2.10</artifactId>
        <version>1.0.3</version>
    </dependency>

Is there any other dependency that I have to add? I have also added the test-utils dependency.

I am trying the following in my test case:
1) Consume data from Kafka using FlinkKafkaConsumer and pass it to a map as a Tuple2.
2) In the map function, just check whether the Tuple2 contains data; if yes, throw the exception ("success").
3) This way I verify that the configuration is correct and that we are able to read from Kafka.

Am I doing it right? Is there a better approach?

Regards,
Vinay Patil
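A minimal sketch of the map-based check described in the steps above, assuming kafkaStream is the DataStream<Tuple2<String, String>> produced by the FlinkKafkaConsumer, and reusing the SuccessException marker class defined earlier in the thread:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Fail "successfully" as soon as any record arrives from Kafka.
    kafkaStream.map(new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> map(Tuple2<String, String> value) throws Exception {
            if (value != null && value.f0 != null) {
                throw new SuccessException(); // data arrived: the test can pass
            }
            return value;
        }
    });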
Hi!
On Flink 1.0, there is the "flink-test-utils_2.10" dependency that has some useful things.

The "SuccessException" seems to be quite a common thing - I have seen it in tests of other infinite programs as well (Google Dataflow / Beam).

Another way you can architect tests is to have an element in the stream that signals end-of-stream. The DeserializationSchema can check for that and return "end of stream".

Greetings,
Stephan
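A sketch of that end-of-stream element, assuming String records; the POISON_PILL marker value is made up, and the package matches the Flink 1.0 DeserializationSchema:

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;

    // Treats a well-known marker record as "end of stream", so the Kafka
    // source shuts down cleanly and env.execute() returns.
    public class PoisonPillSchema implements DeserializationSchema<String> {
        private static final long serialVersionUID = 1L;
        private static final String POISON_PILL = "END-OF-TEST"; // hypothetical marker

        @Override
        public String deserialize(byte[] message) {
            return new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return POISON_PILL.equals(nextElement); // true stops the consumer
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }

The test then writes the marker record to the topic after the real test data, and the job finishes on its own.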
Hi Stephan,
Yes, using the DeserializationSchema solution will definitely work.
I am still not able to get the dependency for SuccessException - any help on this?

Regards,
Vinay Patil
The SuccessException does not really need a dependency.
It is just a special Exception class that you throw in your code when you want to stop. The code that calls env.execute() catches the exception and checks whether the failure cause is that special exception. Flink propagates exceptions from the workers back to the client.

Greetings,
Stephan
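In code form, a sketch of that client-side check, assuming the env and the SuccessException class from earlier in the thread (the job name is a placeholder):

    // Run the job, then walk the cause chain of whatever comes back,
    // looking for the special marker exception.
    try {
        env.execute("kafka-read-test");
        Assert.fail("the job finished without signalling success");
    } catch (Exception e) {
        boolean success = false;
        for (Throwable t = e; t != null; t = t.getCause()) {
            if (t instanceof SuccessException) {
                success = true; // the marker made it back to the client
                break;
            }
        }
        if (!success) {
            throw e; // a genuine failure: rethrow so the test fails
        }
    }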
Yeah, understood.

Thank you for helping, guys.

Regards,
Vinay Patil