[jira] [Created] (FLINK-19382) Introducing ReplayableSourceStateBackend


Shang Yuanchun (Jira)
Theo Diefenthal created FLINK-19382:
---------------------------------------

             Summary: Introducing ReplayableSourceStateBackend
                 Key: FLINK-19382
                 URL: https://issues.apache.org/jira/browse/FLINK-19382
             Project: Flink
          Issue Type: Improvement
            Reporter: Theo Diefenthal


I have an idea for a new StateBackend, simply called "ReplayableSource". This state backend would be bound by a number of limitations, but in the few areas where it applies, it could improve pipeline performance by orders of magnitude, which makes me think it is worth discussing.

I'd like to start by describing two useful scenarios for such a state backend before discussing the backend itself. In both scenarios, data is read from Kafka and processed with Flink.

Scenario 1: Buffering data. I am currently developing a pipeline where, directly after reading the data, I need to buffer it for 1 minute in event time. This is due to inter-event dependencies: if a certain event arrives within one minute, I have to enrich the first event with additional information. Right now, I store the 1 minute of event-time data in Flink state, which means the entire buffer is written on each checkpoint and makes it impossible for me to use exactly-once processing (we want latency to be as low as possible, i.e. at most a few seconds after buffering, while simultaneously handling a high volume of data).

Scenario 2: Performing Flink SQL CEP queries. CEP queries naturally have inter-event dependencies. Running even simple MATCH_RECOGNIZE queries directly after a Kafka source often requires the RocksDB state backend and leads to slow performance.

 

The idea: Instead of storing the entire state, we could simply store a Kafka offset. When restoring from a savepoint, all of the state could be rebuilt by re-reading from Kafka starting at that offset. The checkpoint size would thus shrink from potentially huge down to just a few numbers, which allows frequent and fast checkpointing.
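
To make the idea more concrete, here is a minimal sketch in plain Python. It does not use any Flink or Kafka API. The names `Record`, `BufferingOperator`, `checkpoint` and `restore` are hypothetical, the log is just an in-memory list whose indices act as offsets, and events are assumed to arrive in event-time order. It also assumes that the snapshot holds two numbers: the offset of the oldest event still in the buffer (where a replay has to start) and the offset at which normal processing resumes.

```python
from dataclasses import dataclass, field
from typing import List, Tuple

BUFFER_MS = 60_000  # one minute of event time, as in Scenario 1


@dataclass(frozen=True)
class Record:
    offset: int         # position in the replayable log (stand-in for a Kafka offset)
    event_time_ms: int
    payload: str


@dataclass
class BufferingOperator:
    """Deterministic operator that keeps the last minute of events in memory."""
    buffer: List[Record] = field(default_factory=list)

    def process(self, record: Record) -> None:
        self.buffer.append(record)
        # Evict everything older than the buffering horizon. This assumes
        # in-order event times; watermark handling is not modeled here.
        cutoff = record.event_time_ms - BUFFER_MS
        self.buffer = [r for r in self.buffer if r.event_time_ms > cutoff]

    def checkpoint(self, next_offset: int) -> Tuple[int, int]:
        """Return (replay_from, resume_at) instead of the buffer contents."""
        replay_from = self.buffer[0].offset if self.buffer else next_offset
        return replay_from, next_offset


def restore(log: List[Record], snapshot: Tuple[int, int]) -> BufferingOperator:
    """Rebuild the operator state by re-reading the log between the two offsets."""
    replay_from, resume_at = snapshot
    op = BufferingOperator()
    for record in log[replay_from:resume_at]:
        op.process(record)
    return op


# Toy log with one event every 20 seconds; offsets equal list indices.
log = [Record(i, i * 20_000, f"event-{i}") for i in range(10)]

live = BufferingOperator()
for rec in log[:7]:
    live.process(rec)

snapshot = live.checkpoint(next_offset=7)   # two integers, not the buffer
restored = restore(log, snapshot)
print(snapshot, restored.buffer == live.buffer)
```

In this toy setup the snapshot is two integers regardless of how many events sit in the buffer, and the restored buffer equals the live one. A real implementation would probably also have to avoid emitting output a second time during the replay, which this sketch ignores.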

 

Limitations:
 * This would only work for fully deterministic/replayable streaming jobs. If any operator within the pipeline is not deterministic, a replay could produce a different result.
 * The source must be replayable, e.g. Kafka.
 * This would also only work for pipelines with short-lived state. Many pipelines build up their state over days, months or even years. Restoring such state by replaying all the data from that period would be almost impossible, especially as Kafka is usually configured with a retention of something like a week. However, there are also many queries with short-lived state, like the mentioned CEP use case, where patterns for event correlation are usually defined over timeframes of seconds, minutes, hours or a few days.
 * I am not sure whether there are further limitations with regard to windowing, watermarks or similar things that would make this feature impossible.
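
The first and third limitations can be illustrated with a small sketch in plain Python. Everything here is hypothetical and only for illustration: `is_replay_safe` simply runs an operator twice over the same input and compares the results, `uses_processing_time` stands in for any operator that depends on something outside the replayed input (a counter is used as a fake processing-time clock), and `can_replay` checks that the offset a replay would start from has not yet been removed by the retention of the source.

```python
import itertools
from typing import Callable, Iterable, List


def run(operator: Callable[[int], int], events: Iterable[int]) -> List[int]:
    """Apply an operator to every event and collect the results as 'state'."""
    return [operator(e) for e in events]


def is_replay_safe(operator: Callable[[int], int], events: List[int]) -> bool:
    """Two passes over the same input must produce the same state."""
    return run(operator, events) == run(operator, events)


def deterministic(event: int) -> int:
    # Depends only on the event itself.
    return event * 2


_fake_processing_clock = itertools.count()


def uses_processing_time(event: int) -> int:
    # Depends on a value that keeps advancing between runs, so a replay
    # sees different values than the original run did.
    return event + next(_fake_processing_clock)


def can_replay(replay_from_offset: int, earliest_retained_offset: int) -> bool:
    """A replay is only possible if its start offset is still retained."""
    return replay_from_offset >= earliest_retained_offset


events = list(range(100))
print(is_replay_safe(deterministic, events))
print(is_replay_safe(uses_processing_time, events))
print(can_replay(replay_from_offset=500, earliest_retained_offset=1_000))
```

Under these assumptions, only the first operator would be a candidate for such a backend, and the last call shows a restore that fails because the required data has already aged out of the log.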

For certain scenarios, this feature would obviously make no sense, most likely for windowed pipelines. It is certainly much cheaper to e.g. store a COUNT per window than to replay all events per window in order to restore that COUNT. But I'm focusing on something like CEP.
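
A rough sketch of that trade-off, again in plain Python with made-up numbers: one event every 100 ms for five minutes, counted in one-minute tumbling windows. The names `WINDOW_MS` and `window_start` are illustrative only.

```python
from collections import Counter

WINDOW_MS = 60_000  # one-minute tumbling windows (illustrative value)


def window_start(timestamp_ms: int) -> int:
    """Start of the tumbling window that a timestamp falls into."""
    return timestamp_ms - timestamp_ms % WINDOW_MS


# Event timestamps: one event every 100 ms for five minutes.
events = [i * 100 for i in range(3_000)]

# Classic approach: the state is one counter per window.
counts = Counter(window_start(ts) for ts in events)

# Replay approach: to restore the COUNT of the window that is still open,
# every event of that window has to be read again.
open_window = window_start(events[-1])
events_to_replay = sum(1 for ts in events if window_start(ts) == open_window)

print(f"stored state for the open window: 1 integer ({counts[open_window]})")
print(f"events to re-read on replay:      {events_to_replay}")
```

With these numbers, a single integer stands against 600 events to re-read, which is why a simple aggregate is better served by a regular state backend.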



--
This message was sent by Atlassian Jira
(v8.3.4#803005)