Hello,
I am building an Apache Ignite connector for Apache Flink and am currently developing the SourceFunction that consumes cache events from an Ignite cluster.

Here is the PR: https://github.com/apache/ignite/pull/870/files

During unit tests I observe that the IgniteSource instance created through the IgniteSource constructor is different from the instance seen inside the run() method. As a result, calling igniteSrc.cancel() does not stop the running source.

I wanted to discuss:

1. Does Flink create a copy of the IgniteSource object when env.addSource(igniteSrc) is called?

A quick workaround is a static boolean stopped variable, which works with a single IgniteSource instance but prevents using multiple IgniteSource instances with different cache combinations.

Regards
Saikat
Hi,
Flink serializes all user functions (including source functions) with Java Serialization to ship them to the worker processes. That is also why everything in a user function must be Serializable.

There is no easy way to synchronize running tasks. Each task has its own function object, and these might be distributed across different JVMs. So even a static field won't help here.

Best, Fabian

2016-12-22 9:12 GMT+01:00 Saikat Maitra <[hidden email]>:
> 1. If Flink create a copy of IgniteSource object when
> env.addSource(igniteSrc) is called?
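The copy-on-submit behaviour described above can be reproduced without Flink. The following is a minimal sketch in plain Java, assuming a hypothetical `DemoSource` class (not part of Flink or Ignite) that stands in for a source function. It round-trips the object through Java serialization, roughly as happens when a user function is shipped to a worker, and shows that cancelling the original leaves the copy untouched.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationCopyDemo {

    /** Hypothetical stand-in for a source function; not a Flink or Ignite class. */
    public static class DemoSource implements Serializable {
        private static final long serialVersionUID = 1L;

        // Instance state: it is written into the serialized form and restored in the copy.
        private volatile boolean running = true;

        public void cancel() { running = false; }

        public boolean isRunning() { return running; }
    }

    /** Round-trips an object through Java serialization and returns the resulting copy. */
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T copyViaSerialization(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        DemoSource original = new DemoSource();
        DemoSource shipped = copyViaSerialization(original);

        // Cancel only the object the caller still holds a reference to.
        original.cancel();

        System.out.println("same instance: " + (original == shipped));
        System.out.println("original running: " + original.isRunning());
        System.out.println("shipped copy running: " + shipped.isRunning());
    }
}
```

The copy keeps running because it has its own `running` field. The object passed to env.addSource(...) plays the role of `original` here, while the task executes something like `shipped`.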
Hi Saikat,
There is already a connector for Ignite and Flink in the Apache Ignite project: https://github.com/apache/ignite/tree/master/modules/flink

Maybe you can contribute your Ignite source to that project as well.

Regards,
Robert

On Thu, Dec 22, 2016 at 10:04 AM, Fabian Hueske <[hidden email]> wrote:
> Flink serializes all user functions (including source functions) with Java
> Serialization to ship them to the worker processes.
> That's also why everything in a user function must be Serializable.
Hi Robert
Yes, I intend to commit the Ignite source module as part of this JIRA ticket: https://issues.apache.org/jira/browse/IGNITE-3303

I am trying to resolve an issue specific to cancelling the source function. Flink serializes and distributes the source function, and in the debugger I can see different instances of it running in the StreamExecutionEnvironment.

In the unit tests I want to cancel the source function after the assertions have run. However, the Source object I created initially and the Source object that is actually running are different, so the source function is not cancelled, which causes a timeout exception in the unit tests.

Regards,
Saikat

On Sat, Dec 24, 2016 at 1:45 PM, Robert Metzger <[hidden email]> wrote:
> there is already a connector for Ignite and Flink in the Apache Ignite
> project: https://github.com/apache/ignite/tree/master/modules/flink
> Maybe you can contribute your Ignite source to that project as well.
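One way to read the timeout described above: a run() loop ends only when cancel() is invoked on the instance that is actually executing. The following is a minimal sketch in plain Java, assuming a hypothetical `PollingSource` class that mimics the run()/cancel() contract of a source function with a per-instance volatile flag instead of a static one. It is not the code from the PR and uses no Flink or Ignite API.

```java
import java.util.concurrent.atomic.AtomicLong;

public class CancelFlagDemo {

    /** Hypothetical stand-in for a source function; not a Flink or Ignite class. */
    public static class PollingSource {
        // Per-instance flag: every source copy is stopped independently of the others.
        private volatile boolean running = true;
        private final AtomicLong emitted = new AtomicLong();

        /** Loops until cancel() is called on this same instance. */
        public void run() {
            while (running) {
                emitted.incrementAndGet();
                Thread.yield();
            }
        }

        public void cancel() { running = false; }

        public long emittedCount() { return emitted.get(); }
    }

    /**
     * Runs one source on a worker thread, calls cancel() on toCancel,
     * and reports whether the worker thread stopped within the wait time.
     */
    public static boolean runAndCancel(PollingSource running, PollingSource toCancel) {
        Thread worker = new Thread(running::run);
        worker.setDaemon(true);
        worker.start();
        try {
            Thread.sleep(50);      // let the loop spin for a moment
            toCancel.cancel();
            worker.join(500);      // bounded wait, like a test timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean stopped = !worker.isAlive();
        running.cancel();          // clean up so the worker ends either way
        return stopped;
    }

    public static void main(String[] args) {
        PollingSource source = new PollingSource();
        System.out.println("cancel on the running instance stops it: "
            + runAndCancel(source, source));
        System.out.println("cancel on a different instance stops it: "
            + runAndCancel(new PollingSource(), new PollingSource()));
    }
}
```

The second call mirrors the unit-test situation in this thread: the test holds one object, the loop runs in another, and the bounded wait expires. A per-instance flag avoids the static variable's restriction to a single source, but the cancel() call still has to reach the running instance.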