Per Key Grained Watermark Support

Jiayi Liao
Hi all,

Currently, watermarks are only supported at the task level (or partition level). This means that data belonging to a faster key has to share the same watermark as data belonging to a slower key in the same key group of a KeyedStream. This leads to two problems:

1. Latency. For example, every key has its own window state, but its windows can only fire after the watermark passes the window's end time, and that watermark is usually determined by the data of the slowest key. (The same applies to CepOperator and other operators that use the watermark to emit results.)

2. State size. Because results for the faster key are fired late, it has to keep more redundant state that could have been pruned earlier.

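To make the latency point concrete, here is a toy model in plain Python (not Flink code). It assumes the "slow" key's records arrive on a lagging input channel, so the keyed operator's watermark, which is the minimum over its input channels, is held at that key's progress. It also assumes the watermark simply equals the latest timestamp seen, with no out-of-orderness bound. `WINDOW_SIZE`, `window_end`, `fires`, and the progress numbers are hypothetical.

```python
"""Toy model of task-level vs. per-key watermarks (illustrative, not Flink code)."""
from typing import Dict

WINDOW_SIZE = 60  # hypothetical tumbling window size, same unit as timestamps


def window_end(ts: int) -> int:
    """Exclusive end of the tumbling window containing timestamp ts."""
    return ts - ts % WINDOW_SIZE + WINDOW_SIZE


def fires(watermark: int, ts: int) -> bool:
    """An event-time window fires once the watermark reaches its last timestamp (end - 1)."""
    return watermark >= window_end(ts) - 1


# Hypothetical progress: latest event timestamp seen for each key.
progress: Dict[str, int] = {"fast": 130, "slow": 20}

# Task level: one watermark for all keys, held back by the lagging input.
shared_wm = min(progress.values())
# Per key: each key's watermark follows only its own data.
per_key_wm = dict(progress)

# Can the "fast" key's window [0, 60), holding its record at t=10, fire yet?
print(fires(shared_wm, 10))           # False: waits for "slow" to reach 59
print(fires(per_key_wm["fast"], 10))  # True
```

Under the shared watermark, the fast key's window stays buffered until the slow key reaches t=59, which is the extra latency and extra state described in the two points above.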
However, since the watermark was introduced a long time ago and was not designed to be fine-grained in the first place, I find it very hard to solve this problem without a big change. Does anyone in the community have successful experience with this, or is there perhaps a shortcut? If not, I can try to draft a design if the community needs one.

Best Regards,

Jiayi Liao
Re: Per Key Grained Watermark Support

Lasse Nedergaard
Hi Jiayi,

We have faced the same challenge, as we deal with IoT units that do not necessarily share the same timestamps. A watermark per key would be a perfect match here. We tried to work around it by handling late events as a special case with side outputs, but that isn't a perfect solution.
My conclusion is to skip watermarks, create a keyed process function, and handle the time for each key myself.
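A rough sketch of what handling the time for each key could look like, written as plain Python rather than Flink API. The class `PerKeyTumblingSum`, the sum aggregation, and the `max_out_of_orderness` parameter are hypothetical. Each key keeps its own maximum timestamp, and its watermark is that maximum minus the allowed out-of-orderness; in a Flink job this state would live in keyed state inside a KeyedProcessFunction.

```python
"""Per-key event-time windows: a plain-Python sketch, not Flink API."""
from typing import Dict, List, Tuple


class PerKeyTumblingSum:
    """Tumbling-window sums where each key advances its own watermark."""

    def __init__(self, window_size: int, max_out_of_orderness: int) -> None:
        self.window_size = window_size
        self.delay = max_out_of_orderness
        self.max_ts: Dict[str, int] = {}                   # key -> largest timestamp seen
        self.open_windows: Dict[str, Dict[int, int]] = {}  # key -> window end -> sum

    def process(self, key: str, ts: int, value: int) -> List[Tuple[str, int, int]]:
        """Add one record and return (key, window_end, sum) for each window it closes."""
        end = ts - ts % self.window_size + self.window_size
        if key in self.max_ts and self.max_ts[key] - self.delay >= end - 1:
            return []  # late for this key: its window already fired; dropped in this sketch

        windows = self.open_windows.setdefault(key, {})
        windows[end] = windows.get(end, 0) + value
        self.max_ts[key] = max(self.max_ts.get(key, ts), ts)

        # This key's own watermark; other keys' progress is irrelevant.
        key_watermark = self.max_ts[key] - self.delay
        ready = sorted(e for e in windows if e - 1 <= key_watermark)
        return [(key, e, windows.pop(e)) for e in ready]


agg = PerKeyTumblingSum(window_size=60, max_out_of_orderness=0)
print(agg.process("fast", 10, 1))  # []
print(agg.process("slow", 5, 4))   # []
print(agg.process("fast", 70, 2))  # [('fast', 60, 1)]: no need to wait for "slow"
```

Windows close when a record arrives rather than through event-time timers, because Flink's event-time timers are driven by the operator's shared watermark. One consequence of this design: a key that stops sending data never closes its last window unless something else, such as a processing-time timeout, flushes it.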

Best regards
Lasse Nedergaard


> On 23 Sep 2019, at 06:16, 廖嘉逸 <[hidden email]> wrote:
> [...]
Re: Per Key Grained Watermark Support

Congxian Qiu
Hi,
There was an earlier discussion about this issue [1]. As that discussion
said, this is not supported out of the box by Flink at the moment, so I
think you can try a keyed process function, as Lasse said.

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Watermark-for-each-key-td27485.html#a27516
Best,
Congxian


On Monday, 23 September 2019 at 12:42 PM, Lasse Nedergaard <[hidden email]> wrote:

> [...]
Re: Per Key Grained Watermark Support

Jiayi Liao
Hi Congxian,
Thanks, but by doing that we will lose some features, such as emitting late data to a side output.
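On the late-data point: in a hand-written per-key function the built-in late-data handling is indeed gone, so recovering it means re-implementing it by hand. The function decides what counts as late and routes those records itself (in Flink, for example, via `ctx.output(...)` with an `OutputTag` inside a process function). The Python sketch below only classifies records; `split_late` and the `(key, timestamp)` record shape are hypothetical.

```python
"""Routing late records with a per-key watermark (illustrative sketch, not Flink API)."""
from typing import Dict, Iterable, List, Tuple

Record = Tuple[str, int]  # hypothetical record shape: (key, event timestamp)


def split_late(records: Iterable[Record], window_size: int,
               max_out_of_orderness: int) -> Tuple[List[Record], List[Record]]:
    """Split records into (on_time, late) using a watermark tracked per key.

    A record is late when its key's watermark has already reached the last
    timestamp of the record's tumbling window, i.e. that window already fired.
    """
    max_ts: Dict[str, int] = {}
    on_time: List[Record] = []
    late: List[Record] = []
    for key, ts in records:
        window_last = ts - ts % window_size + window_size - 1
        if key in max_ts and max_ts[key] - max_out_of_orderness >= window_last:
            late.append((key, ts))  # a real job would emit this to a side output
        else:
            on_time.append((key, ts))
        max_ts[key] = max(max_ts.get(key, ts), ts)
    return on_time, late


events = [("fast", 10), ("fast", 130), ("slow", 20), ("fast", 30)]
print(split_late(events, window_size=60, max_out_of_orderness=0))
# ([('fast', 10), ('fast', 130), ('slow', 20)], [('fast', 30)])
```

By contrast, with a single watermark that follows the fast key (for example one derived from the largest timestamp seen in the partition), the record `('slow', 20)` would also have been classified as late.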


 Original Message
Sender: Congxian Qiu<[hidden email]>
Recipient: Lasse Nedergaard<[hidden email]>
Cc: 廖嘉逸<[hidden email]>; [hidden email]<[hidden email]>; [hidden email]<[hidden email]>
Date: Monday, Sep 23, 2019 19:56
Subject: Re: Per Key Grained Watermark Support


[...]