[jira] [Created] (FLINK-3052) Optimizer does not push properties out of bulk iterations


Shang Yuanchun (Jira)
Till Rohrmann created FLINK-3052:
------------------------------------

             Summary: Optimizer does not push properties out of bulk iterations
                 Key: FLINK-3052
                 URL: https://issues.apache.org/jira/browse/FLINK-3052
             Project: Flink
          Issue Type: Bug
          Components: Optimizer
    Affects Versions: 0.10.0
            Reporter: Till Rohrmann
            Assignee: Till Rohrmann
             Fix For: 0.10.1


Flink's optimizer should be able to reuse interesting properties from outside the loop. In order to do that, it is sometimes necessary to append a NoOp node to the step function that recomputes the required properties.

This is currently not working for {{BulkIterations}}, because the plans with the appended NoOp nodes are not added to the overall list of candidates.

This not only leads to sub-optimal plan selection but can also cause valid jobs to be rejected. The following job, for example, will be falsely rejected by Flink.

{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple1<Long>> input1 = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() {
    @Override
    public Tuple1<Long> map(Long value) throws Exception {
        return new Tuple1<>(value);
    }
});

DataSet<Tuple1<Long>> input2 = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() {
    @Override
    public Tuple1<Long> map(Long value) throws Exception {
        return new Tuple1<>(value);
    }
});

// distinct() partitions its output by key; this partitioning is the
// interesting property that is not pushed into the iteration
DataSet<Tuple1<Long>> distinctInput = input1.distinct();

IterativeDataSet<Tuple1<Long>> iteration = distinctInput.iterate(10);

DataSet<Tuple1<Long>> iterationStep = iteration
    .coGroup(input2)
    .where(0)
    .equalTo(0)
    .with(new CoGroupFunction<Tuple1<Long>, Tuple1<Long>, Tuple1<Long>>() {
        @Override
        public void coGroup(
                Iterable<Tuple1<Long>> first,
                Iterable<Tuple1<Long>> second,
                Collector<Tuple1<Long>> out) throws Exception {
            Iterator<Tuple1<Long>> it = first.iterator();

            if (it.hasNext()) {
                out.collect(it.next());
            }
        }
    });

DataSet<Tuple1<Long>> iterationResult = iteration.closeWith(iterationStep);

iterationResult.output(new DiscardingOutputFormat<Tuple1<Long>>());
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)