Felix Wollschläger created FLINK-12086:
------------------------------------------

             Summary: AsyncFunction - Add access to a user defined Object for cleanup on timeout
                 Key: FLINK-12086
                 URL: https://issues.apache.org/jira/browse/FLINK-12086
             Project: Flink
          Issue Type: Improvement
          Components: API / DataStream
            Reporter: Felix Wollschläger


When executing async requests, it would be nice to have access to a user-defined object to perform cleanup when the request times out.

For example, when executing Cassandra queries I use the driver's thread pool to submit statements, which returns a com.datastax.driver.core.ResultSetFuture ( [https://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/core/ResultSetFuture.html] ). When I run into a timeout, I could cancel the Future, because waiting for it to complete is unnecessary in that case.

The API could be extended to something like this:

Adding a type parameter to the AsyncFunction interface:
{code:java}
AsyncFunction<IN, OUT, T>{code}
Updating the asyncInvoke method to return the user-defined object:
{code:java}
T asyncInvoke(IN input, ResultFuture<OUT> future) throws Exception;{code}
Updating the timeout method to accept the user-defined object:
{code:java}
void timeout(IN input, T obj, ResultFuture<OUT> resultFuture) throws Exception{code}

An example implementation could look like this:
{code:java}
package dev.codeflush;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SomeAsyncFunction implements AsyncFunction<Integer, String, Future<String>> {

    private static final long serialVersionUID = 1L;

    @Override
    public Future<String> asyncInvoke(Integer input, ResultFuture<String> resultFuture) throws Exception {
        // placeholder: submit something to a library thread pool and keep the returned Future
        Future<String> future = null;

        // wait for the library's Future on another thread and forward its result
        CompletableFuture.runAsync(() -> {
            try {
                resultFuture.complete(Collections.singleton(future.get()));
            } catch (ExecutionException e) {
                // handle this
            } catch (InterruptedException e) {
                // handle that
            }
        });

        // returned so that it can be handed back to timeout(...)
        return future;
    }

    @Override
    public void timeout(Integer input, Future<String> future, ResultFuture<String> resultFuture) throws Exception {
        // the request is no longer needed, so stop waiting for it
        future.cancel(true);
        resultFuture.complete(Collections.emptySet());
    }
}
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
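
To make the intended call sequence concrete, here is a minimal self-contained sketch of the proposal that runs without Flink. Everything in it is an assumption for illustration: `ResultFuture` is a one-method stand-in for Flink's interface of the same name, `HandleAsyncFunction` is a hypothetical name for the proposed three-parameter interface, and `runWithTimeout` only imitates what the operator would do when a timeout fires.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class TimeoutCleanupSketch {

    /** Stand-in for Flink's ResultFuture (assumption: only the method used here). */
    interface ResultFuture<OUT> {
        void complete(Collection<OUT> result);
    }

    /** Hypothetical interface mirroring the proposal; T is the user-defined object. */
    interface HandleAsyncFunction<IN, OUT, T> {
        T asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

        void timeout(IN input, T obj, ResultFuture<OUT> resultFuture) throws Exception;
    }

    /** Example function whose user-defined object is the pending Future. */
    static class SlowLookup implements HandleAsyncFunction<Integer, String, Future<String>> {
        @Override
        public Future<String> asyncInvoke(Integer input, ResultFuture<String> resultFuture) {
            // Never completed in this sketch, to simulate a request that hangs.
            CompletableFuture<String> pending = new CompletableFuture<>();
            // Forward the value if the request ever completes normally.
            pending.thenAccept(value -> resultFuture.complete(Collections.singleton(value)));
            return pending;
        }

        @Override
        public void timeout(Integer input, Future<String> obj, ResultFuture<String> resultFuture) {
            obj.cancel(true);                              // stop waiting for the request
            resultFuture.complete(Collections.emptySet()); // emit nothing for this input
        }
    }

    /** Imitates the operator: invoke, then report a timeout with the returned object. */
    static boolean runWithTimeout() {
        SlowLookup function = new SlowLookup();
        List<Collection<String>> emitted = new ArrayList<>();
        ResultFuture<String> resultFuture = emitted::add;

        Future<String> handle = function.asyncInvoke(42, resultFuture);
        function.timeout(42, handle, resultFuture);

        // The request was cancelled and exactly one empty result was emitted.
        return handle.isCancelled() && emitted.size() == 1 && emitted.get(0).isEmpty();
    }

    public static void main(String[] args) {
        // prints "cleaned up on timeout: true"
        System.out.println("cleaned up on timeout: " + runWithTimeout());
    }
}
```

The point of the extra type parameter is visible in `runWithTimeout`: the caller only has to hand back whatever `asyncInvoke` returned, so the function itself needs no bookkeeping to find the pending request again.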