Daniel Bali created FLINK-1986:
----------------------------------

Summary: Group by fails on iterative data streams
Key: FLINK-1986
URL: https://issues.apache.org/jira/browse/FLINK-1986
Project: Flink
Issue Type: Bug
Components: Streaming
Reporter: Daniel Bali

Hello!

When I try to run a `groupBy` on an IterativeDataStream, I get a NullPointerException. Here is the code that reproduces the issue:

{code}
public Test() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

    DataStream<Tuple2<Long, Long>> edges = env
            .generateSequence(0, 7)
            .map(new MapFunction<Long, Tuple2<Long, Long>>() {
                @Override
                public Tuple2<Long, Long> map(Long v) throws Exception {
                    return new Tuple2<>(v, (v + 1));
                }
            });

    IterativeDataStream<Tuple2<Long, Long>> iteration = edges.iterate();

    SplitDataStream<Tuple2<Long, Long>> step = iteration.groupBy(1)
            .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                @Override
                public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws Exception {
                    return tuple;
                }
            })
            .split(new OutputSelector<Tuple2<Long, Long>>() {
                @Override
                public Iterable<String> select(Tuple2<Long, Long> tuple) {
                    List<String> output = new ArrayList<>();
                    output.add("iterate");
                    return output;
                }
            });

    iteration.closeWith(step.select("iterate"));

    env.execute("Sandbox");
}
{code}

Moving the groupBy before the iteration solves the issue. For example, this works:

{code}
... iteration = edges.groupBy(1).iterate();
iteration.map(...)
{code}

Here is the stack trace:

{code}
Exception in thread "main" java.lang.NullPointerException
	at org.apache.flink.streaming.api.graph.StreamGraph.addIterationTail(StreamGraph.java:207)
	at org.apache.flink.streaming.api.datastream.IterativeDataStream.closeWith(IterativeDataStream.java:72)
	at org.apache.flink.graph.streaming.example.Test.<init>(Test.java:73)
	at org.apache.flink.graph.streaming.example.Test.main(Test.java:79)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:601)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
{code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
In reply to Daniel Bali (JIRA), 2015-05-07 17:11 GMT+02:00:

The problem is that the StreamIterationHead is not created, because only IterativeDataStream.transform(...) can create it. groupBy() on an IterativeDataStream does not call transform(), hence the exception. All methods of DataStream that are supported in iterations and do not call transform() should be overridden in IterativeDataStream so that they add the iteration head.

Peter
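The fix proposed above can be illustrated with a minimal, self-contained Java sketch. ToyStream and ToyIterativeStream are hypothetical stand-ins, not Flink's DataStream and IterativeDataStream. Their transform(String), groupBy(int) and closeWith(...) methods are simplified assumptions that only mimic the roles described in the reply:

- transform() is the path that adds the iteration head.
- groupBy() bypasses transform().
- closeWith() fails when no head exists.

The sketch shows the suggested remedy: the iterative subclass overrides groupBy() so that it adds the head itself.

```java
import java.util.ArrayList;
import java.util.List;

public class IterationHeadSketch {

    // Hypothetical stand-in for DataStream; NOT Flink's real API.
    static class ToyStream {
        final List<String> graph; // operators added to the job graph so far

        ToyStream(List<String> graph) {
            this.graph = graph;
        }

        // Mimics transform(...): the only method that adds an operator.
        ToyStream transform(String operatorName) {
            graph.add(operatorName);
            return new ToyStream(graph);
        }

        // Mimics groupBy(...): returns a new stream without calling transform().
        ToyStream groupBy(int field) {
            return new ToyStream(graph);
        }
    }

    // Hypothetical stand-in for IterativeDataStream; NOT Flink's real API.
    static class ToyIterativeStream extends ToyStream {
        private boolean headAdded = false;

        ToyIterativeStream(List<String> graph) {
            super(graph);
        }

        // Adds the iteration head exactly once, on first use of the stream.
        private void ensureHead() {
            if (!headAdded) {
                graph.add("IterationHead");
                headAdded = true;
            }
        }

        @Override
        ToyStream transform(String operatorName) {
            ensureHead();
            return super.transform(operatorName);
        }

        // The proposed fix: groupBy() bypasses transform(), so it adds the head itself.
        @Override
        ToyStream groupBy(int field) {
            ensureHead();
            return super.groupBy(field);
        }

        // Mimics closeWith(...): a tail without a head is the failure from the report.
        void closeWith(ToyStream feedback) {
            if (!headAdded) {
                throw new NullPointerException("iteration head was never created");
            }
            graph.add("IterationTail");
        }
    }

    public static void main(String[] args) {
        List<String> graph = new ArrayList<>();
        ToyIterativeStream iteration = new ToyIterativeStream(graph);

        // Same shape as the failing program: groupBy() directly on the iterative stream.
        ToyStream step = iteration.groupBy(1).transform("map");
        iteration.closeWith(step);

        System.out.println(graph); // prints [IterationHead, map, IterationTail]
    }
}
```

Consider the same sketch without the groupBy() override. Then `iteration.groupBy(1).transform("map")` would go through the base-class path and never add the head, so closeWith() would throw. That mirrors the NullPointerException in the report. Routing every overridden method through one ensureHead() helper keeps the head from being added twice when several such methods are called on the same iterative stream.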