Huang Wei created FLINK-2495:
--------------------------------
Summary: Add a null point check in API DataStream.union
Key: FLINK-2495
URL:
https://issues.apache.org/jira/browse/FLINK-2495 Project: Flink
Issue Type: Bug
Components: Streaming
Affects Versions: 0.10
Reporter: Huang Wei
Fix For: 0.10
The API(public DataStream<OUT> union(DataStream<OUT>... streams)) is a external interface for user.
The parameter "streams" maybe null and it will throw NullPointerException error.
This test below can be intuitive to explain this problem:
package org.apache.flink.streaming.api;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.junit.Test;
/**
* Created by HuangWHWHW on 2015/8/7.
*/
public class test {
public static class sourceFunction extends RichParallelSourceFunction<String> {
public sourceFunction() {
}
@Override
public void run(SourceContext<String> sourceContext) throws Exception {
sourceContext.collect("a");
}
@Override
public void cancel() {
}
}
@Test
public void testUnion(){
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> source = env.addSource(new sourceFunction());
DataStream<String> temp1 = null;
DataStream<String> temp2 = source.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
if (value == "a") {
return "This is for test temp2.";
}
return null;
}
});
DataStream<String> sink = temp2.union(temp1);
sink.print();
try {
env.execute();
}catch (Exception e){
e.printStackTrace();
}
}
}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)