Sundaram Ananthanarayanan created FLINK-20159:
-------------------------------------------------
Summary: [FLIP-27 source] FutureNotifier does not return a new future when Future::future() is invoked within the returned future's callback
Key: FLINK-20159
URL:
https://issues.apache.org/jira/browse/FLINK-20159 Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.11.2
Reporter: Sundaram Ananthanarayanan
Fix For: 1.12.0
Here's the *problem*. FutureNotifier::future should return a new future every time the previous future was completed. That's the expectation. However, if the future is being requested from within the completion callback of the previous future, then it, instead of returning a new future, returns the existing future. This could potentially result in infinite recursions depending on how the callback method is implemented. Here's an example:
{code:java}
Consumer code:
void consumeDataOnce() {
// get the data from the producer and check if it was empty
Data data = producer.getData();
// if data was empty, then grab the future and attach a callback as below
if (data.isEmpty()) {
producer.getCompletableFuture().thenRun(() -> consumeDataOnce());
}
}
{code}
In the above method, let's say the producer notified the consumer (produced by FutureNofier::future), thinking that some data was available to be consumed. Now let's say the data returned from the producer was instead empty during the callback. In this case, the method goes on in an infinite loop when the future is completed.
*Issue:* If you observe FutureNotifier::notifyComplete's implementation closely, you realize that the future is completed before the futureRef is swapped with null.
{code:java}
public void notifyComplete() {
CompletableFuture<Void> future = futureRef.get();
// If there are multiple threads trying to complete the future, only the first one succeeds.
if (future != null && future.complete(null)) {
futureRef.compareAndSet(future, null);
}
}
{code}
If we can change the ordering instead, where the future is swapped atomically first before being completed, then we can guarantee that the future returned by FutureNotifier::future will always be a new one if the previous one had completed.
[~sewen] [~jqin] [~stevenz3wu]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)