vinoyang created FLINK-14428:
--------------------------------
Summary: Inconsistent key access in KeyedProcessFunction when using keyed state in both processElement and onTimer methods
Key: FLINK-14428
URL: https://issues.apache.org/jira/browse/FLINK-14428
Project: Flink
Issue Type: Bug
Components: API / DataStream
Reporter: vinoyang
Scenario:
In {{KeyedProcessFunction}}, using the keyed state API in both the {{processElement}} and {{onTimer}} methods may cause inconsistent key access.
Analysis:
For timers, {{InternalTimerServiceImpl}} sets the key context to the timer's key (the key that was current when registerXXXTimeTimer was called) before firing the timer:
{code:java}
public void onProcessingTime(long time) throws Exception {
    // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
    // inside the callback.
    nextTimer = null;

    InternalTimer<K, N> timer;

    while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        processingTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey()); // here
        triggerTarget.onProcessingTime(timer);
    }

    if (timer != null && nextTimer == null) {
        nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
    }
}
{code}
For the {{processElement}} method, {{OneInputStreamTask}} calls it after setting the key context:
{code:java}
@Override
public void emitRecord(StreamRecord<IN> record) throws Exception {
    synchronized (lock) {
        numRecordsIn.inc();
        operator.setKeyContextElement1(record); // here
        operator.processElement(record);
    }
}
{code}
The {{setCurrentKey}} call in the first code snippet and the {{setKeyContextElement1}} call in the second code snippet both end up in the same {{AbstractStreamOperator#setCurrentKey}} method. However, there is only one keyed state backend instance, and {{AbstractStreamOperator#setCurrentKey}} changes the current key of that backend.
So if we use the keyed state API in both {{processElement}} and {{onTimer}}, we may read a wrong state value, because either method may change the current key and cause an inconsistency.
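To make the concern concrete, here is a minimal, self-contained sketch of the suspected interleaving. It is only a model: {{SingleKeyBackend}}, {{readForElementKey}} and the keys "A" and "B" are hypothetical names invented for illustration, not Flink classes. It assumes the timer path could overwrite the key context between the element path setting the key and reading state; it says nothing about whether the {{lock}} in the snippets above actually permits that.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyContextSketch {

    /** Hypothetical stand-in for a keyed state backend: one current key shared by all callers. */
    static final class SingleKeyBackend {
        private final Map<String, Long> valuesByKey = new HashMap<>();
        private String currentKey;

        void setCurrentKey(String key) { currentKey = key; }

        // State reads and writes are always scoped to whatever key is current.
        long value() { return valuesByKey.getOrDefault(currentKey, 0L); }
        void update(long v) { valuesByKey.put(currentKey, v); }
    }

    /** Returns the value the element path reads when it expects the state of key "A". */
    public static long readForElementKey(boolean timerRunsInBetween) {
        SingleKeyBackend backend = new SingleKeyBackend();

        // Seed state for two keys.
        backend.setCurrentKey("A");
        backend.update(10L);
        backend.setCurrentKey("B");
        backend.update(20L);

        // Element path: key context is set from a record with key "A".
        backend.setCurrentKey("A");

        if (timerRunsInBetween) {
            // Timer path (assumed interleaving): key context is overwritten with the timer's key "B".
            backend.setCurrentKey("B");
        }

        // Element path reads state, still expecting key "A".
        return backend.value();
    }

    public static void main(String[] args) {
        System.out.println("no interleaving:  " + readForElementKey(false)); // prints 10
        System.out.println("timer in between: " + readForElementKey(true));  // prints 20
    }
}
```

Without the interleaving the element path reads the value stored for "A"; with it, the same read silently returns the value stored for "B".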