Re: [2/3] git commit: [FLINK-1018] Add tests to verify correct placement of pipeline breakers with broadcast variables

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Re: [2/3] git commit: [FLINK-1018] Add tests to verify correct placement of pipeline breakers with broadcast variables

Robert Metzger
The two files you committed here "PipelineBreakerTest.java" and
"SelectOneReducer.java" both contain the old license header. I'll try and
see if I can make the license checker more strict.


On Sat, Jul 12, 2014 at 7:32 PM, <[hidden email]> wrote:

> [FLINK-1018] Add tests to verify correct placement of pipeline breakers
> with broadcast variables
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
> Commit:
> http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ec0b00d6
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ec0b00d6
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ec0b00d6
>
> Branch: refs/heads/master
> Commit: ec0b00d613a4400baf53f5de2361a2271a26ae63
> Parents: a822486
> Author: Stephan Ewen <[hidden email]>
> Authored: Fri Jul 11 18:02:52 2014 +0200
> Committer: Stephan Ewen <[hidden email]>
> Committed: Sat Jul 12 19:31:26 2014 +0200
>
> ----------------------------------------------------------------------
>  .../compiler/BranchingPlansCompilerTest.java    |  10 +-
>  .../flink/compiler/PipelineBreakerTest.java     | 137 +++++++++++++++++++
>  .../testfunctions/SelectOneReducer.java         |  28 ++++
>  3 files changed, 170 insertions(+), 5 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> ----------------------------------------------------------------------
> diff --git
> a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> index 31dadae..571f4e4 100644
> ---
> a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> +++
> b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> @@ -359,6 +359,7 @@ public class BranchingPlansCompilerTest extends
> CompilerTestBase {
>                 }
>         }
>
> +       @SuppressWarnings({ "unchecked", "deprecation" })
>         @Test
>         public void testBranchEachContractType() {
>                 try {
> @@ -374,7 +375,6 @@ public class BranchingPlansCompilerTest extends
> CompilerTestBase {
>                                 .name("Reduce 1")
>                                 .build();
>
> -                       @SuppressWarnings("unchecked")
>                         JoinOperator match1 = JoinOperator.builder(new
> DummyMatchStub(), IntValue.class, 0, 0)
>                                 .input1(sourceB, sourceB, sourceC)
>                                 .input2(sourceC)
> @@ -434,10 +434,10 @@ public class BranchingPlansCompilerTest extends
> CompilerTestBase {
>                                 .build();
>
>                         FileDataSink sink = new FileDataSink(new
> DummyOutputFormat(), OUT_FILE, cogroup7);
> -       //              sink.addInput(sourceA);
> -       //              sink.addInput(co3);
> -       //              sink.addInput(co4);
> -       //              sink.addInput(co1);
> +                       sink.addInput(sourceA);
> +                       sink.addInput(cogroup3);
> +                       sink.addInput(cogroup4);
> +                       sink.addInput(cogroup1);
>
>                         // return the PACT plan
>                         Plan plan = new Plan(sink, "Branching of each
> contract type");
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
> ----------------------------------------------------------------------
> diff --git
> a/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
> b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
> new file mode 100644
> index 0000000..4e43a74
> --- /dev/null
> +++
> b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
> @@ -0,0 +1,137 @@
>
> +/***********************************************************************************************************************
> + *
> + * Copyright (C) 2010-2013 by the Stratosphere project (
> http://stratosphere.eu)
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License"); you
> may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> distributed under the License is distributed on
> + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
> express or implied. See the License for the
> + * specific language governing permissions and limitations under the
> License.
> + *
> +
> **********************************************************************************************************************/
> +
> +package org.apache.flink.compiler;
> +
> +import static org.junit.Assert.*;
> +
> +import org.junit.Test;
> +import org.apache.flink.api.common.Plan;
> +import org.apache.flink.api.java.DataSet;
> +import org.apache.flink.api.java.ExecutionEnvironment;
> +import org.apache.flink.api.java.IterativeDataSet;
> +import org.apache.flink.compiler.plan.BulkIterationPlanNode;
> +import org.apache.flink.compiler.plan.OptimizedPlan;
> +import org.apache.flink.compiler.plan.SingleInputPlanNode;
> +import org.apache.flink.compiler.plan.SinkPlanNode;
> +import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
> +import org.apache.flink.compiler.testfunctions.IdentityMapper;
> +import org.apache.flink.compiler.testfunctions.SelectOneReducer;
> +
> +@SuppressWarnings("serial")
> +public class PipelineBreakerTest extends CompilerTestBase {
> +
> +       @Test
> +       public void testPipelineBreakerWithBroadcastVariable() {
> +               try {
> +                       ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> +                       env.setDegreeOfParallelism(64);
> +
> +                       DataSet<Long> source = env.generateSequence(1,
> 10).map(new IdentityMapper<Long>());
> +
> +                       DataSet<Long> result = source.map(new
> IdentityMapper<Long>())
> +
>       .map(new IdentityMapper<Long>())
> +
>               .withBroadcastSet(source, "bc");
> +
> +                       result.print();
> +
> +                       Plan p = env.createProgramPlan();
> +                       OptimizedPlan op = compileNoStats(p);
> +
> +                       SinkPlanNode sink =
> op.getDataSinks().iterator().next();
> +                       SingleInputPlanNode mapper = (SingleInputPlanNode)
> sink.getInput().getSource();
> +
> +
> assertTrue(mapper.getInput().getTempMode().breaksPipeline());
> +               }
> +               catch (Exception e) {
> +                       e.printStackTrace();
> +                       fail(e.getMessage());
> +               }
> +       }
> +
> +       @Test
> +       public void testPipelineBreakerBroadcastedAllReduce() {
> +               try {
> +                       ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> +                       env.setDegreeOfParallelism(64);
> +
> +                       DataSet<Long> sourceWithMapper =
> env.generateSequence(1, 10).map(new IdentityMapper<Long>());
> +
> +                       DataSet<Long> bcInput1 = sourceWithMapper
> +
>       .map(new IdentityMapper<Long>())
> +
>       .reduce(new SelectOneReducer<Long>());
> +                       DataSet<Long> bcInput2 = env.generateSequence(1,
> 10);
> +
> +                       DataSet<Long> result = sourceWithMapper
> +                                       .map(new IdentityMapper<Long>())
> +
> .withBroadcastSet(bcInput1, "bc1")
> +
> .withBroadcastSet(bcInput2, "bc2");
> +
> +                       result.print();
> +
> +                       Plan p = env.createProgramPlan();
> +                       OptimizedPlan op = compileNoStats(p);
> +
> +                       SinkPlanNode sink =
> op.getDataSinks().iterator().next();
> +                       SingleInputPlanNode mapper = (SingleInputPlanNode)
> sink.getInput().getSource();
> +
> +
> assertTrue(mapper.getInput().getTempMode().breaksPipeline());
> +               }
> +               catch (Exception e) {
> +                       e.printStackTrace();
> +                       fail(e.getMessage());
> +               }
> +       }
> +
> +       @Test
> +       public void testPipelineBreakerBroadcastedPartialSolution() {
> +               try {
> +                       ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> +                       env.setDegreeOfParallelism(64);
> +
> +
> +                       DataSet<Long> initialSource =
> env.generateSequence(1, 10);
> +                       IterativeDataSet<Long> iteration =
> initialSource.iterate(100);
> +
> +
> +                       DataSet<Long> sourceWithMapper =
> env.generateSequence(1, 10).map(new IdentityMapper<Long>());
> +
> +                       DataSet<Long> bcInput1 = sourceWithMapper
> +
>       .map(new IdentityMapper<Long>())
> +
>       .reduce(new SelectOneReducer<Long>());
> +
> +                       DataSet<Long> result = sourceWithMapper
> +                                       .map(new IdentityMapper<Long>())
> +
> .withBroadcastSet(iteration, "bc2")
> +
> .withBroadcastSet(bcInput1, "bc1");
> +
> +
> +                       iteration.closeWith(result).print();
> +
> +                       Plan p = env.createProgramPlan();
> +                       OptimizedPlan op = compileNoStats(p);
> +
> +                       SinkPlanNode sink =
> op.getDataSinks().iterator().next();
> +                       BulkIterationPlanNode iterationPlanNode =
> (BulkIterationPlanNode) sink.getInput().getSource();
> +                       SingleInputPlanNode mapper = (SingleInputPlanNode)
> iterationPlanNode.getRootOfStepFunction();
> +
> +
> assertTrue(mapper.getInput().getTempMode().breaksPipeline());
> +               }
> +               catch (Exception e) {
> +                       e.printStackTrace();
> +                       fail(e.getMessage());
> +               }
> +       }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
> ----------------------------------------------------------------------
> diff --git
> a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
> b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
> new file mode 100644
> index 0000000..492b9f8
> --- /dev/null
> +++
> b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
> @@ -0,0 +1,28 @@
>
> +/***********************************************************************************************************************
> + *
> + * Copyright (C) 2010-2013 by the Stratosphere project (
> http://stratosphere.eu)
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License"); you
> may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> distributed under the License is distributed on
> + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
> express or implied. See the License for the
> + * specific language governing permissions and limitations under the
> License.
> + *
> +
> **********************************************************************************************************************/
> +
> +package org.apache.flink.compiler.testfunctions;
> +
> +import org.apache.flink.api.java.functions.ReduceFunction;
> +
> +public class SelectOneReducer<T> extends ReduceFunction<T> {
> +
> +       private static final long serialVersionUID = 1L;
> +
> +       @Override
> +       public T reduce(T value1, T value2) throws Exception {
> +               return value1;
> +       }
> +}
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [2/3] git commit: [FLINK-1018] Add tests to verify correct placement of pipeline breakers with broadcast variables

Stephan Ewen
Ah, good point. They come from old pull requests.

Let me know if you manage to make the Apache Rat configuration stricter.
Reply | Threaded
Open this post in threaded view
|

Re: [2/3] git commit: [FLINK-1018] Add tests to verify correct placement of pipeline breakers with broadcast variables

Robert Metzger
I'm currently preparing a commit with some more license fixes and a strict
rat configuration.


On Sun, Jul 13, 2014 at 2:58 PM, Stephan Ewen <[hidden email]> wrote:

> Ah, good point. They com from old pull requests.
>
> Let me know if you manage to configure rat stricter.
>
Reply | Threaded
Open this post in threaded view
|

Re: [2/3] git commit: [FLINK-1018] Add tests to verify correct placement of pipeline breakers with broadcast variables

Stephan Ewen
Okay, I'll hold off on merging the streaming code until the Rat configuration is updated.