[jira] [Created] (FLINK-14184) Provide a stage listener API to be invoked per task manager


Shang Yuanchun (Jira)
Stephen Connolly created FLINK-14184:
----------------------------------------

             Summary: Provide a stage listener API to be invoked per task manager
                 Key: FLINK-14184
                 URL: https://issues.apache.org/jira/browse/FLINK-14184
             Project: Flink
          Issue Type: Improvement
            Reporter: Stephen Connolly


Often a topology has nodes that need to use 3rd party APIs. Not every 3rd party API is written in a style that suits usage from within Flink.

At present, implementing a `Rich___` will provide each stage with the `open(...)` and `close()` callbacks, as the stage is accepted for execution on each task manager.

There is, however, a need to listen for the first stage being opened on any given task manager, as well as for the last stage being closed.

Critically, the last stage being closed is the opportunity to release any resources that are shared across multiple stages in the topology, e.g. database connection pools, async HTTP client thread pools, etc.

Without such a clean-up hook, the connections and threads can act as GC roots that prevent the topology's classloader from being unloaded, resulting in a memory and resource leak in the task manager... never mind that, if it is a database connection pool, it may also be consuming resources from the database.

There are three workarounds available at present:
 # Each stage just allocates its own resources and cleans up afterwards. This is, in many ways, the ideal. However, it can result in more database connections than intended, e.g. because each stage that accesses the database needs its own connection rather than letting the whole topology share one or two connections through a connection pool. Similarly, if the 3rd party library uses a static singleton for the whole classloader, there is no way for the independent stages to know when it is safe to shut down the singleton.
 # Implement a reference counting proxy for the 3rd party API. This is a lot of work: you need to ensure that deserialization of the proxy returns a classloader singleton (so you can maintain the reference counts), and if the count goes wrong you have leaked the resource.
 # Use a ReferenceQueue-backed proxy. This is even more complex than implementing reference counting, but has the advantage of not requiring the count to be maintained correctly. On the other hand, it does not provide for eager release of the resources.
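
To make the second workaround concrete, here is a minimal sketch of the reference counting part only. The class name `RefCountedResource` and its methods are hypothetical and not part of Flink or any 3rd party library. It assumes a single instance is held in a static field per classloader, and it deliberately leaves out the deserialization handling mentioned above.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not a Flink API): shares one resource between stages
 * and disposes of it when the last user releases it.
 */
final class RefCountedResource<T> {
  private final Supplier<T> factory;
  private final Consumer<T> disposer;
  private int users;
  private T resource;

  RefCountedResource(Supplier<T> factory, Consumer<T> disposer) {
    this.factory = factory;
    this.disposer = disposer;
  }

  /** Intended to be called from each stage's open(...). */
  synchronized T acquire() {
    if (users == 0) {
      // First user: create the shared resource. If this throws, the
      // count stays at zero and nothing has been leaked.
      resource = factory.get();
    }
    users++;
    return resource;
  }

  /** Intended to be called from each stage's close(). */
  synchronized void release() {
    if (users == 0) {
      throw new IllegalStateException("release() without matching acquire()");
    }
    if (--users == 0) {
      // Last user: drop the reference before disposing of the resource.
      T last = resource;
      resource = null;
      disposer.accept(last);
    }
  }

  /** Current number of users, mainly useful for diagnostics. */
  synchronized int users() {
    return users;
  }
}
```

The weakness described above is visible here: a stage that fails to call `release()` leaves the count above zero forever, and the resource is never disposed of.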

If Flink provided a listener contract that could be registered with the execution environment, then this would allow the resources to be cleared out. My proposed interface would look something like this:
{code:java}
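import java.io.Serializable;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;

public interface EnvironmentLocalTopologyListener extends Serializable {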
  /**
   * Called immediately prior to the first {@link RichFunction#open(Configuration)}
   * being invoked for the topology on the current task manager JVM for this
   * classloader. Will not be called again unless {@link #close()} has been invoked first.
   * Use this method to eagerly initialize any ClassLoader scoped resources that
   * are pooled across the stages of the topology.
   *
   * @param parameters // I am unsure if this makes sense
   */
  default void open(Configuration parameters) throws Exception {}
  /**
   * Called after the last {@link RichFunction#close()} has completed and the
   * topology is effectively being stopped (for the current ClassLoader).
   * This method will only be invoked if a call to {@link #open(Configuration)}
   * was attempted, and will be invoked irrespective of whether the call to
   * {@link #open(Configuration)} terminated normally or exceptionally.
   * Use this method to release any ClassLoader scoped resources that have been
   * pooled across the stages of the topology.
   */
  default void close() throws Exception {}
  /**
   * Decorate the threads that are used to invoke the stages of the topology.
   * Use this method, for example, to seed the {@link org.slf4j.MDC} with
   * topology specific details, e.g.
   * <pre>
   * Runnable decorate(Runnable task) {
   *   return () -> {
   *     try (MDC.MDCCloseable ctx = MDC.putCloseable("foo", "bar")) {
   *       task.run();
   *     }
   *   };
   * }
   * </pre>
   *
   * @param task // might not be the most appropriate type, I haven't
   *             // checked how Flink implements dispatch. May or may not
   *             // want a parameters argument also.
   */
  default Runnable decorate(Runnable task) { return task; }
}{code}
(Names subject to change)

You would then use it something like this:
{code:java}
env.addEnvironmentLocalTopologyListener(...);
...
env.execute(...); {code}
The listener would be serialized to each task manager and then before the first task is executed the `open(...)` method would get invoked. Each thread that is running a task would be decorated by the listener, and then once all the stages are stopped the `close()` method would be invoked.
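
As an illustration of what a listener implementation might look like, here is a rough sketch. It uses a simplified stand-in interface, `TopologyListener`, with no `Configuration` parameter so that it compiles without Flink on the classpath. Both `TopologyListener` and `SharedPoolListener` are hypothetical names, and the thread renaming in `decorate` is just a dependency-free substitute for the MDC example.

```java
import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Simplified stand-in for the proposed interface (assumption: no Configuration). */
interface TopologyListener extends Serializable {
  default void open() throws Exception {}
  default void close() throws Exception {}
  default Runnable decorate(Runnable task) { return task; }
}

/** Hypothetical listener owning a thread pool shared by all stages of the topology. */
final class SharedPoolListener implements TopologyListener {
  private static final long serialVersionUID = 1L;

  // Static, so scoped to the topology's classloader and reachable from any stage.
  private static volatile ExecutorService pool;

  /** Stages would call this from their own open(...) to get the shared pool. */
  static ExecutorService pool() {
    return pool;
  }

  @Override
  public void open() {
    // Would run once, before the first stage is opened on this task manager.
    pool = Executors.newFixedThreadPool(2);
  }

  @Override
  public void close() {
    // Would run once, after the last stage has been closed. Stopping the
    // threads removes the GC roots that would otherwise pin the classloader.
    ExecutorService p = pool;
    pool = null;
    if (p != null) {
      p.shutdownNow();
    }
  }

  @Override
  public Runnable decorate(Runnable task) {
    return () -> {
      Thread current = Thread.currentThread();
      String originalName = current.getName();
      current.setName("my-topology:" + originalName);
      try {
        task.run();
      } finally {
        // Restore the name so the decoration does not leak to later tasks.
        current.setName(originalName);
      }
    };
  }
}
```

With something like this in place, the individual stages would never shut the pool down themselves. They would only borrow it via `SharedPoolListener.pool()`, and the release would happen in exactly one place.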

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)