[jira] [Created] (FLINK-1986) Group by fails on iterative data streams


Shang Yuanchun (Jira)
Daniel Bali created FLINK-1986:
----------------------------------

             Summary: Group by fails on iterative data streams
                 Key: FLINK-1986
                 URL: https://issues.apache.org/jira/browse/FLINK-1986
             Project: Flink
          Issue Type: Bug
          Components: Streaming
            Reporter: Daniel Bali


Hello!

When I try to run a `groupBy` on an IterativeDataStream I get a NullPointerException. Here is the code that reproduces the issue:

{code}
public Test() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

    DataStream<Tuple2<Long, Long>> edges = env
            .generateSequence(0, 7)
            .map(new MapFunction<Long, Tuple2<Long, Long>>() {
                @Override
                public Tuple2<Long, Long> map(Long v) throws Exception {
                    return new Tuple2<>(v, (v + 1));
                }
            });

    IterativeDataStream<Tuple2<Long, Long>> iteration = edges.iterate();

    SplitDataStream<Tuple2<Long, Long>> step = iteration.groupBy(1)
            .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                @Override
                public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws Exception {
                    return tuple;
                }
            })
            .split(new OutputSelector<Tuple2<Long, Long>>() {
                @Override
                public Iterable<String> select(Tuple2<Long, Long> tuple) {
                    List<String> output = new ArrayList<>();
                    output.add("iterate");
                    return output;
                }
            });

    iteration.closeWith(step.select("iterate"));

    env.execute("Sandbox");
}
{code}

Moving the groupBy before the iteration avoids the issue. For example, this works:

{code}
... iteration = edges.groupBy(1).iterate();
iteration.map(...)
{code}

Here is the stack trace:

{code}
Exception in thread "main" java.lang.NullPointerException
        at org.apache.flink.streaming.api.graph.StreamGraph.addIterationTail(StreamGraph.java:207)
        at org.apache.flink.streaming.api.datastream.IterativeDataStream.closeWith(IterativeDataStream.java:72)
        at org.apache.flink.graph.streaming.example.Test.<init>(Test.java:73)
        at org.apache.flink.graph.streaming.example.Test.main(Test.java:79)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
Re: [jira] [Created] (FLINK-1986) Group by fails on iterative data streams

Szabó Péter
The problem is that the StreamIterationHead is not created, because
only IterativeDataStream.transform(...) can create it. groupBy() on an
IterativeDataStream does not call transform(), hence the exception. All
methods of DataStream that are supported for iterations and do not call
transform() should be overridden in IterativeDataStream in order to add the
iteration head.
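
To make the mechanism concrete, here is a minimal, self-contained sketch of the pattern. It is a toy model, not Flink code: Graph, Stream, IterativeStream and FixedIterativeStream are hypothetical stand-ins for StreamGraph, DataStream and IterativeDataStream, and the method bodies are assumptions chosen only to show why the head goes missing and how an override could register it.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: every class here is a hypothetical stand-in, not Flink's real API.
class IterationHeadSketch {

    /** Stand-in for the stream graph: maps an iteration id to its head vertex. */
    static class Graph {
        final Map<Integer, String> heads = new HashMap<>();

        void addIterationHead(int iterationId) {
            heads.put(iterationId, "head-" + iterationId);
        }

        void addIterationTail(int iterationId) {
            String head = heads.get(iterationId);
            // Dereferencing a missing head throws NullPointerException.
            head.length();
        }
    }

    /** Stand-in for a plain data stream. */
    static class Stream {
        final Graph graph;

        Stream(Graph graph) {
            this.graph = graph;
        }

        Stream transform(String operatorName) {
            return new Stream(graph);
        }

        // Partitioning only: it does not go through transform().
        Stream groupBy(int field) {
            return new Stream(graph);
        }
    }

    /** Stand-in for the iterative stream: only transform() registers the head. */
    static class IterativeStream extends Stream {
        final int iterationId;

        IterativeStream(Graph graph, int iterationId) {
            super(graph);
            this.iterationId = iterationId;
        }

        @Override
        Stream transform(String operatorName) {
            graph.addIterationHead(iterationId);
            return super.transform(operatorName);
        }

        void closeWith(Stream feedback) {
            graph.addIterationTail(iterationId);
        }
    }

    /** Proposed shape of the fix: groupBy() also registers the head. */
    static class FixedIterativeStream extends IterativeStream {
        FixedIterativeStream(Graph graph, int iterationId) {
            super(graph, iterationId);
        }

        @Override
        Stream groupBy(int field) {
            graph.addIterationHead(iterationId);
            return super.groupBy(field);
        }
    }

    /** Runs groupBy -> map -> closeWith and reports whether closing succeeded. */
    static boolean closesCleanly(IterativeStream iteration) {
        try {
            Stream step = iteration.groupBy(1).transform("map");
            iteration.closeWith(step);
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(closesCleanly(new IterativeStream(new Graph(), 0)));      // false
        System.out.println(closesCleanly(new FixedIterativeStream(new Graph(), 0))); // true
    }
}
```

In the sketch, the unfixed stream fails because groupBy() returns a plain stream, so the later map never reaches the iterative transform() and no head is registered before closeWith(). The same reasoning would apply to any other method that bypasses transform().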

Peter

2015-05-07 17:11 GMT+02:00 Daniel Bali (JIRA) <[hidden email]>:
