[jira] [Created] (FLINK-17781) OperatorCoordinator Context must support calls from thread other than JobMaster Main Thread

Shang Yuanchun (Jira)
Stephan Ewen created FLINK-17781:
------------------------------------

             Summary: OperatorCoordinator Context must support calls from thread other than JobMaster Main Thread
                 Key: FLINK-17781
                 URL: https://issues.apache.org/jira/browse/FLINK-17781
             Project: Flink
          Issue Type: Sub-task
            Reporter: Stephan Ewen
            Assignee: Stephan Ewen
             Fix For: 1.11.0


Currently, calls on the Context in the OperatorCoordinator go directly and synchronously to the ExecutionGraph.

There are two critical problems:
  - It is common that the code in the OperatorCoordinator runs in a separate thread (for example, because it executes blocking operations). Calling the scheduler from another thread causes the Scheduler to crash (Assertion Error, violation of single threaded property)
  - Calls on the ExecutionGraph are removed as part of removing the legacy scheduler. Certain calls do not work any more.

+Problem Level 1:+

The solution would be to pass in the scheduler and a main thread executor to interact with it.

However, to do that the scheduler needs to be created before the OperatorCoordinators are created. One could do that by creating the Coordinators lazily after the Scheduler.
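
As a rough illustration of the idea (not the actual Flink code), the sketch below shows a context that forwards each call to a main thread executor instead of calling the scheduler directly. All class and method names here (GuardedScheduler, ForwardingContext, failTask) are hypothetical stand-ins, and a plain single-thread executor plays the role of the JobMaster main thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for a scheduler that may only be used from one thread. */
final class GuardedScheduler {
    private volatile Thread mainThread;
    private int failedTasks; // only accessed from the main thread

    void bindToCurrentThread() {
        mainThread = Thread.currentThread();
    }

    void failTask(int subtask) {
        assertMainThread();
        failedTasks++;
    }

    int failedTasks() {
        assertMainThread();
        return failedTasks;
    }

    private void assertMainThread() {
        if (Thread.currentThread() != mainThread) {
            throw new IllegalStateException("not running in the main thread");
        }
    }
}

/** Hypothetical coordinator context: every call hops onto the main thread executor. */
final class ForwardingContext {
    private final GuardedScheduler scheduler;
    private final Executor mainThreadExecutor;

    ForwardingContext(GuardedScheduler scheduler, Executor mainThreadExecutor) {
        this.scheduler = scheduler;
        this.mainThreadExecutor = mainThreadExecutor;
    }

    CompletableFuture<Void> failTask(int subtask) {
        return CompletableFuture.runAsync(() -> scheduler.failTask(subtask), mainThreadExecutor);
    }
}

final class ForwardingContextDemo {
    /** Returns true if a direct call from a foreign thread is rejected. */
    static boolean directCallIsRejected() {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        try {
            GuardedScheduler scheduler = new GuardedScheduler();
            CompletableFuture.runAsync(scheduler::bindToCurrentThread, mainThread).join();
            try {
                scheduler.failTask(0); // runs in the caller's thread, not the main thread
                return false;
            } catch (IllegalStateException expected) {
                return true;
            }
        } finally {
            mainThread.shutdown();
        }
    }

    /** Returns the number of failures the scheduler saw after one forwarded call. */
    static int forwardedCallCount() {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        try {
            GuardedScheduler scheduler = new GuardedScheduler();
            CompletableFuture.runAsync(scheduler::bindToCurrentThread, mainThread).join();
            ForwardingContext context = new ForwardingContext(scheduler, mainThread);
            // The caller's thread plays the role of the coordinator's own thread.
            context.failTask(3).join();
            return CompletableFuture.supplyAsync(scheduler::failedTasks, mainThread).join();
        } finally {
            mainThread.shutdown();
        }
    }
}
```

The direct call trips the thread check, while the forwarded call succeeds because the scheduler is only ever touched from the executor's thread. Note that the context can only be built once both the scheduler and the executor exist, which is exactly the ordering constraint described above.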

+Problem Level 2:+

The Scheduler restores the savepoints as part of the scheduler creation, when the ExecutionGraph and the CheckpointCoordinator are created early in the constructor.
(Side note: That design is tricky in itself, because it means state is restored before the scheduler is even properly constructed.)

That means the OperatorCoordinator needs to exist (or a placeholder component needs to exist) to accept the restored state.

That brings us to a cyclic dependency:
  - OperatorCoordinator (context) needs Scheduler and MainThreadExecutor
  - Scheduler and MainThreadExecutor need constructed ExecutionGraph
  - ExecutionGraph needs CheckpointCoordinator
  - CheckpointCoordinator needs OperatorCoordinator

+Breaking the Cycle+

The only way we can do this is with a form of lazy initialization:
  - We eagerly create the OperatorCoordinators so they exist for state restore
  - We provide an uninitialized context to them
  - When the Scheduler is started (after leadership is granted) we initialize the context with the (then readily constructed) Scheduler and MainThreadExecutor
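
The three steps above could look roughly like the following sketch. It is only an illustration under assumptions: SchedulerGateway, LazyInitializedContext, SketchCoordinator and their methods are hypothetical names, and a direct executor (Runnable::run) stands in for the real MainThreadExecutor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical minimal view of the scheduler used by the context. */
interface SchedulerGateway {
    void failJob(String reason);
}

/** Hypothetical context that is created eagerly but wired to the scheduler later. */
final class LazyInitializedContext {
    private volatile SchedulerGateway scheduler;
    private volatile Executor mainThreadExecutor;

    /** Called once the scheduler has started; a second call is rejected. */
    synchronized void lazyInitialize(SchedulerGateway scheduler, Executor mainThreadExecutor) {
        if (this.scheduler != null) {
            throw new IllegalStateException("context already initialized");
        }
        // Write the executor first so that a non-null scheduler implies a visible executor.
        this.mainThreadExecutor = Objects.requireNonNull(mainThreadExecutor);
        this.scheduler = Objects.requireNonNull(scheduler);
    }

    boolean isInitialized() {
        return scheduler != null;
    }

    CompletableFuture<Void> failJob(String reason) {
        SchedulerGateway current = scheduler;
        if (current == null) {
            throw new IllegalStateException("context not yet initialized");
        }
        return CompletableFuture.runAsync(() -> current.failJob(reason), mainThreadExecutor);
    }
}

/** Hypothetical coordinator: exists early so it can accept restored state. */
final class SketchCoordinator {
    final LazyInitializedContext context;
    byte[] restoredState;

    SketchCoordinator(LazyInitializedContext context) {
        this.context = context;
    }

    void resetToCheckpoint(byte[] state) {
        restoredState = state.clone(); // restoring state does not need the scheduler
    }
}

final class LazyInitDemo {
    static String run() {
        // 1. Eagerly create the coordinator with an uninitialized context.
        LazyInitializedContext context = new LazyInitializedContext();
        SketchCoordinator coordinator = new SketchCoordinator(context);

        // 2. State restore happens before any scheduler exists.
        coordinator.resetToCheckpoint(new byte[] {1, 2, 3});
        StringBuilder log = new StringBuilder();
        log.append("restored=").append(coordinator.restoredState.length);
        log.append(" before=").append(context.isInitialized());

        // 3. On scheduler start, hand in the scheduler and the executor (stand-ins here).
        List<String> failures = new ArrayList<>();
        context.lazyInitialize(failures::add, Runnable::run);
        coordinator.context.failJob("boom").join();
        log.append(" after=").append(context.isInitialized());
        log.append(" failures=").append(failures);
        return log.toString();
    }
}
```

One consequence of this design is that any context call made before initialization has to fail (as in this sketch) or be buffered; which of the two is appropriate is left open here.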

+Longer-term Solution+

The longer term solution would require a major change in the Scheduler and CheckpointCoordinator setup. Something like this:
  - Scheduler (and ExecutionGraph) are constructed first
  - JobMaster waits for leadership
  - Upon leader grant, Operator Coordinators are constructed and can reference the Scheduler and FencedMainThreadExecutor
  - CheckpointCoordinator is constructed and references ExecutionGraph and OperatorCoordinators
  - Savepoint or latest checkpoint is restored

The implementation of the current solution should try to couple parts as loosely as possible to make it easy to implement the above approach later.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)