Register processing time timers when Operator.close() is called

Boyuan Zhang
Hi team,

I'm writing a custom Operator that performs a high fan-out operation, and I use
processing-time timers to defer processing of some inputs. When the timers
fire, the operator continues to process the deferred elements. A typical
use case for my Operator looks like this:
ImpulseOperator -> my Operator -> downstream, where the watermark of
ImpulseOperator advances to MAX_TIMESTAMP immediately.
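A minimal sketch of the deferral pattern described above, written as plain Java so it runs without Flink. It assumes a per-invocation output budget (BUDGET) and a 5-minute deferral delay (DELAY_MS); the class DeferredFanOut and its methods are hypothetical illustrations, not Flink APIs. A real operator would register its timers through Flink's processing-time timer service rather than the simulated queue used here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical, Flink-free model of a high fan-out operator: each input expands
 * into many outputs, at most BUDGET are emitted per invocation, and the rest are
 * deferred behind a processing-time timer. Names are illustrative, not Flink APIs.
 */
final class DeferredFanOut {
    static final int BUDGET = 3;                  // assumed per-invocation output limit
    static final long DELAY_MS = 5 * 60 * 1000L;  // assumed deferral delay (5 minutes)

    final PriorityQueue<Long> timers = new PriorityQueue<>(); // pending timer times
    final Deque<String> backlog = new ArrayDeque<>();         // deferred outputs
    final List<String> output = new ArrayList<>();            // emitted downstream
    long now = 0;                                             // simulated processing time

    /** Expand one input into fanOut outputs, emit a budget's worth, defer the rest. */
    void processElement(String input, int fanOut) {
        for (int i = 0; i < fanOut; i++) backlog.add(input + "-" + i);
        emitBudget();
    }

    /** Timer callback: keep working through the deferred backlog. */
    void onProcessingTime(long timestamp) {
        emitBudget();
    }

    private void emitBudget() {
        for (int i = 0; i < BUDGET && !backlog.isEmpty(); i++) output.add(backlog.poll());
        // Arm a single timer for the remainder if none is pending yet.
        if (!backlog.isEmpty() && timers.isEmpty()) timers.add(now + DELAY_MS);
    }

    /** Advance simulated processing time and fire every timer that has become due. */
    void advanceTo(long time) {
        now = time;
        while (!timers.isEmpty() && timers.peek() <= now) onProcessingTime(timers.poll());
    }

    public static void main(String[] args) {
        DeferredFanOut op = new DeferredFanOut();
        op.processElement("a", 7);
        System.out.println(op.output.size() + " emitted, " + op.backlog.size() + " deferred");
        op.advanceTo(DELAY_MS);
        op.advanceTo(2 * DELAY_MS);
        System.out.println(op.output.size() + " emitted, " + op.backlog.size() + " deferred");
    }
}
```

Running main prints "3 emitted, 4 deferred" and then "7 emitted, 0 deferred". In this model the remaining outputs only appear once processing time reaches the timers, so timers that never fire leave the backlog stranded.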

One problem I have is that, after my operator.close() is called, my operator
may still set processing-time timers and wait for them to fire. However, it
seems that in the new version Flink stops invoking processing-time timers
once an operator's close() has been called. I'm curious why Flink does this,
and whether there is a workaround I can use for my operator.

Thanks for your help!

Re: Register processing time timers when Operator.close() is called

Aljoscha Krettek-2
Hi!

This is an interesting topic and we recently created a Jira issue about
this: https://issues.apache.org/jira/browse/FLINK-18647.

In Beam we even have a workaround for this:
https://github.com/apache/beam/blob/0c01636fc8610414859d946cb93eabc904123fc8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L581

Maybe it's time to finally address this in Flink as well.

Best,
Aljoscha


(In reply to Boyuan Zhang, 11.11.20 01:02)


Re: Register processing time timers when Operator.close() is called

Boyuan Zhang
Thanks, Aljoscha!

Manually draining processing-time timers during operator.close() is my
current workaround as well. It's just not efficient for me, since I may set
a processing-time timer to fire its callback after 5 minutes, but now I have
to fire it immediately.
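The drain-on-close workaround can be sketched as follows, again as plain, Flink-free Java. DrainingTimerService, registerProcessingTimeTimer, drainOnClose and the firing cap are hypothetical names and assumptions for illustration only; this is not how Flink or Beam implement it. The sketch shows the trade-off mentioned above: on close(), every pending timer fires right away in scheduled-time order, regardless of how far in the future it was set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.ObjLongConsumer;

/**
 * Hypothetical sketch (not a Flink or Beam API) of draining processing-time
 * timers on close: instead of waiting for each timer's scheduled time, fire
 * every pending timer immediately, earliest first.
 */
final class DrainingTimerService {
    private final PriorityQueue<Long> pending = new PriorityQueue<>(); // scheduled times
    private final ObjLongConsumer<DrainingTimerService> callback;     // operator's onTimer
    private final int maxDrainFirings;                               // guard for close()

    DrainingTimerService(ObjLongConsumer<DrainingTimerService> callback, int maxDrainFirings) {
        this.callback = callback;
        this.maxDrainFirings = maxDrainFirings;
    }

    void registerProcessingTimeTimer(long fireAt) { pending.add(fireAt); }

    int pendingTimers() { return pending.size(); }

    /** Normal operation: fire only the timers that are due at processing time now. */
    void advanceTo(long now) {
        while (!pending.isEmpty() && pending.peek() <= now) {
            callback.accept(this, pending.poll());
        }
    }

    /**
     * Called from the operator's close(): fire all pending timers now, earliest first.
     * Timers registered by callbacks while draining are drained as well; the firing
     * cap stops a callback that re-registers forever from blocking close().
     */
    int drainOnClose() {
        int fired = 0;
        while (!pending.isEmpty()) {
            if (fired >= maxDrainFirings) {
                throw new IllegalStateException("timers still pending after " + fired + " firings");
            }
            callback.accept(this, pending.poll());
            fired++;
        }
        return fired;
    }

    public static void main(String[] args) {
        final long fiveMinutes = 5 * 60 * 1000L;
        List<Long> firedAt = new ArrayList<>();
        // Each firing re-arms a timer 5 minutes later until three firings have happened.
        DrainingTimerService timers = new DrainingTimerService((svc, time) -> {
            firedAt.add(time);
            if (firedAt.size() < 3) svc.registerProcessingTimeTimer(time + fiveMinutes);
        }, 1_000);
        timers.registerProcessingTimeTimer(fiveMinutes);
        timers.advanceTo(0);               // nothing is due yet
        int fired = timers.drainOnClose(); // close(): fire everything immediately
        System.out.println(fired + " " + firedAt);
    }
}
```

Running main prints "3 [300000, 600000, 900000]": a chain of timers spaced 5 minutes apart executes immediately during close(). Capping the number of drain firings is a design choice so that a callback which keeps re-registering timers cannot block shutdown indefinitely.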

https://issues.apache.org/jira/browse/FLINK-18647 is really helpful, and I'm
looking forward to the solution.

Thanks for your help!


(In reply to Aljoscha Krettek, Wed, Nov 11, 2020 at 8:13 AM)