Oliver Kostera created FLINK-14197:
--------------------------------------
Summary: Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows
Key: FLINK-14197
URL: https://issues.apache.org/jira/browse/FLINK-14197
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing, Runtime / State Backends
Affects Versions: 1.9.0
Environment: Tested with:
* Local Flink Mini Cluster running from IDE
* Flink standalone cluster run in docker
Reporter: Oliver Kostera
I'm using *ProcessWindowFunction* in a keyed stream with the following definition:
{code:java}
final SingleOutputStreamOperator<Message> processWindowFunctionStream =
        keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
                .process(new CustomProcessWindowFunction())
                .uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
                .name("Process window function");
{code}
Checkpointing is configured with the RocksDB state backend, incremental checkpointing, and EXACTLY_ONCE mode.
At runtime I noticed that even though data ingestion is static (the same keys and the same message frequency), the state size of the process window operator keeps increasing. I was able to reproduce it with a minimal, similar setup here:
https://github.com/loliver1234/flink-process-window-function
Testing conditions:
- RabbitMQ source with Exactly-once guarantee and 65k prefetch count
- RabbitMQ sink to collect messages
- Simple ProcessWindowFunction that only passes messages through
- Stream time characteristic set to TimeCharacteristic.ProcessingTime
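For reference, the pass-through window function used in the reproduction looks roughly like this (a sketch only; the element type Message is taken from the stream definition above, while the String key type is an assumption, and the actual CustomProcessWindowFunction in the linked repository may differ in detail):

```java
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Sketch of a pass-through ProcessWindowFunction. It emits every buffered
// element unchanged and touches no per-window or global state, so the
// operator state should consist only of the window buffers and timers.
public class CustomProcessWindowFunction
        extends ProcessWindowFunction<Message, Message, String, TimeWindow> {

    @Override
    public void process(String key,
                        Context context,
                        Iterable<Message> elements,
                        Collector<Message> out) {
        for (Message element : elements) {
            out.collect(element);
        }
    }
}
```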
Testing scenario:
- Start the Flink job and check the initial state size - State Size: 127 KB
- Start sending messages: the same 1000 unique keys every 1s (successive messages per key do not fall within the 100ms session gap, so each message should create a new window)
- The state of the process window operator keeps increasing - after 1 million messages it ended up at around 2 MB
- Stop sending messages, wait until the RabbitMQ queue is fully consumed and a few checkpoints go by
- Expected the state size to decrease back to the base value, but it stayed at 2 MB
- Continued to send messages with the same keys, and the state kept its increasing trend
What I checked:
- Registration and deregistration of the processing-time timers set for the windows - each registration matched its deregistration
- Verified that there are in fact no window merges
- Tried a custom Trigger that disables window merges and makes onProcessingTime return TriggerResult.FIRE_AND_PURGE - same state behavior
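The custom trigger described in the last item can be sketched as below. This is an illustration of the technique (merging disabled via canMerge(), FIRE_AND_PURGE on the processing-time timer), not the exact trigger used; the class name is hypothetical. Note that Flink rejects a non-merging trigger on a merging assigner such as ProcessingTimeSessionWindows, so a trigger like this assumes a non-merging assigner:

```java
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Hypothetical trigger sketch: fires and purges on the processing-time
// timer so that window contents are dropped from state after firing.
public class PurgingProcessingTimeTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) {
        // Fire once the window's end is reached in processing time.
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window,
                                          TriggerContext ctx) {
        // Emit the window and purge its contents, instead of FIRE only.
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window,
                                     TriggerContext ctx) {
        // Processing-time job: event-time timers are ignored.
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        // Disable window merging.
        return false;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
}
```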
On a staging environment, we noticed that the state of this operator keeps increasing indefinitely, after some months reaching as much as 1.5 GB for 100k unique keys.
Flink commit id: 9c32ed9
--
This message was sent by Atlassian Jira
(v8.3.4#803005)