Hi team,
I'm writing my custom Operator as a high fan-out operation and I use processing time timers to defer processing some inputs When timers are firing, the operator will continue to process the deferred elements. One typical use case for my Operator is like: ImpulseOperator -> my Operator -> downstream where the watermark of ImpulseOperator advances to MAX_TIMESTAMP immediately. One problem I have is that after my operator.close() is called, it's still possible for me to set processing time timers and wait for these timers to be fired. But it seems like Flink pauses invoking processing timers once one operator.close() is called in the new version. I'm curious why Flink decides to do so and any workaround I can do for my operator? Thanks for your help! |
Hi!
This is an interesting topic and we recently created a Jira issue about this: https://issues.apache.org/jira/browse/FLINK-18647. In Beam we even have a workaround for this: https://github.com/apache/beam/blob/0c01636fc8610414859d946cb93eabc904123fc8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L581 Maybe it's time to finally address this in Flink as well. Best, Aljoscha On 11.11.20 01:02, Boyuan Zhang wrote: > Hi team, > > I'm writing my custom Operator as a high fan-out operation and I use > processing time timers to defer processing some inputs When timers are > firing, the operator will continue to process the deferred elements. One > typical use case for my Operator is like: > ImpulseOperator -> my Operator -> downstream where the watermark of > ImpulseOperator advances to MAX_TIMESTAMP immediately. > > One problem I have is that after my operator.close() is called, it's still > possible for me to set processing time timers and wait for these timers to > be fired. But it seems like Flink pauses invoking processing timers once > one operator.close() is called in the new version. I'm curious why Flink > decides to do so and any workaround I can do for my operator? > > Thanks for your help! > |
Thanks, Aljoscha!
Manually draining processing time timers during operator.close() is my current workaround as well. It's just not efficient for me since I may set the processing time timer for the callback after 5 mins but now I need to fire them immediately. https://issues.apache.org/jira/browse/FLINK-18647 is really helpful and looking forward to the solution. Thanks for your help! On Wed, Nov 11, 2020 at 8:13 AM Aljoscha Krettek <[hidden email]> wrote: > Hi! > > This is an interesting topic and we recently created a Jira issue about > this: https://issues.apache.org/jira/browse/FLINK-18647. > > In Beam we even have a workaround for this: > > https://github.com/apache/beam/blob/0c01636fc8610414859d946cb93eabc904123fc8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L581 > > Maybe it's time to finally address this in Flink as well. > > Best, > Aljoscha > > > On 11.11.20 01:02, Boyuan Zhang wrote: > > Hi team, > > > > I'm writing my custom Operator as a high fan-out operation and I use > > processing time timers to defer processing some inputs When timers are > > firing, the operator will continue to process the deferred elements. One > > typical use case for my Operator is like: > > ImpulseOperator -> my Operator -> downstream where the watermark of > > ImpulseOperator advances to MAX_TIMESTAMP immediately. > > > > One problem I have is that after my operator.close() is called, it's > still > > possible for me to set processing time timers and wait for these timers > to > > be fired. But it seems like Flink pauses invoking processing timers once > > one operator.close() is called in the new version. I'm curious why Flink > > decides to do so and any workaround I can do for my operator? > > > > Thanks for your help! > > > > |
Free forum by Nabble | Edit this page |