Hi,
I am trying to use a ConvergenceCriterion that checks the L1 norm of a LineRank vector (similar to a PageRank vector) as the termination condition of an iteration. Below I have pasted the code for the iteration and the convergence check. It works fine locally on a small dataset (a few KB), and the iteration converges in a reasonable number of supersteps. But on a slightly larger dataset (4 MB), it took 1 hour for just 8 iterations and kept running. I then ran the same code with the L1 norm convergence criterion replaced by a FilterFunction as the termination condition (similar to the EpsilonFilter in the PageRankBasic example), and the job finishes very fast. It seems that the convergence criterion I am using for termination is making the job very slow. Can someone give suggestions on this, or is there another way to do it?

Thanks,
Janani

// Initialize a random vector of size m x 1
DataSet<Tuple2<Long, Double>> edgeScores = d.map(new InitializeRandomVector()).name("V");

IterativeDataSet<Tuple2<Long, Double>> iteration = edgeScores
        .iterate(maxIterations)
        .registerAggregationConvergenceCriterion(L1_NormDiff.AGGREGATOR_NAME,
                DoubleSumAggregator.class, L1_NormConvergence.class)
        .name("EdgeScoreVector_BulkIteration");

DataSet<Tuple2<Long, Double>> new_edgeScores = iteration
        .join(d).where(0).equalTo(0).with(new V1_HadamardProduct()).name("V1")
        .join(srcIncMat).where(0).equalTo(0).with(new V2_SrcIncWithV1()).name("V2")
        .groupBy(0).aggregate(Aggregations.SUM, 1)
        .join(tarIncMat).where(0).equalTo(1).with(new V3_TarIncWithV2()).name("V3")
        .map(new DampingMapper(c, numEdges))
        .join(iteration).where(0).equalTo(0).with(new L1_NormDiff()).name("L1_NORM");

DataSet<Tuple2<Long, Double>> convergedVector = iteration.closeWith(new_edgeScores);

public static final class L1_NormConvergence implements ConvergenceCriterion<DoubleValue> {

    @Override
    public boolean isConverged(int iteration, DoubleValue value) {
        double diff = value.getValue();
        return diff < EPSILON;
    }
}

public static final class L1_NormDiff extends JoinFunction<Tuple2<Long, Double>,
        Tuple2<Long, Double>, Tuple2<Long, Double>> {

    public static final String AGGREGATOR_NAME = "linerank.aggregator";

    private DoubleSumAggregator agg;

    @Override
    public void open(Configuration parameters) {
        agg = getIterationRuntimeContext().getIterationAggregator(AGGREGATOR_NAME);
    }

    @Override
    public Tuple2<Long, Double> join(Tuple2<Long, Double> current,
            Tuple2<Long, Double> prev) throws Exception {
        // Accumulate |prev - current| over all elements of the current superstep
        agg.aggregate(Math.abs(prev.f1 - current.f1));
        return current;
    }
}
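Stripped of the LineRank-specific operators, the aggregator-based termination pattern above reduces to roughly the following sketch (vector, UpdateStep, DiffAccumulator, and the aggregator name "diff.agg" are illustrative placeholders, not part of the actual job):

// A minimal sketch of the aggregator-based convergence pattern.
IterativeDataSet<Tuple2<Long, Double>> it = vector
        .iterate(maxIterations)
        .registerAggregationConvergenceCriterion(
                "diff.agg", DoubleSumAggregator.class, DiffConvergence.class);

DataSet<Tuple2<Long, Double>> next = it
        .map(new UpdateStep())            // compute the new values
        .join(it).where(0).equalTo(0)     // pair each new value with its old one
        .with(new DiffAccumulator());     // the join feeds the aggregator

DataSet<Tuple2<Long, Double>> result = it.closeWith(next);

// Sums |new - old| within each superstep; Flink resets the aggregator
// automatically at every superstep boundary.
public static final class DiffAccumulator extends JoinFunction<
        Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {

    private DoubleSumAggregator agg;

    @Override
    public void open(Configuration parameters) {
        agg = getIterationRuntimeContext().getIterationAggregator("diff.agg");
    }

    @Override
    public Tuple2<Long, Double> join(Tuple2<Long, Double> current,
            Tuple2<Long, Double> previous) {
        agg.aggregate(Math.abs(previous.f1 - current.f1));
        return current;
    }
}

// Evaluated once per superstep on the globally aggregated sum;
// returning true ends the iteration.
public static final class DiffConvergence implements ConvergenceCriterion<DoubleValue> {
    @Override
    public boolean isConverged(int iteration, DoubleValue value) {
        return value.getValue() < EPSILON;
    }
}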
Hi Janani!
Can you give us the code of the other variant, for comparison?

Also, some quick questions to help figure this out:

1) Your code uses an extra join for the convergence check. Does the other variant you talk about also introduce an additional join?

2) Does the join you introduce increase the data volume (finding multiple match partners per record), so that more records are emitted?

Stephan
Hi Stephan,

Thanks for the reply. Below I have pasted the code for the other variant, which runs faster than the one in the previous email. I have also tried to answer your questions.

*1) Your code uses an extra join for the convergence check. Does the other variant you talk about also introduce an additional join?*

Yes, but it is a filter join. In my case I do not want to filter tuples out during the iteration; instead, I want to check convergence at the end of every superstep and stop the entire iteration once the convergence check is true.

*2) Does the join you introduce increase the data volume (finding multiple match partners per record), so that more records are emitted?*

No. In the code from the previous email, each tuple emitted by DampingMapper joins with exactly one tuple in the iteration, so the final join (L1_NormDiff) emits exactly one tuple per joining key.

In PageRank terms: the newly computed rank of each page (each tuple) is compared with its value from the previous iteration, and the L1_NormDiff join computes the difference. I want to aggregate this difference over all pages of the current iteration and check, at the end of each superstep, whether the aggregated value is below a certain threshold.

Bulk iteration:

DataSet<Tuple2<Long, Double>> edgeScores = d.map(new InitializeRandomVector()).name("V");

IterativeDataSet<Tuple2<Long, Double>> iteration = edgeScores
        .iterate(maxIterations)
        .name("EdgeScoreVector_BulkIteration");

DataSet<Tuple2<Long, Double>> new_edgeScores = iteration
        .join(d).where(0).equalTo(0).with(new V1_HadamardProduct()).name("V1")
        .join(srcIncMat).where(0).equalTo(0).with(new V2_SrcIncWithV1()).name("V2")
        .groupBy(0).aggregate(Aggregations.SUM, 1)
        .join(tarIncMat).where(0).equalTo(1).with(new V3_TarIncWithV2()).name("V3")
        .map(new DampingMapper(c, numEdges));

DataSet<Tuple2<Long, Double>> convergedVector = iteration.closeWith(
        new_edgeScores,
        new_edgeScores.join(iteration).where(0).equalTo(0).filter(new EpsilonFilter()));

public static final class EpsilonFilter extends
        FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {

    @Override
    public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
        // Keep only elements that still changed by more than EPSILON
        return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
    }
}

Thanks,
Janani
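For comparison of the two variants: the two-argument closeWith used here keeps iterating as long as the termination-criterion DataSet is non-empty, so the filter must keep exactly the elements that have not yet converged. Schematically (a sketch of the same lines above, with the semantics spelled out in comments):

// The iteration stops as soon as this criterion DataSet becomes empty
// (or maxIterations is reached).
DataSet<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> stillChanging =
        new_edgeScores
                .join(iteration).where(0).equalTo(0)  // pair new value with previous value
                .filter(new EpsilonFilter());         // keep only pairs with |new - old| > EPSILON

DataSet<Tuple2<Long, Double>> convergedVector =
        iteration.closeWith(new_edgeScores, stillChanging);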
Hi,
This issue was resolved earlier. The problem was incorrect input: with it, the program could never satisfy the convergence criterion and therefore kept running forever.

Regards,
Janani