[jira] [Created] (FLINK-14428) Non-consistency key access in KeyedProcessFunction when use keyed state in both processElement and onTimer method

Shang Yuanchun (Jira)
vinoyang created FLINK-14428:
--------------------------------

             Summary: Non-consistency key access in KeyedProcessFunction when use keyed state in both processElement and onTimer method
                 Key: FLINK-14428
                 URL: https://issues.apache.org/jira/browse/FLINK-14428
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
            Reporter: vinoyang


Scenario:

In {{KeyedProcessFunction}}, using the keyed state API in both the {{processElement}} and {{onTimer}} methods may cause inconsistent key access.

Analysis:

For timers, {{InternalTimerServiceImpl}} sets the key context to the timer's own key (the key that was current when registerXXXTimeTimer was called) right before firing the timer:


{code:java}
public void onProcessingTime(long time) throws Exception {
    // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
    // inside the callback.
    nextTimer = null;

    InternalTimer<K, N> timer;

    while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        processingTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey());        // here: switch the key context to the timer's key
        triggerTarget.onProcessingTime(timer);
    }

    // schedule a physical timer for the earliest timer that is not yet due
    if (timer != null && nextTimer == null) {
        nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
    }
}
{code}
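To make the timer path concrete, here is a minimal, Flink-free sketch of the loop above. Everything in it ({{ToyTimer}}, {{ToyTimerService}}, and the {{currentKey}} field standing in for the keyed state backend's key context) is a hypothetical stand-in, not Flink's API. It only models the order of operations: each timer remembers the key that was current at registration, and firing polls each due timer, switches the key context to that timer's key, then runs the callback.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for InternalTimer<K, N>: a timestamp plus the key current at registration.
final class ToyTimer {
    final long timestamp;
    final String key;

    ToyTimer(long timestamp, String key) {
        this.timestamp = timestamp;
        this.key = key;
    }
}

// Hypothetical model of the processing-time branch of InternalTimerServiceImpl (not Flink code).
final class ToyTimerService {
    // Stands in for the single key context shared with the keyed state backend.
    String currentKey;

    private final PriorityQueue<ToyTimer> queue =
            new PriorityQueue<ToyTimer>((a, b) -> Long.compare(a.timestamp, b.timestamp));

    // Analogous to registerProcessingTimeTimer in this model: the timer captures the current key.
    void registerTimer(long timestamp) {
        queue.add(new ToyTimer(timestamp, currentKey));
    }

    // Fire every timer due at or before 'time', switching to each timer's key first.
    void onProcessingTime(long time, Consumer<ToyTimer> onTimer) {
        ToyTimer timer;
        while ((timer = queue.peek()) != null && timer.timestamp <= time) {
            queue.poll();
            currentKey = timer.key;   // same role as keyContext.setCurrentKey(timer.getKey())
            onTimer.accept(timer);
        }
    }
}

final class TimerPathDemo {
    // Returns the key context observed inside each timer callback, in firing order.
    static List<String> keysSeenByTimers() {
        ToyTimerService service = new ToyTimerService();
        service.currentKey = "a";
        service.registerTimer(20);
        service.currentKey = "b";
        service.registerTimer(10);
        service.currentKey = "c";
        service.registerTimer(30);   // not due at time 25

        List<String> seen = new ArrayList<>();
        service.onProcessingTime(25, t -> seen.add(service.currentKey));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(keysSeenByTimers()); // [b, a]
    }
}
{code}

In the toy model the loop also leaves {{currentKey}} set to the last fired timer's key, since nothing restores the previous value; the snippet above likewise has no restore step.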

For {{processElement}}, {{OneInputStreamTask}} calls it only after setting the key context from the incoming record:


{code:java}
@Override
public void emitRecord(StreamRecord<IN> record) throws Exception {
    synchronized (lock) {
        numRecordsIn.inc();
        operator.setKeyContextElement1(record);        // here: set the key context from the record's key
        operator.processElement(record);
    }
}
{code}
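The record path can be modeled the same way. In this hypothetical sketch (again not Flink's API), {{ToyOperator.setKeyContextElement1}} plays the role of extracting the key with a key selector and calling the shared {{setCurrentKey}}, and {{processElement}} increments a per-key counter whose lookup is implicitly scoped to the current key.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of the record path in OneInputStreamTask / AbstractStreamOperator (not Flink code).
final class ToyOperator<IN> {
    private final Function<IN, String> keySelector;
    // One key context and one state table, shared by every call on this operator.
    private String currentKey;
    private final Map<String, Integer> countState = new HashMap<>();

    ToyOperator(Function<IN, String> keySelector) {
        this.keySelector = keySelector;
    }

    // Mirrors setKeyContextElement1: derive the key from the record and make it current.
    void setKeyContextElement1(IN record) {
        setCurrentKey(keySelector.apply(record));
    }

    void setCurrentKey(String key) {
        currentKey = key;
    }

    // User logic: the state access is implicitly scoped to currentKey.
    void processElement(IN record) {
        countState.merge(currentKey, 1, Integer::sum);
    }

    // Mirrors emitRecord: set the key context first, then process, while holding the lock.
    void emitRecord(IN record, Object lock) {
        synchronized (lock) {
            setKeyContextElement1(record);
            processElement(record);
        }
    }

    Map<String, Integer> counts() {
        return countState;
    }
}

final class RecordPathDemo {
    // Counts records per key, where the key is the first letter of each word.
    static Map<String, Integer> countByFirstLetter(String... words) {
        ToyOperator<String> op = new ToyOperator<String>(w -> w.substring(0, 1));
        Object lock = new Object();
        for (String w : words) {
            op.emitRecord(w, lock);
        }
        return op.counts();
    }

    public static void main(String[] args) {
        System.out.println(countByFirstLetter("apple", "avocado", "banana")); // {a=2, b=1}
    }
}
{code}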

The {{setCurrentKey}} call in the first snippet and the {{setKeyContextElement1}} call in the second both end up in the same {{AbstractStreamOperator#setCurrentKey}} method. There is only one keyed state backend instance, and {{AbstractStreamOperator#setCurrentKey}} changes that backend's current key.

So if keyed state is accessed in both {{processElement}} and {{onTimer}}, we may read or write the wrong state value, because either call path can change the backend's current key, which leads to inconsistent key access.
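The concern above can be reproduced in a toy model: one backend, one current key, and a value-state handle whose reads and writes resolve against whatever key is current at the moment of the call. The sketch is hypothetical ({{ToyKeyedBackend}} and {{ToyValueState}} are not Flink classes) and deliberately switches the key between the read and the write of a single read-modify-write, to show what the inconsistency would look like.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical single keyed backend: one current key for the whole operator (not Flink code).
final class ToyKeyedBackend {
    private String currentKey;
    private final Map<String, Long> table = new HashMap<>();

    void setCurrentKey(String key) { currentKey = key; }

    // Hypothetical ValueState-like handle: every call goes through the backend's current key.
    ToyValueState valueState() { return new ToyValueState(this); }

    Long get() { return table.get(currentKey); }
    void put(Long v) { table.put(currentKey, v); }

    // Direct lookup by key, used only to inspect the result.
    Long peek(String key) { return table.get(key); }
}

final class ToyValueState {
    private final ToyKeyedBackend backend;

    ToyValueState(ToyKeyedBackend backend) { this.backend = backend; }

    Long value() { return backend.get(); }
    void update(Long v) { backend.put(v); }
}

final class SharedKeyHazardDemo {
    // Returns {value under "a", value under "b"} after a read-modify-write meant for key "a"
    // whose write happens after the shared key was switched to "b".
    static long[] interleavedUpdate() {
        ToyKeyedBackend backend = new ToyKeyedBackend();
        ToyValueState sum = backend.valueState();

        backend.setCurrentKey("a");
        sum.update(100L);
        backend.setCurrentKey("b");
        sum.update(5L);

        // One path (e.g. processElement for key "a") reads its state...
        backend.setCurrentKey("a");
        long read = sum.value();
        // ...another path (e.g. a timer for key "b") switches the shared key...
        backend.setCurrentKey("b");
        // ...so the write meant for "a" lands on "b".
        sum.update(read + 1);

        return new long[] { backend.peek("a"), backend.peek("b") };
    }

    public static void main(String[] args) {
        long[] r = interleavedUpdate();
        System.out.println("a=" + r[0] + ", b=" + r[1]); // a=100, b=101
    }
}
{code}

The sketch does not show that Flink actually interleaves the two paths this way (the {{emitRecord}} snippet above holds {{lock}} while it sets the key and processes the record); it only shows the consequence if a key switch did land between two accesses of one callback.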




--
This message was sent by Atlassian Jira
(v8.3.4#803005)