[jira] [Created] (FLINK-13721) BroadcastState should support StateTTL

Shang Yuanchun (Jira)
Kerem Ulutaş created FLINK-13721:
------------------------------------

             Summary: BroadcastState should support StateTTL
                 Key: FLINK-13721
                 URL: https://issues.apache.org/jira/browse/FLINK-13721
             Project: Flink
          Issue Type: Improvement
          Components: API / DataStream, Runtime / Queryable State
    Affects Versions: 1.8.1
         Environment: macOS 10.14.6 running IntelliJ IDEA Ultimate 2019.2, Flink version 1.8.1
            Reporter: Kerem Ulutaş
         Attachments: DebugBroadcastStateTTL.java, IntHolder.java, StringHolder.java, flink_broadcast_state_ttl_debug.log

Hi everyone,

Sorry if I'm doing anything wrong; this is my first issue in Apache Jira.

I have a use case that requires two data streams to be cross-joined. To be exact, one stream carries location updates from clients and the other carries event data with location information. I'm trying to find the events that happen within a certain radius of the client location(s).

Since the streams cannot be related to each other through a common partitioning key, I have to broadcast the client updates to all tasks and evaluate the radius check for each event.

The requirement is that if we don't receive any location update from a client for a certain amount of time, we should consider that client "gone" (similar to the rationale stated in the FLINK-3089 description: https://issues.apache.org/jira/browse/FLINK-3089).

I have attached the sample application classes I used to debug BroadcastState and StateTTL together.

The output (see flink_broadcast_state_ttl_debug.log) shows that client "c0" received its first event at 17:08:07.67 and was therefore expected to be evicted sometime after 17:08:37.xxx, but it never gets evicted.

For the operator (the result of BroadcastConnectedStream<IntHolder, StringHolder>.process): since the context in the onTimer method doesn't let the user change the contents of the broadcast state, the only way to deal with stale client data is as follows:
 * In the processElement method, calculate results only for client data that is newer than the TTL
 * In the processBroadcastElement method, remove client data older than the TTL from the broadcast state

If the broadcast side of the connected streams doesn't receive data for longer than the desired time-to-live, BroadcastState keeps holding stale data, and the processElement method has to filter out those client entries on every call. This is the approach I am using in production right now.
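
The workaround described above could be sketched roughly as follows. This is a plain-Java model of the logic only, not the attached DebugBroadcastStateTTL.java and not Flink API code: the class name StaleClientFilter, its method names, and the use of a HashMap in place of the broadcast state are all assumptions made for illustration. The 30-second TTL in the usage example is inferred from the timestamps quoted above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the operator logic; the map models the broadcast
// state (client id -> timestamp of that client's last update, in milliseconds).
class StaleClientFilter {
    private final long ttlMillis;
    private final Map<String, Long> lastSeen = new HashMap<>();

    StaleClientFilter(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Models processBroadcastElement: the only place where the state may be
    // modified, so the update is stored and expired entries are pruned here.
    void onBroadcast(String clientId, long nowMillis) {
        lastSeen.put(clientId, nowMillis);
        lastSeen.entrySet().removeIf(e -> nowMillis - e.getValue() > ttlMillis);
    }

    // Models processElement: read-only access, so stale entries cannot be
    // removed and must be skipped on every call instead.
    List<String> liveClients(long nowMillis) {
        List<String> live = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastSeen.entrySet()) {
            if (nowMillis - e.getValue() <= ttlMillis) {
                live.add(e.getKey());
            }
        }
        return live;
    }

    // Number of entries physically held, including stale ones.
    int storedClients() {
        return lastSeen.size();
    }

    public static void main(String[] args) {
        StaleClientFilter filter = new StaleClientFilter(30_000L);
        filter.onBroadcast("c0", 0L);
        System.out.println(filter.liveClients(40_000L)); // prints []
        System.out.println(filter.storedClients());      // prints 1
        filter.onBroadcast("c1", 40_000L);
        System.out.println(filter.storedClients());      // prints 1
    }
}
```

In the usage example, "c0" is already filtered out of the results at 40 seconds but still occupies the state until the next broadcast element arrives. That is the gap a native StateTTL for BroadcastState would close.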

I am not aware of any design decision or limitation that makes it impossible to implement StateTTL for BroadcastState. I would be grateful if someone could explain any that exist.

Thanks and regards.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)