Hi all!
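While working on FLINK-2905, I was wondering what a good (and fast) way to compute the intersection of two data sets (Gelly vertices in my case) of unknown size would be.

I came up with three ways to solve this.

Imports used by the snippets:

    import java.util.Iterator;
    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.aggregation.Aggregations;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.graph.Vertex;
    import org.apache.flink.util.Collector;

Consider two sets:

    DataSet<Vertex<K, VV>> verticesLeft = this.getVertices();
    DataSet<Vertex<K, VV>> verticesRight = graph.getVertices();
    DataSet<Vertex<K, VV>> intersectVertices; // assigned by one of the three ways

Way 1 (join)

    // Emit the left vertex for every pair of vertices with the same id.
    intersectVertices = verticesLeft.join(verticesRight)
        .where(0) // vertex id
        .equalTo(0)
        .with(new JoinFunction<Vertex<K, VV>, Vertex<K, VV>, Vertex<K, VV>>() {
            @Override
            public Vertex<K, VV> join(Vertex<K, VV> first, Vertex<K, VV> second)
                    throws Exception {
                return first;
            }
        });

Way 2 (coGroup)

    // Each call receives all left and all right vertices sharing one id;
    // emit a single left vertex only if the id occurs on both sides.
    intersectVertices = verticesLeft.coGroup(verticesRight)
        .where(0)
        .equalTo(0)
        .with(new CoGroupFunction<Vertex<K, VV>, Vertex<K, VV>, Vertex<K, VV>>() {
            @Override
            public void coGroup(Iterable<Vertex<K, VV>> first,
                                Iterable<Vertex<K, VV>> second,
                                Collector<Vertex<K, VV>> out) throws Exception {
                Iterator<Vertex<K, VV>> leftIt = first.iterator();
                Iterator<Vertex<K, VV>> rightIt = second.iterator();
                if (leftIt.hasNext() && rightIt.hasNext()) {
                    out.collect(leftIt.next());
                }
            }
        });

Way 3 (union + groupBy + aggregate)

    // Count occurrences per id and keep ids seen exactly twice, i.e. once in
    // each input (this relies on ids being unique within each set).
    intersectVertices = verticesLeft.union(verticesRight)
        // tag every vertex with a count of 1
        .map(new MapFunction<Vertex<K, VV>, Tuple3<K, VV, Integer>>() {
            @Override
            public Tuple3<K, VV, Integer> map(Vertex<K, VV> vertex) throws Exception {
                return new Tuple3<>(vertex.f0, vertex.f1, 1);
            }
        }).withForwardedFields("f0;f1")
        .groupBy(0) // vertex id
        .aggregate(Aggregations.SUM, 2)
        .filter(new FilterFunction<Tuple3<K, VV, Integer>>() {
            @Override
            public boolean filter(Tuple3<K, VV, Integer> value) {
                return value.f2 == 2;
            }
        })
        // drop the count again
        .map(new MapFunction<Tuple3<K, VV, Integer>, Vertex<K, VV>>() {
            @Override
            public Vertex<K, VV> map(Tuple3<K, VV, Integer> vertexWithAggregate) {
                return new Vertex<>(vertexWithAggregate.f0, vertexWithAggregate.f1);
            }
        }).withForwardedFields("f0;f1");

Thanks for your input.

Best,
Martin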
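To compare what the three ways actually compute, here is a minimal plain-Java sketch (no Flink) that models each dataflow on in-memory lists. `IntersectModel`, `v`, `viaJoin`, `viaCoGroup` and `viaUnionCount` are hypothetical names, and a vertex is modelled as a `Map.Entry<Long, String>` (id, value) instead of Gelly's `Vertex<K, VV>`. It only illustrates the semantics of each operator, not how Flink executes them.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Flink) of the three intersection dataflows.
// A vertex is modelled as Map.Entry<Long, String> = (id, value).
public class IntersectModel {

    static Map.Entry<Long, String> v(long id, String value) {
        return new AbstractMap.SimpleEntry<Long, String>(id, value);
    }

    // Way 1: equi-join on the id, emit the left vertex for every matching pair.
    static List<Map.Entry<Long, String>> viaJoin(List<Map.Entry<Long, String>> left,
                                                 List<Map.Entry<Long, String>> right) {
        List<Map.Entry<Long, String>> out = new ArrayList<>();
        for (Map.Entry<Long, String> l : left) {
            for (Map.Entry<Long, String> r : right) {
                if (l.getKey().equals(r.getKey())) {
                    out.add(l);
                }
            }
        }
        return out;
    }

    // Groups vertices by id, keeping first-seen order.
    static Map<Long, List<Map.Entry<Long, String>>> groupById(List<Map.Entry<Long, String>> vs) {
        Map<Long, List<Map.Entry<Long, String>>> groups = new LinkedHashMap<>();
        for (Map.Entry<Long, String> vertex : vs) {
            groups.computeIfAbsent(vertex.getKey(), k -> new ArrayList<>()).add(vertex);
        }
        return groups;
    }

    // Way 2: coGroup on the id, emit one left vertex if both groups are non-empty.
    static List<Map.Entry<Long, String>> viaCoGroup(List<Map.Entry<Long, String>> left,
                                                    List<Map.Entry<Long, String>> right) {
        Map<Long, List<Map.Entry<Long, String>>> rightGroups = groupById(right);
        List<Map.Entry<Long, String>> out = new ArrayList<>();
        for (Map.Entry<Long, List<Map.Entry<Long, String>>> g : groupById(left).entrySet()) {
            if (rightGroups.containsKey(g.getKey())) {
                out.add(g.getValue().get(0));
            }
        }
        return out;
    }

    // Way 3: union, count occurrences per id, keep ids with count == 2.
    // The model keeps the first value seen (the left one); it makes no claim
    // about which value Flink's aggregate retains for non-aggregated fields.
    static List<Map.Entry<Long, String>> viaUnionCount(List<Map.Entry<Long, String>> left,
                                                       List<Map.Entry<Long, String>> right) {
        List<Map.Entry<Long, String>> union = new ArrayList<>(left);
        union.addAll(right);
        Map<Long, Integer> counts = new LinkedHashMap<>();
        Map<Long, String> firstValue = new LinkedHashMap<>();
        for (Map.Entry<Long, String> vertex : union) {
            counts.merge(vertex.getKey(), 1, Integer::sum);
            firstValue.putIfAbsent(vertex.getKey(), vertex.getValue());
        }
        List<Map.Entry<Long, String>> out = new ArrayList<>();
        for (Map.Entry<Long, Integer> c : counts.entrySet()) {
            if (c.getValue() == 2) {
                out.add(v(c.getKey(), firstValue.get(c.getKey())));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> left = Arrays.asList(v(1, "a"), v(2, "b"), v(3, "c"));
        List<Map.Entry<Long, String>> right = Arrays.asList(v(2, "x"), v(3, "y"), v(4, "z"));
        System.out.println(viaJoin(left, right));       // [2=b, 3=c]
        System.out.println(viaCoGroup(left, right));    // [2=b, 3=c]
        System.out.println(viaUnionCount(left, right)); // [2=b, 3=c]
    }
}
```

With unique ids per input all three agree. They diverge once an input repeats an id: for left = [(2, b), (2, b)] and right = [(2, x)], the join emits the left vertex twice, the coGroup once, and the union/count variant not at all (the count is 3, not 2).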
Hi Martin,
isn't finding the intersection of edges enough in this case?
And assuming there are no duplicate edges, I believe a join should do the trick.

Cheers,
-Vasia.

On 28 October 2015 at 13:15, Martin Junghanns wrote:
> [...]
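Vasia's caveat about duplicates follows from join semantics: a join emits one record per matching pair. The plain-Java hash-join model below (hypothetical `EdgeJoinModel` and `Edge` classes, not Gelly's `Edge<K, EV>`) intersects two edge sets on an assumed (source, target) key and shows how results multiply when both sides contain the same edge twice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java hash-join model (no Flink) of intersecting two edge sets.
public class EdgeJoinModel {

    // Stand-in for Gelly's Edge<K, EV>: (source, target, value).
    static final class Edge {
        final long source;
        final long target;
        final double value;

        Edge(long source, long target, double value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }

        // Assumed join key: (source, target).
        List<Long> key() {
            return Arrays.asList(source, target);
        }

        @Override
        public String toString() {
            return "(" + source + "," + target + "," + value + ")";
        }
    }

    // Build a hash table on the right input, probe it with the left input and
    // emit the left edge once per matching right edge (one output per pair).
    static List<Edge> intersectByJoin(List<Edge> left, List<Edge> right) {
        Map<List<Long>, List<Edge>> table = new HashMap<>();
        for (Edge r : right) {
            table.computeIfAbsent(r.key(), k -> new ArrayList<>()).add(r);
        }
        List<Edge> out = new ArrayList<>();
        for (Edge l : left) {
            int matches = table.getOrDefault(l.key(), Collections.emptyList()).size();
            for (int i = 0; i < matches; i++) {
                out.add(l);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Edge> left = Arrays.asList(new Edge(1, 2, 0.5), new Edge(2, 3, 1.0));
        List<Edge> right = Arrays.asList(new Edge(1, 2, 0.5), new Edge(3, 4, 2.0));
        System.out.println(intersectByJoin(left, right)); // [(1,2,0.5)]

        // The same edge twice on each side: 2 x 2 = 4 results instead of 1.
        List<Edge> dupLeft = Arrays.asList(new Edge(1, 2, 0.5), new Edge(1, 2, 0.5));
        List<Edge> dupRight = Arrays.asList(new Edge(1, 2, 0.5), new Edge(1, 2, 0.5));
        System.out.println(intersectByJoin(dupLeft, dupRight).size()); // 4
    }
}
```

If duplicates cannot be ruled out, a coGroup that emits once per key (as in Way 2) or a deduplication step would be needed to get set semantics.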
I would go for the first solution with the join.
This gives the engine the highest degree of freedom in choosing an execution strategy:
- data shipping: repartition vs. broadcast-forward
- local strategy: sort-merge vs. hash-join

Best,
Fabian

2015-10-28 18:45 GMT+01:00 Vasiliki Kalavri:
> [...]
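The point behind preferring the join is that it only states which records belong together, leaving the algorithm to the engine. As a rough illustration (plain Java, not Flink's runtime), the sketch below computes the same key intersection with two local strategies, a hash join and a sort-merge join, and gets the same multiset of results; the shipping decision (repartition vs. broadcast-forward) is not modelled. `JoinStrategies`, `hashJoin` and `sortMergeJoin` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Two local join strategies computing the same key intersection (plain Java).
public class JoinStrategies {

    // Hash join: count keys of the right (build) side, probe with the left side.
    static List<Long> hashJoin(long[] left, long[] right) {
        Map<Long, Integer> buildCounts = new HashMap<>();
        for (long r : right) {
            buildCounts.merge(r, 1, Integer::sum);
        }
        List<Long> out = new ArrayList<>();
        for (long l : left) {
            int n = buildCounts.getOrDefault(l, 0);
            for (int i = 0; i < n; i++) {
                out.add(l);
            }
        }
        return out;
    }

    // Sort-merge join: sort copies of both inputs, advance the smaller side,
    // and on equal keys emit one result per pair in the two runs.
    static List<Long> sortMergeJoin(long[] left, long[] right) {
        long[] a = left.clone();
        long[] b = right.clone();
        Arrays.sort(a);
        Arrays.sort(b);
        List<Long> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < a.length && j < b.length) {
            if (a[i] < b[j]) {
                i++;
            } else if (a[i] > b[j]) {
                j++;
            } else {
                long key = a[i];
                int runA = 0;
                while (i < a.length && a[i] == key) { i++; runA++; }
                int runB = 0;
                while (j < b.length && b[j] == key) { j++; runB++; }
                for (int k = 0; k < runA * runB; k++) {
                    out.add(key);
                }
            }
        }
        return out; // already in sorted key order
    }

    public static void main(String[] args) {
        long[] left = {3, 1, 2, 5};
        long[] right = {2, 3, 4};
        List<Long> viaHash = hashJoin(left, right);
        Collections.sort(viaHash); // hash join output follows probe order
        System.out.println(viaHash);                    // [2, 3]
        System.out.println(sortMergeJoin(left, right)); // [2, 3]
    }
}
```

Sorting pays off when inputs are already sorted or large; hashing is usually cheaper when one side fits in memory. A coGroup or groupBy-based plan fixes the engine to a grouping strategy and leaves less room for such choices.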