[jira] [Created] (FLINK-12086) AsyncFunction - Add access to a user defined Object for cleanup on timeout

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-12086) AsyncFunction - Add access to a user defined Object for cleanup on timeout

Shang Yuanchun (Jira)
Felix Wollschläger created FLINK-12086:
------------------------------------------

             Summary: AsyncFunction - Add access to a user defined Object for cleanup on timeout
                 Key: FLINK-12086
                 URL: https://issues.apache.org/jira/browse/FLINK-12086
             Project: Flink
          Issue Type: Improvement
          Components: API / DataStream
            Reporter: Felix Wollschläger


When executing async-requests it would be nice to have access to a user defined object to perform cleanup when the process times out.

For example, when executing Cassandra-Queries I'm using the drivers threadpool to submit Statements, which returns a com.datastax.driver.core.ResultSetFutre ( [https://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/core/ResultSetFuture.html] ). When I run into a timeout I could cancel the Future because waiting for it to complete is unnecessary in that case.

 

The API could be extendend to something like this:

 

Adding an Type-Parameter to the AsnyFunction Interface:
{code:java}
AsyncFunction<IN, OUT, T>{code}
Updating the asnyInvoke-Method to return the user-defined object:
{code:java}
T asyncInvoke(IN input, ResultFuture<OUT> future) throws Exception;{code}
Updating the timeout-Method to accept the user-defined object:
{code:java}
void timeout(IN input, T obj, ResultFuture<OUT> resultFuture) throws Exception{code}
 

An example Implementation could look like this:
{code:java}
package dev.codeflush;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SomeAsyncFunction implements AsyncFunction<Integer, String, Future<String>> {

    private static final long serialVersionUID = 1L;
   
    @Override
    public Future<String> asyncInvoke(Integer input, ResultFuture<String> resultFuture) throws Exception {
        Future<String> future = null; // submit something in a library thread-pool

        CompletableFuture.runAsync(() -> {
            try {
                resultFuture.complete(Collections.singleton(future.get()));
            } catch (ExecutionException e) {
                // handle this
            } catch (InterruptedException e) {
                // handle that
            }
        });
       
        return future;
    }

    @Override
    public void timeout(Integer input, Future<String> future, ResultFuture<String> resultFuture) throws Exception {
        future.cancel(true);
        resultFuture.complete(Collections.emptySet());
    }
}
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)