Junit Issue while testing Kafka Source

Junit Issue while testing Kafka Source

Vinay Patil
Hi,

I am able to read from a topic using FlinkKafkaConsumer and return the
result. However, when I test this scenario in JUnit, the result is
printed (kafkaStream.print()) but I am not able to exit the job:
env.execute() keeps running.
I tried returning env.execute() from the method, but that did not work either.

1) Is there any way to end the execution of the job forcefully?
2) How do I test whether the data has come from the topic?

   - One way I can think of is to capture the output of stream.print() in a
   PrintStream and check the result (but I am not able to test this since
   the job does not exit).

Please help with these issues.

Regards,
Vinay Patil

Re: Junit Issue while testing Kafka Source

Nick Dimiduk
I'm also curious about a solution here. My test code executes the flow from
a separate thread. Once I've joined all my producer threads and verified
the output, I simply interrupt the flow thread. This spews exceptions, but
it all appears to be harmless.

Maybe there's a better way? I think you'd need some "death pill" to send
into the stream that signals its termination.
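Nick's separate-thread approach can be sketched in plain Java. The blocking env.execute() call is simulated here by an interruptible loop so the class runs stand-alone; in a real test the thread body would invoke the Flink job, and the names are illustrative:

```java
// Sketch of the interrupt-the-flow-thread pattern: run the (blocking)
// job in its own thread, verify output from the test thread, then
// interrupt the job thread and join it.
public class InterruptedFlowExample {
    public static void main(String[] args) throws InterruptedException {
        Thread flowThread = new Thread(() -> {
            try {
                // Stands in for the blocking env.execute() call.
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                // Expected on shutdown; the exceptions Flink spews at
                // this point are likewise harmless in practice.
            }
        });
        flowThread.start();
        // ... produce records and verify the output here ...
        flowThread.interrupt();
        flowThread.join(1000);
        System.out.println("flow thread alive: " + flowThread.isAlive());
        // prints: flow thread alive: false
    }
}
```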


Re: Junit Issue while testing Kafka Source

Aljoscha Krettek-2
Hi,
what we do in most internal tests is verify in a sink whether the data is
correct and then throw a SuccessException. This brings down the job, and we
then check whether we caught a SuccessException to verify that the test was
successful. Look, for example, at the ValidatingSink in
EventTimeWindowCheckpointingITCase in the Flink source.

Cheers,
Aljoscha
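The SuccessException pattern Aljoscha describes can be sketched in a few lines of plain Java. The real ValidatingSink implements Flink's SinkFunction and the exception surfaces through env.execute(); here both sides are simulated so the example is self-contained, and all names are illustrative:

```java
// Minimal sketch of the SuccessException pattern: a validating sink
// throws a marker exception once the expected data has been seen, which
// deliberately brings the job down; the test treats that as success.
public class SuccessExceptionExample {

    // Marker exception thrown by the sink when validation passes.
    static class SuccessException extends RuntimeException {}

    // Stand-in for a validating sink's invoke() method.
    static void validatingSink(String record) {
        if ("expected-record".equals(record)) {
            throw new SuccessException(); // terminates the job on purpose
        }
    }

    public static void main(String[] args) {
        boolean success = false;
        try {
            // In a real test this would be env.execute(); the sink runs
            // inside the job and the exception propagates to the client.
            validatingSink("expected-record");
        } catch (SuccessException e) {
            success = true; // the expected failure cause => test passed
        }
        System.out.println("test passed: " + success);
        // prints: test passed: true
    }
}
```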


Re: Junit Issue while testing Kafka Source

Vinay Patil
Hi Aljoscha,

Thank you for answering.
Throwing a SuccessException is a good idea. However, when I add the
following dependency, no classes are added to the jar:

               <dependency>
                   <groupId>org.apache.flink</groupId>
                   <artifactId>flink-tests_2.10</artifactId>
                   <version>1.0.3</version>
               </dependency>

Is there any other dependency that I have to add? I have also added the
test-utils dependency.

I am trying the following in my test case:
1) Consume data from Kafka using FlinkKafkaConsumer and pass it to a map
as a Tuple2.
2) In the map function, check whether the Tuple2 contains data; if yes,
throw the exception ("success").
3) This way I verify that the configuration is correct and that we are
able to read from Kafka.

Am I doing it right? Is there a better approach?

Regards,
Vinay Patil

*+91-800-728-4749*
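A likely reason the jar comes up empty: the flink-tests module keeps all of its classes under src/test, so its main artifact contains no classes, and Maven only exposes such test classes through a separately attached test-jar. A sketch of how that dependency could be declared (the test-jar mechanics are standard Maven; whether this artifact is the right home for SuccessException is addressed later in the thread):

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-tests_2.10</artifactId>
  <version>1.0.3</version>
  <!-- pull in the classes attached from src/test, not the (empty) main jar -->
  <type>test-jar</type>
  <scope>test</scope>
</dependency>
```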


Re: Junit Issue while testing Kafka Source

Stephan Ewen
Hi!

On Flink 1.0, there is the "flink-test-utils_2.10" dependency that has
some useful things.

The "SuccessException" seems to be quite a common thing; I have seen it in
other infinite-program tests as well (Google Dataflow / Beam).

Another way you can architect tests is to have an element in the stream
that signals end-of-stream. The DeserializationSchema can check for that
and return "end of stream".

Greetings,
Stephan
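Stephan's end-of-stream idea can be sketched in plain Java. Flink's DeserializationSchema interface has an isEndOfStream(T) hook; when it returns true, the Kafka source stops consuming and env.execute() returns. This stand-alone class mirrors that shape without depending on Flink, and the marker value is an illustrative assumption:

```java
import java.nio.charset.StandardCharsets;

// Sketch of a schema that terminates the source via a "death pill"
// element. Mirrors the shape of DeserializationSchema#deserialize and
// DeserializationSchema#isEndOfStream; the marker value is made up.
public class TerminatingSchemaExample {
    static final String END_MARKER = "END-OF-STREAM";

    // Mirrors DeserializationSchema#deserialize(byte[])
    static String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    // Mirrors DeserializationSchema#isEndOfStream(T); returning true
    // tells the source to finish, so env.execute() can return.
    static boolean isEndOfStream(String nextElement) {
        return END_MARKER.equals(nextElement);
    }

    public static void main(String[] args) {
        String[] messages = {"a", "b", END_MARKER, "never-read"};
        for (String m : messages) {
            String element = deserialize(m.getBytes(StandardCharsets.UTF_8));
            if (isEndOfStream(element)) {
                break; // the source would stop consuming here
            }
            System.out.println(element); // prints: a then b
        }
    }
}
```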




Re: Junit Issue while testing Kafka Source

Vinay Patil
Hi Stephan,

Yes, using the DeserializationSchema solution will definitely work.
However, I am not able to find the dependency for SuccessException.
Any help on this?

Regards,
Vinay Patil



Re: Junit Issue while testing Kafka Source

Stephan Ewen
The SuccessException does not really have a dependency.

It is just a special exception class that you throw in your code when you
want to stop.
The code that calls env.execute() catches the exception and checks
whether the failure cause is that special exception.
Flink propagates exceptions from the workers back to the client.

Greetings,
Stephan
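The client-side check Stephan describes can be sketched in plain Java: env.execute() surfaces worker exceptions wrapped in a job-level exception, and the test walks the cause chain looking for its own SuccessException. Here env.execute() is simulated by a method that throws a wrapped exception; only the pattern, not the class names, matches Flink's actual behavior:

```java
// Sketch of catching the job failure and checking its cause chain for
// the user-defined SuccessException.
public class CauseChainExample {

    static class SuccessException extends RuntimeException {}

    // Stand-in for env.execute(): the worker's exception arrives at the
    // client wrapped in a job-level exception.
    static void execute() {
        throw new RuntimeException("Job execution failed.",
                new SuccessException());
    }

    // Walk the cause chain looking for the marker exception.
    static boolean isSuccess(Throwable t) {
        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
            if (cause instanceof SuccessException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        try {
            execute();
        } catch (RuntimeException e) {
            System.out.println("test passed: " + isSuccess(e));
            // prints: test passed: true
        }
    }
}
```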


Re: Junit Issue while testing Kafka Source

Vinay Patil
Yeah, understood.
Thank you for helping, guys.

Regards,
Vinay Patil

