[jira] [Created] (FLINK-20159) [FLIP-27 source] FutureNotifier does not return a new future when Future::future() is invoked within the returned future's callback

Shang Yuanchun (Jira)
Sundaram Ananthanarayanan created FLINK-20159:
-------------------------------------------------

             Summary: [FLIP-27 source] FutureNotifier does not return a new future when Future::future() is invoked within the returned future's callback
                 Key: FLINK-20159
                 URL: https://issues.apache.org/jira/browse/FLINK-20159
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.11.2
            Reporter: Sundaram Ananthanarayanan
             Fix For: 1.12.0


Here's the *problem*. FutureNotifier::future is expected to return a new future once the previous future has completed. However, if the future is requested from within the completion callback of the previous future, it returns the existing, already completed future instead of a new one. Depending on how the callback is implemented, this can result in infinite recursion. Here's an example:

 
{code:java}
// Consumer code:
void consumeDataOnce() {
  // get the data from the producer and check if it was empty
  Data data = producer.getData();
  // if data was empty, then grab the future and attach a callback as below
  if (data.isEmpty()) {
    producer.getCompletableFuture().thenRun(() -> consumeDataOnce());
  }
}
{code}
 

In the above method, let's say the producer completed the future (the one produced by FutureNotifier::future) to notify the consumer that some data was available. Now let's say the data returned from the producer turned out to be empty during the callback. In this case, getCompletableFuture() hands back the same, already completed future, so the callback attached via thenRun calls consumeDataOnce() again right away, and the method recurses indefinitely.
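
The recursion described above relies on how CompletableFuture treats a callback attached to a future that is already complete. The following is a minimal sketch, assuming OpenJDK's behavior of running non-async callbacks inline on the calling thread. The class name CompletedFutureCallbackDemo, the method reattachDepth, and the depth cap are hypothetical and exist only to keep the demo finite:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo class; not part of Flink. */
final class CompletedFutureCallbackDemo {

    /**
     * Re-attaches the same callback to the same future from inside the callback,
     * stopping once maxDepth invocations have happened. Returns the invocation count.
     */
    static int reattachDepth(int maxDepth) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        AtomicInteger depth = new AtomicInteger();
        // One-element array so the lambda can refer to itself.
        Runnable[] callback = new Runnable[1];
        callback[0] = () -> {
            if (depth.incrementAndGet() < maxDepth) {
                // The future is already complete here, so this callback runs again at once.
                future.thenRun(callback[0]);
            }
        };
        future.thenRun(callback[0]);
        future.complete(null);
        return depth.get();
    }

    public static void main(String[] args) {
        System.out.println("nested callback invocations: " + reattachDepth(5));
    }
}
```

Without the depth cap, each invocation would attach and immediately trigger the next one, which is the unbounded recursion the consumer code runs into when it keeps receiving the same completed future.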

 

*Issue:* If you look closely at FutureNotifier::notifyComplete's implementation, you will see that the future is completed before futureRef is swapped with null, so the completion callbacks run while futureRef still points to the completed future.
{code:java}
public void notifyComplete() {
  CompletableFuture<Void> future = futureRef.get();
  // If there are multiple threads trying to complete the future, only the first one succeeds.
  if (future != null && future.complete(null)) {
    futureRef.compareAndSet(future, null);
  }
}
{code}
If we change the ordering so that futureRef is atomically swapped to null before the future is completed, then we can guarantee that FutureNotifier::future always returns a new future once the previous one has completed.
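
The proposed reordering could look roughly like the sketch below. ReorderedFutureNotifier is a hypothetical stand-in, not Flink's class. Its future() method is an assumption about how the pending future is handed out, since only notifyComplete is quoted above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for FutureNotifier using the proposed ordering. */
final class ReorderedFutureNotifier {
    private final AtomicReference<CompletableFuture<Void>> futureRef = new AtomicReference<>();

    /** Returns the pending future, or installs and returns a new one (assumed behavior). */
    CompletableFuture<Void> future() {
        while (true) {
            CompletableFuture<Void> existing = futureRef.get();
            if (existing != null) {
                return existing;
            }
            CompletableFuture<Void> created = new CompletableFuture<>();
            if (futureRef.compareAndSet(null, created)) {
                return created;
            }
            // Another thread installed a future first; retry and return that one.
        }
    }

    /** Detaches the pending future first, then completes it. */
    void notifyComplete() {
        // getAndSet is atomic, so only one caller obtains the non-null future.
        CompletableFuture<Void> future = futureRef.getAndSet(null);
        if (future != null) {
            future.complete(null);
        }
    }

    public static void main(String[] args) {
        ReorderedFutureNotifier notifier = new ReorderedFutureNotifier();
        CompletableFuture<Void> first = notifier.future();
        AtomicReference<CompletableFuture<Void>> seenInCallback = new AtomicReference<>();
        // Request the next future from inside the completion callback.
        first.thenRun(() -> seenInCallback.set(notifier.future()));
        notifier.notifyComplete();
        System.out.println("callback saw a new future: " + (seenInCallback.get() != first));
    }
}
```

With this ordering, a callback that calls future() while the previous future is completing finds futureRef already cleared and installs a fresh, incomplete future.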

 

[~sewen] [~jqin] [~stevenz3wu]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)