[DISCUSS] Deal with the timers better before closing operators

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

[DISCUSS] Deal with the timers better before closing operators

Haibo Sun
Hi, all


I want to bring up a discussion about how to deal with the pending(registered but not scheduled for execution)
timers better before closing operator.


Introduction: From my understanding, there are two types of timers in Flink: 1) One is one-shot or periodic
timers registered with ProcessingTimeService. In terms of the underlying implementation, each timer corresponds
to a Runnable and is executed through a thread pool. 2) The other is event-time or processing-time timers
registered with InternalTimerService, which can be stateful. Event-time timers are triggered by watermark,
and processing-time timers registered with the same InternalTimerService instance are triggered by a real
timer registered with ProcessingTimeService. For the convenience of later expression, here we define the
first type as "physical timer", and the second type (including timers registered with the interfaces built on
InternalTimerService such as api.windowing.triggers.TriggerContext) as "logical timer".


Why and how to deal with the physical timers better before closing operators?
Currently, after the operator is closed, it is still allowed to fire the physical timers registered by it, which may
output data again, making the close semantics of operators on the operator chain not strict (the strict semantics
should no longer output data after closing). So we need to explicitly let all physical timers done before we close
the operator. Because physical timers are registered by the operator, runtime cannot, at its own discretion, cancels
or triggers them before closing the operator. For example, If runtime cancels one of the physical timers registered
but not scheduled for execution before closing the operator, a deadlock may occur when the operator waits in
the "close()" method for the physical timer to be fired. Overall, we need to expose some capabilities to the operators
so that they can decide whether to cancel or trigger the physical timers registered but not scheduled for execution
before closing.


About how to expose such capabilities, we may have the following options. For a periodic physical timer, it should
only need the "cancel" action and the operator can cancel it by calling the "ScheduledFuture#cancel()" method ,
so here we should not need to consider it.


Option 1:  The physical timer is of the ScheduledFuture type, and there is already the "#cancel()" method in
ScheduledFuture. We just need to add the "#triggerIfNotExecuted()" method, and the changes mainly include:
    1) Adds the following TimerScheduledFuture interface that extended ScheduledFuture, and changes the return
       type of "ProcessingTimerService#registerTimer" to TimerScheduledFuture. If the operator wants to trigger a
       pending physical timer before closing, "TimerScheduledFuture#triggerIfNotExecuted()"
       need to be called explicitly in its "endInput" method.
public interface TimerScheduledFuture<V> extends TimerScheduledFuture<V> {
    void triggerIfNotExecuted(long timestamp);
}
    2) For the pending physical timers that are not cancelled or triggered by the operator before closing, runtime will
        cancel them but wait for those in executing to finish to maintain backward compatibility.


Option 2:  When the operator registers a physical timer, it passes a callback parameter of Runnable type
(like ProcessingTimeCallback), which is called back by runtime before closing the operator to let the operator
deal with those pending physical timers. The changes mainly include:
    1) Applies all changes from option 1
    2) Adds the following overloaded method to the ProcessingTimerService interface.
ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target, Runnable actionBeforeClosingOperator);


How to deal with the logical timers better before closing operators?
For logic timers, that capabilities also should be exposed to operators. According to my understanding, the logical
timer can be registered through the three interfaces: api.TimerService, api.windowing.triggers.TriggerContext,
api.operators.InternalTimerService. And the first two are implemented using api.operators.InternalTimerService.
As can be known from the declaration of the registration method, it does not return the logical timer object, which
is managed uniformly by the implementation of InternalTimerService.


1. For the api.operators.InternalTimerService interface, we may make the following changes:
    1) Adds the following overloaded method to the InternalTimerService interface.
void registerProcessingTimeTimer(N namespace, long time, TimerActionOnOperatorClose timerActionOnOperatorClose);
/**
 * It defines how to handle the timers not scheduled for execution before the operator is closed.
 *
 */
enum TimerActionOnOperatorClose {
    CANCEL,
    TRIGGER_WITH_ORIGINAL_TIMESTAMP,
    TRIGGER_WITH_CURRENT_TIMESTAMP,
    TRIGGER_WITH_MAX_TIMESTAMP
}
    2) Changes the serialization of stateful timers to add state processing for the TimerActionOnOperatorClose
        parameter, which will slightly increase the size of the state. The existing versioned serialization mechanism
        of stateful timers can ensures backward compatibility.
    3) Considering that stateful logical timers should be triggered in most cases before closing the operator, the
       default action for the old "#registerProcessingTimeTimer()" method without the "timerActionOnOperatorClose"
       parameter is TRIGGER_WITH_MAX_TIMESTAMP.


2. The change of the api.TimerService interface is similar to that of InternalTimerService, overloading a method
with the "timerActionOnOperatorClose" parameter.
void registerProcessingTimeTimer(long time, TimerActionOnOperatorClose timerAction);


3. Because the api.windowing.triggers.TriggerContext interface is only used for window, and the logical timers of
window should be triggered before closing the operator, they can use the default action and the TriggerContext
interface do not need to be change.




Please feel free to share you thoughts. Thanks.
Especially, thanks to Piotr, Aljoscha,  Arvid, Roman, who participated in the discussion earlier.


Best,
Haibo

Reply | Threaded
Open this post in threaded view
|

Re:[DISCUSS] Deal with the timers better before closing operators

Haibo Sun


I found that the format of the mail was out of order. Please look at the document:
https://docs.google.com/document/d/1jCKan5LGlkBZr2seUdwTsy-biu8Sz0xyQwqizCUZ2uk/edit?usp=sharing


