TaskManager job lifecycle hooks


TaskManager job lifecycle hooks

Ben Sidhom
Hey,

I'm working on the Apache Beam <https://beam.apache.org/> portability story
and trying to figure out how we can get the Flink runner to support
the new portability
API <https://beam.apache.org/contribute/portability/>.

In order to get the runner to work with portable SDKs, we need to be able
to spin up and manage user containers from the TaskManagers themselves. All
communication with user code (effectively user-defined functions) happens
over RPC endpoints between the container and the Flink worker threads.
Unfortunately, we cannot assume that the container images themselves are
small or that they are cheap to start up. For this reason, we cannot
reasonably start and stop these external services once per task (e.g., by
wrapping service lifetimes within mapPartitions). In order to support
multiple jobs per JVM (either due to multiple task slots per manager or
multiple jobs submitted to a cluster serially), we need to know when to
clean up resources associated with a particular job.

Is there a way to do this in user code? Ideally, this would be something
like a set of per-job startup and shutdown hooks that execute on each
TaskManager that a particular job runs on. If this does not currently
exist, how reasonable would it be to introduce client-facing APIs that
would allow it? Is there a better approach for this lifecycle management
that better fits into the Flink execution model?
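
For concreteness, the kind of hook I have in mind might look roughly like
the following. This is purely hypothetical and not an existing Flink API;
the interface name and method signatures are made up for illustration:

    // Hypothetical API sketch, not part of Flink today.
    public interface TaskManagerJobLifecycleListener {
        // Invoked on a TaskManager before the first task of a given job starts there.
        void onJobDeployed(String jobId) throws Exception;

        // Invoked on a TaskManager after the last task of that job has finished there.
        void onJobReleased(String jobId) throws Exception;
    }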

Thanks
--
-Ben

Re: TaskManager job lifecycle hooks

Eron Wright
Could you speak to whether the lifecycle provided by RichFunction
(open/close) would fit the requirement?

https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/api/common/functions/RichFunction.html#open-org.apache.flink.configuration.Configuration-
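
A rough sketch of what that could look like, assuming a placeholder
container command and a pass-through map() body (neither is Flink or Beam
API, just stand-ins for illustration):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    // Sketch: tie an external worker process to the task lifetime via open()/close().
    // The "my-sdk-harness" image and the pass-through map() are placeholders.
    public class HarnessBackedMapper extends RichMapFunction<String, String> {

        private transient Process harness;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Runs once per parallel task instance, before any elements are processed.
            harness = new ProcessBuilder(
                "docker", "run", "--rm", "my-sdk-harness").start();
        }

        @Override
        public String map(String value) throws Exception {
            // The real runner would hand the element to the container over RPC.
            return value;
        }

        @Override
        public void close() throws Exception {
            // Runs when the task finishes, fails, or is cancelled.
            if (harness != null) {
                harness.destroy();
            }
        }
    }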


Re: TaskManager job lifecycle hooks

Aljoscha Krettek
Hi Ben,

I think that's a good question, but I also think that Eron's answer is sufficient for an initial implementation. We increasingly recommend using a single "cluster" per job (either a per-job YARN cluster, or one per job on Kubernetes or Mesos), so we don't really have to solve the problem of efficiently supporting multiple jobs per JobManager.

Regarding job lifetime vs. task lifetime: the open() and close() methods that Eron mentioned are, for practical purposes, invoked at the beginning and end of a job. For example, when a streaming pipeline starts, the tasks corresponding to its operators are launched and stay around until the job is finished. This is different from a system such as Google Dataflow, where you will see many more small tasks over the life of a streaming job, so this shouldn't be that big of a problem for Flink.
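
If you did want to share one expensive service across all the task slots of
a TaskManager, one possible pattern (just a sketch, not a Flink API; the
startup command is a placeholder) would be a JVM-wide reference-counted
holder that open() acquires and close() releases:

    // Sketch of a JVM-wide, reference-counted holder so that all task slots on a
    // TaskManager share one external service, torn down when the last task closes.
    public final class SharedHarness {

        private static final Object LOCK = new Object();
        private static int refCount = 0;
        private static Process service;

        private SharedHarness() {}

        // Call from RichFunction#open().
        public static Process acquire() throws Exception {
            synchronized (LOCK) {
                if (refCount++ == 0) {
                    // Placeholder command; the real runner would boot the SDK harness container.
                    service = new ProcessBuilder(
                        "docker", "run", "--rm", "my-sdk-harness").start();
                }
                return service;
            }
        }

        // Call from RichFunction#close().
        public static void release() {
            synchronized (LOCK) {
                if (--refCount == 0 && service != null) {
                    service.destroy();
                    service = null;
                }
            }
        }
    }

Whether such a static is actually shared depends on which classloader loads
the class, so in practice it would have to live somewhere visible to all the
tasks that are meant to share the service.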

What do you think?

Best,
Aljoscha


Re: TaskManager job lifecycle hooks

Ben Sidhom
Ah, I see. Yes, that should definitely be good enough (at least for a first
pass).

Do you know how most users tend to deploy and use Flink in practice?
Job-scoped clusters seem preferable for many reasons, but it's possible
that that's only really practical when using YARN/Kubernetes/Mesos. Would
we be cutting out a large population by only "supporting" single-job
clusters?

--
-Ben