Till Rohrmann created FLINK-6435:
------------------------------------
Summary: AsyncWaitOperator does not handle exceptions properly
Key: FLINK-6435
URL: https://issues.apache.org/jira/browse/FLINK-6435
Project: Flink
Issue Type: Bug
Components: Distributed Coordination
Affects Versions: 1.3.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
A user reported that the {{AsyncWaitOperator}} does not handle exceptions properly. The following code snippet does not make the job fail, even though the async function completes the result for input 3 with an exception.
{code}
public void test() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Integer> withTimestamps = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));

    AsyncDataStream.unorderedWait(withTimestamps,
            (AsyncFunction<Integer, String>) (input, collector) -> {
                if (input == 3) {
                    // Completing with an exception should fail the job, but it does not.
                    collector.collect(new RuntimeException("Test"));
                    return;
                }
                collector.collect(Collections.singleton("Ok"));
            }, 10, TimeUnit.MILLISECONDS)
        .returns(String.class)
        .print();

    env.execute("unit-test");
}
{code}
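For comparison, the expected behaviour can be sketched without Flink. The following is only an illustration under stated assumptions, not the operator's actual implementation: {{AsyncFailureSketch}}, {{asyncInvoke}} and {{emitAll}} are hypothetical names, and a plain {{CompletableFuture}} stands in for the async collector. It shows a result that was completed with an exception being rethrown when it is emitted, so the caller (the job, in Flink terms) fails instead of continuing silently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class AsyncFailureSketch {

    // Hypothetical stand-in for the user's async function:
    // input 3 completes exceptionally, every other input completes with "Ok".
    static CompletableFuture<String> asyncInvoke(int input) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (input == 3) {
            result.completeExceptionally(new RuntimeException("Test"));
        } else {
            result.complete("Ok");
        }
        return result;
    }

    // Hypothetical emitter: forwards successful results and rethrows the
    // first failure so that it cannot be silently dropped.
    static List<String> emitAll(List<Integer> inputs) {
        List<String> emitted = new ArrayList<>();
        for (int input : inputs) {
            try {
                // join() throws CompletionException if the future failed.
                emitted.add(asyncInvoke(input).join());
            } catch (CompletionException e) {
                throw new IllegalStateException(
                        "Async operation failed for input " + input, e.getCause());
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(emitAll(Arrays.asList(1, 2)));
        try {
            emitAll(Arrays.asList(1, 2, 3, 4, 5));
        } catch (IllegalStateException e) {
            System.out.println("job failed: " + e.getCause().getMessage());
        }
    }
}
```

In this sketch the failure for input 3 stops processing of the remaining elements, which is the behaviour the reporter expected from the job above.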
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)