Information regarding RichParallelSourceFunction

Saikat Maitra
Hello,

I am working on building an Apache Ignite connector for Apache Flink. I am
currently developing the SourceFunction to consume cache events from an
Ignite cluster.

Here is the PR https://github.com/apache/ignite/pull/870/files

I am observing that during unit tests the IgniteSource instance created
with the IgniteSource constructor and the instance seen inside the run()
method are different objects. As a result, when igniteSrc.cancel() is
called, the running instance is not stopped.

I wanted to discuss:

1. Does Flink create a copy of the IgniteSource object when
env.addSource(igniteSrc) is called?

A quick workaround is to use a static boolean stopped variable, but that
only supports a single IgniteSource instance and rules out using multiple
IgniteSource instances with different cache combinations.


Regards
Saikat

Re: Information regarding RichParallelSourceFunction

Fabian Hueske-2
Hi,

Flink serializes all user functions (including source functions) with Java
Serialization to ship them to the worker processes.
That's also why everything in a user function must be Serializable.

There is not an easy way to synchronize running tasks. Each task has its
own function object and these might be distributed across different JVMs.
So even a static field won't help here.

Best, Fabian
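
A minimal sketch of the effect described above, using only the JDK. FakeSource is a hypothetical stand-in that mimics the run()/cancel() flag pattern of a source function; it is not Flink's API, and roundTrip() only approximates how a user function is shipped to a task. Cancelling the original object leaves the deserialized copy running, which matches what the test observed with igniteSrc.cancel().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationCopyDemo {

    // Hypothetical stand-in for a source function; not Flink's SourceFunction.
    static class FakeSource implements Serializable {
        private static final long serialVersionUID = 1L;

        // Stop flag that a run() loop would poll; it belongs to this object only.
        private volatile boolean running = true;

        void cancel() { running = false; }

        boolean isRunning() { return running; }
    }

    // Serializes and deserializes an object with Java serialization,
    // roughly what happens when a user function is shipped to a task.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeSource original = new FakeSource();     // object passed to env.addSource(...)
        FakeSource deployed = roundTrip(original);  // object whose run() actually executes

        original.cancel();                          // like calling igniteSrc.cancel() in the test

        System.out.println(original == deployed);   // false: two distinct objects
        System.out.println(deployed.isRunning());   // true: the copy never saw cancel()
    }
}
```

In a real job the framework calls cancel() on the deployed instance when the job is cancelled, so stopping a source from a test generally means cancelling or finishing the job rather than calling cancel() on the object the test created.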

2016-12-22 9:12 GMT+01:00 Saikat Maitra <[hidden email]>:

Re: Information regarding RichParallelSourceFunction

Robert Metzger
Hi Saikat,

there is already a connector for Ignite and Flink in the Apache Ignite
project: https://github.com/apache/ignite/tree/master/modules/flink
Maybe you can contribute your Ignite source to that project as well.

Regards,
Robert


On Thu, Dec 22, 2016 at 10:04 AM, Fabian Hueske <[hidden email]> wrote:

Re: Information regarding RichParallelSourceFunction

Saikat Maitra
Hi Robert

Yes, I intend to commit the Ignite source module as part of this JIRA
ticket:

https://issues.apache.org/jira/browse/IGNITE-3303

I am trying to resolve an issue specific to cancelling the source function.
Flink serializes and distributes the source function, and in the debugger I
can see different instances of the source function running in the
StreamExecutionEnvironment.

In the unit tests, after the assertions have run, I want to cancel the
source function. However, the source object I created initially and the
one that is currently running are different objects, so the source
function is never cancelled and the unit test fails with a timeout
exception.

Regards,
Saikat
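
One way to cancel the right copy from a unit test, sketched under two assumptions: every copy of the source runs in the same JVM as the test (for example a local test environment), and the source's run loop polls a shared flag instead of an instance field. KeyedSource, STOPPED and KeyedStopFlagDemo are hypothetical names, not part of Flink or Ignite. Unlike a single static boolean, the flag is keyed by a per-source id that is serialized with the object, so several sources with different caches can be stopped independently. As noted earlier in the thread, a static field is not shared across JVMs, so this only suits single-JVM tests.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class KeyedStopFlagDemo {

    // Ids of cancelled sources. Static, so it is shared only by objects in this JVM.
    static final Set<String> STOPPED = ConcurrentHashMap.newKeySet();

    // Hypothetical test-oriented source; not Flink's SourceFunction.
    static class KeyedSource implements Serializable {
        private static final long serialVersionUID = 1L;

        // Assigned at construction and written into the serialized form;
        // deserialization restores it, so every copy carries the same id.
        private final String id = UUID.randomUUID().toString();

        // A run() loop would poll this instead of an instance-local flag.
        boolean isRunning() { return !STOPPED.contains(id); }

        // Safe to call on the original object held by the test.
        void cancel() { STOPPED.add(id); }
    }

    // Java serialization round trip, standing in for shipping the function to a task.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        KeyedSource cacheA = new KeyedSource();
        KeyedSource cacheB = new KeyedSource();     // a second source, e.g. another cache
        KeyedSource deployedA = roundTrip(cacheA);  // the copy that would actually run

        cacheA.cancel();

        System.out.println(deployedA.isRunning());  // false: same id as cacheA
        System.out.println(cacheB.isRunning());     // true: other sources are unaffected
    }
}
```

A real test would also remove the id from STOPPED when it finishes, so the shared set does not grow across test cases.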


On Sat, Dec 24, 2016 at 1:45 PM, Robert Metzger <[hidden email]> wrote:
