The two files you committed here, "PipelineBreakerTest.java" and
"SelectOneReducer.java", both contain the old license header.

I'll see if I can make the license checker stricter.

On Sat, Jul 12, 2014 at 7:32 PM, <[hidden email]> wrote:

> [FLINK-1018] Add tests to verify correct placement of pipeline breakers with broadcast variables
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
> Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ec0b00d6
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ec0b00d6
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ec0b00d6
>
> Branch: refs/heads/master
> Commit: ec0b00d613a4400baf53f5de2361a2271a26ae63
> Parents: a822486
> Author: Stephan Ewen <[hidden email]>
> Authored: Fri Jul 11 18:02:52 2014 +0200
> Committer: Stephan Ewen <[hidden email]>
> Committed: Sat Jul 12 19:31:26 2014 +0200
>
> ----------------------------------------------------------------------
>  .../compiler/BranchingPlansCompilerTest.java    |  10 +-
>  .../flink/compiler/PipelineBreakerTest.java     | 137 +++++++++++++++++++
>  .../testfunctions/SelectOneReducer.java         |  28 ++++
>  3 files changed, 170 insertions(+), 5 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> ----------------------------------------------------------------------
> diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> index 31dadae..571f4e4 100644
> --- a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> @@ -359,6 +359,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
>          }
>      }
>
> +    @SuppressWarnings({ "unchecked", "deprecation" })
>      @Test
>      public void testBranchEachContractType() {
>          try {
> @@ -374,7 +375,6 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
>                  .name("Reduce 1")
>                  .build();
>
> -            @SuppressWarnings("unchecked")
>              JoinOperator match1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
>                  .input1(sourceB, sourceB, sourceC)
>                  .input2(sourceC)
> @@ -434,10 +434,10 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
>                  .build();
>
>              FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cogroup7);
> -//          sink.addInput(sourceA);
> -//          sink.addInput(co3);
> -//          sink.addInput(co4);
> -//          sink.addInput(co1);
> +            sink.addInput(sourceA);
> +            sink.addInput(cogroup3);
> +            sink.addInput(cogroup4);
> +            sink.addInput(cogroup1);
>
>              // return the PACT plan
>              Plan plan = new Plan(sink, "Branching of each contract type");
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
> ----------------------------------------------------------------------
> diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
> new file mode 100644
> index 0000000..4e43a74
> --- /dev/null
> +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
> @@ -0,0 +1,137 @@
> +/***********************************************************************************************************************
> + *
> + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
> + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
> + * specific language governing permissions and limitations under the License.
> + *
> + **********************************************************************************************************************/
> +
> +package org.apache.flink.compiler;
> +
> +import static org.junit.Assert.*;
> +
> +import org.junit.Test;
> +import org.apache.flink.api.common.Plan;
> +import org.apache.flink.api.java.DataSet;
> +import org.apache.flink.api.java.ExecutionEnvironment;
> +import org.apache.flink.api.java.IterativeDataSet;
> +import org.apache.flink.compiler.plan.BulkIterationPlanNode;
> +import org.apache.flink.compiler.plan.OptimizedPlan;
> +import org.apache.flink.compiler.plan.SingleInputPlanNode;
> +import org.apache.flink.compiler.plan.SinkPlanNode;
> +import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
> +import org.apache.flink.compiler.testfunctions.IdentityMapper;
> +import org.apache.flink.compiler.testfunctions.SelectOneReducer;
> +
> +@SuppressWarnings("serial")
> +public class PipelineBreakerTest extends CompilerTestBase {
> +
> +    @Test
> +    public void testPipelineBreakerWithBroadcastVariable() {
> +        try {
> +            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> +            env.setDegreeOfParallelism(64);
> +
> +            DataSet<Long> source = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
> +
> +            DataSet<Long> result = source.map(new IdentityMapper<Long>())
> +                                        .map(new IdentityMapper<Long>())
> +                                            .withBroadcastSet(source, "bc");
> +
> +            result.print();
> +
> +            Plan p = env.createProgramPlan();
> +            OptimizedPlan op = compileNoStats(p);
> +
> +            SinkPlanNode sink = op.getDataSinks().iterator().next();
> +            SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
> +
> +            assertTrue(mapper.getInput().getTempMode().breaksPipeline());
> +        }
> +        catch (Exception e) {
> +            e.printStackTrace();
> +            fail(e.getMessage());
> +        }
> +    }
> +
> +    @Test
> +    public void testPipelineBreakerBroadcastedAllReduce() {
> +        try {
> +            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> +            env.setDegreeOfParallelism(64);
> +
> +            DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
> +
> +            DataSet<Long> bcInput1 = sourceWithMapper
> +                                        .map(new IdentityMapper<Long>())
> +                                        .reduce(new SelectOneReducer<Long>());
> +            DataSet<Long> bcInput2 = env.generateSequence(1, 10);
> +
> +            DataSet<Long> result = sourceWithMapper
> +                    .map(new IdentityMapper<Long>())
> +                            .withBroadcastSet(bcInput1, "bc1")
> +                            .withBroadcastSet(bcInput2, "bc2");
> +
> +            result.print();
> +
> +            Plan p = env.createProgramPlan();
> +            OptimizedPlan op = compileNoStats(p);
> +
> +            SinkPlanNode sink = op.getDataSinks().iterator().next();
> +            SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
> +
> +            assertTrue(mapper.getInput().getTempMode().breaksPipeline());
> +        }
> +        catch (Exception e) {
> +            e.printStackTrace();
> +            fail(e.getMessage());
> +        }
> +    }
> +
> +    @Test
> +    public void testPipelineBreakerBroadcastedPartialSolution() {
> +        try {
> +            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> +            env.setDegreeOfParallelism(64);
> +
> +
> +            DataSet<Long> initialSource = env.generateSequence(1, 10);
> +            IterativeDataSet<Long> iteration = initialSource.iterate(100);
> +
> +
> +            DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
> +
> +            DataSet<Long> bcInput1 = sourceWithMapper
> +                                        .map(new IdentityMapper<Long>())
> +                                        .reduce(new SelectOneReducer<Long>());
> +
> +            DataSet<Long> result = sourceWithMapper
> +                    .map(new IdentityMapper<Long>())
> +                            .withBroadcastSet(iteration, "bc2")
> +                            .withBroadcastSet(bcInput1, "bc1");
> +
> +
> +            iteration.closeWith(result).print();
> +
> +            Plan p = env.createProgramPlan();
> +            OptimizedPlan op = compileNoStats(p);
> +
> +            SinkPlanNode sink = op.getDataSinks().iterator().next();
> +            BulkIterationPlanNode iterationPlanNode = (BulkIterationPlanNode) sink.getInput().getSource();
> +            SingleInputPlanNode mapper = (SingleInputPlanNode) iterationPlanNode.getRootOfStepFunction();
> +
> +            assertTrue(mapper.getInput().getTempMode().breaksPipeline());
> +        }
> +        catch (Exception e) {
> +            e.printStackTrace();
> +            fail(e.getMessage());
> +        }
> +    }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
> ----------------------------------------------------------------------
> diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
> new file mode 100644
> index 0000000..492b9f8
> --- /dev/null
> +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
> @@ -0,0 +1,28 @@
> +/***********************************************************************************************************************
> + *
> + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
> + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
> + * specific language governing permissions and limitations under the License.
> + *
> + **********************************************************************************************************************/
> +
> +package org.apache.flink.compiler.testfunctions;
> +
> +import org.apache.flink.api.java.functions.ReduceFunction;
> +
> +public class SelectOneReducer<T> extends ReduceFunction<T> {
> +
> +    private static final long serialVersionUID = 1L;
> +
> +    @Override
> +    public T reduce(T value1, T value2) throws Exception {
> +        return value1;
> +    }
> +}
>
|
Ah, good point. They come from old pull requests.
Let me know if you manage to configure Rat more strictly.
|
I'm currently preparing a commit with some more license fixes and a strict
Rat configuration.

On Sun, Jul 13, 2014 at 2:58 PM, Stephan Ewen <[hidden email]> wrote:

> Ah, good point. They come from old pull requests.
>
> Let me know if you manage to configure Rat more strictly.
>
|
Okay, I'll hold off on merging the streaming code until the Rat configuration is updated.
|