Best,
Haibo
At 2019-12-16 21:29:15, "Haibo Sun" <[hidden email]> wrote:

>Hi, all
>
>
>I want to bring up a discussion about how to deal with the pending(registered but not scheduled for execution)
>timers better before closing operator.
>
>
>Introduction: From my understanding, there are two types of timers in Flink: 1) One is one-shot or periodic
>timers registered with ProcessingTimeService. In terms of the underlying implementation, each timer corresponds
>to a Runnable and is executed through a thread pool. 2) The other is event-time or processing-time timers
>registered with InternalTimerService, which can be stateful. Event-time timers are triggered by watermark,
>and processing-time timers registered with the same InternalTimerService instance are triggered by a real
>timer registered with ProcessingTimeService. For the convenience of later expression, here we define the
>first type as "physical timer", and the second type (including timers registered with the interfaces built on
>InternalTimerService such as api.windowing.triggers.TriggerContext) as "logical timer".
>
>
>Why and how to deal with the physical timers better before closing operators?
>Currently, after the operator is closed, it is still allowed to fire the physical timers registered by it, which may
>output data again, making the close semantics of operators on the operator chain not strict (the strict semantics
>should no longer output data after closing). So we need to explicitly let all physical timers done before we close
>the operator. Because physical timers are registered by the operator, runtime cannot, at its own discretion, cancels
>or triggers them before closing the operator. For example, If runtime cancels one of the physical timers registered
>but not scheduled for execution before closing the operator, a deadlock may occur when the operator waits in
>the "close()" method for the physical timer to be fired. Overall, we need to expose some capabilities to the operators
>so that they can decide whether to cancel or trigger the physical timers registered but not scheduled for execution
>before closing.
>
>
>About how to expose such capabilities, we may have the following options. For a periodic physical timer, it should
>only need the "cancel" action and the operator can cancel it by calling the "ScheduledFuture#cancel()" method ,
>so here we should not need to consider it.
>
>
>Option 1:  The physical timer is of the ScheduledFuture type, and there is already the "#cancel()" method in
>ScheduledFuture. We just need to add the "#triggerIfNotExecuted()" method, and the changes mainly include:
>    1) Adds the following TimerScheduledFuture interface that extended ScheduledFuture, and changes the return
>       type of "ProcessingTimerService#registerTimer" to TimerScheduledFuture. If the operator wants to trigger a
>       pending physical timer before closing, "TimerScheduledFuture#triggerIfNotExecuted()"
>       need to be called explicitly in its "endInput" method.
>public interface TimerScheduledFuture<V> extends TimerScheduledFuture<V> {
>    void triggerIfNotExecuted(long timestamp);
>}
>    2) For the pending physical timers that are not cancelled or triggered by the operator before closing, runtime will
>        cancel them but wait for those in executing to finish to maintain backward compatibility.
>
>
>Option 2:  When the operator registers a physical timer, it passes a callback parameter of Runnable type
>(like ProcessingTimeCallback), which is called back by runtime before closing the operator to let the operator
>deal with those pending physical timers. The changes mainly include:
>    1) Applies all changes from option 1
>    2) Adds the following overloaded method to the ProcessingTimerService interface.
>ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target, Runnable actionBeforeClosingOperator);
>
>
>How to deal with the logical timers better before closing operators?
>For logic timers, that capabilities also should be exposed to operators. According to my understanding, the logical
>timer can be registered through the three interfaces: api.TimerService, api.windowing.triggers.TriggerContext,
>api.operators.InternalTimerService. And the first two are implemented using api.operators.InternalTimerService.
>As can be known from the declaration of the registration method, it does not return the logical timer object, which
>is managed uniformly by the implementation of InternalTimerService.
>
>
>1. For the api.operators.InternalTimerService interface, we may make the following changes:
>    1) Adds the following overloaded method to the InternalTimerService interface.
>void registerProcessingTimeTimer(N namespace, long time, TimerActionOnOperatorClose timerActionOnOperatorClose);
>/**
> * It defines how to handle the timers not scheduled for execution before the operator is closed.
> *
> */
>enum TimerActionOnOperatorClose {
>    CANCEL,
>    TRIGGER_WITH_ORIGINAL_TIMESTAMP,
>    TRIGGER_WITH_CURRENT_TIMESTAMP,
>    TRIGGER_WITH_MAX_TIMESTAMP
>}
>    2) Changes the serialization of stateful timers to add state processing for the TimerActionOnOperatorClose
>        parameter, which will slightly increase the size of the state. The existing versioned serialization mechanism
>        of stateful timers can ensures backward compatibility.
>    3) Considering that stateful logical timers should be triggered in most cases before closing the operator, the
>       default action for the old "#registerProcessingTimeTimer()" method without the "timerActionOnOperatorClose"
>       parameter is TRIGGER_WITH_MAX_TIMESTAMP.
>
>
>2. The change of the api.TimerService interface is similar to that of InternalTimerService, overloading a method
>with the "timerActionOnOperatorClose" parameter.
>void registerProcessingTimeTimer(long time, TimerActionOnOperatorClose timerAction);
>
>
>3. Because the api.windowing.triggers.TriggerContext interface is only used for window, and the logical timers of
>window should be triggered before closing the operator, they can use the default action and the TriggerContext
>interface do not need to be change.
>
>
>
>
>Please feel free to share you thoughts. Thanks.
>Especially, thanks to Piotr, Aljoscha,  Arvid, Roman, who participated in the discussion earlier.
>
>
>Best,
>Haibo
>