Fabian Hueske created FLINK-5031:
------------------------------------
Summary: Consecutive DataStream.split() ignored
Key: FLINK-5031
URL: https://issues.apache.org/jira/browse/FLINK-5031
Project: Flink
Issue Type: Bug
Components: Streaming
Affects Versions: 1.1.3, 1.2.0
Reporter: Fabian Hueske
Fix For: 1.2.0
The output of the following program
{code}
static final class ThresholdSelector implements OutputSelector<Long> {
    long threshold;

    public ThresholdSelector(long threshold) {
        this.threshold = threshold;
    }

    @Override
    public Iterable<String> select(Long value) {
        if (value < threshold) {
            return Collections.singletonList("Less");
        } else {
            return Collections.singletonList("GreaterEqual");
        }
    }
}

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    SplitStream<Long> split1 = env.generateSequence(1, 11)
        .split(new ThresholdSelector(6));
    // stream11 should be [1,2,3,4,5]
    DataStream<Long> stream11 = split1.select("Less");

    SplitStream<Long> split2 = stream11
//      .map(new MapFunction<Long, Long>() {
//          @Override
//          public Long map(Long value) throws Exception {
//              return value;
//          }
//      })
        .split(new ThresholdSelector(3));
    DataStream<Long> stream21 = split2.select("Less");
    // stream21 should be [1,2]
    stream21.print();

    env.execute();
}
{code}
should be {{1, 2}}; however, it is {{1, 2, 3, 4, 5}}. It seems that the second {{split}} operation is ignored.
The program is evaluated correctly if the commented-out identity {{MapFunction}} is inserted between {{stream11}} and the second {{split}}.
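For reference, the expected semantics can be sketched in plain Java without Flink. This is only an illustration of what two chained split/select("Less") steps should produce, not Flink code: the class {{ExpectedSplitSemantics}} and the helper {{selectLess}} are hypothetical names introduced here, and the helper simply keeps the values below the threshold, as {{ThresholdSelector}} followed by {{select("Less")}} is meant to do.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

class ExpectedSplitSemantics {

    // Hypothetical stand-in for split(new ThresholdSelector(threshold)).select("Less"):
    // keeps only the values strictly below the threshold.
    static List<Long> selectLess(List<Long> input, long threshold) {
        return input.stream()
                .filter(v -> v < threshold)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same input as env.generateSequence(1, 11)
        List<Long> source = LongStream.rangeClosed(1, 11)
                .boxed()
                .collect(Collectors.toList());

        // First split with threshold 6
        List<Long> stream11 = selectLess(source, 6);
        // Second split with threshold 3, applied to the result of the first
        List<Long> stream21 = selectLess(stream11, 3);

        System.out.println(stream11); // prints [1, 2, 3, 4, 5]
        System.out.println(stream21); // prints [1, 2]
    }
}
```

The second filter is applied to the output of the first, so only values below both thresholds remain. The reported behavior corresponds to the second step being skipped, so that {{stream21}} equals {{stream11}}.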
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)