Hello squirrels,
I want to discuss with you a few concerns I have about our current vertex-centric model implementation, Spargel, now fully subsumed by Gelly.

Spargel is our implementation of Pregel [1], but it violates some fundamental properties of the model, as described in the paper and as implemented in e.g. Giraph, GPS, and Hama. I often find myself confused both when trying to explain it to current Giraph users and when porting my Giraph algorithms to it.

More specifically:
- In the Pregel model, messages produced in superstep n are received in superstep n+1. In Spargel, they are produced and consumed in the same iteration.
- In Pregel, a vertex is active during a superstep if it has received a message in the previous superstep. In Spargel, a vertex is active during a superstep if it has changed its value.

These two differences require a lot of rethinking when porting applications and can easily cause bugs.

The most important problem, however, is that we require the user to split the computation into 2 phases (2 UDFs):
- messaging: has access to the vertex state and can produce messages
- update: has access to incoming messages and can update the vertex value

Pregel/Giraph expose only one UDF to the user:
- compute: has access to both the vertex state and the incoming messages, and can produce messages and update the vertex value (a rough SSSP sketch of both APIs follows at the end of this e-mail).

This might not seem like a big deal, but apart from forcing the user to split their program logic into 2 phases, Spargel also makes some common computation patterns non-intuitive or impossible to write. A very simple example is propagating a message based on its value or sender ID. To do this with Spargel, one has to store all the incoming messages in the vertex value (which might be of a different type, by the way) during the messaging phase, so that they can be accessed during the update phase.

So, my first question is: when implementing Spargel, were other alternatives considered and maybe rejected in favor of performance or for some other reason? If someone knows, I would love to hear about them!

Second, I wrote a prototype implementation [2] that only exposes one UDF, compute(), by keeping the vertex state in the solution set and the messages in the workset. This way all the previously mentioned limitations go away and the API (see "SSSPComputeFunction" in the example [3]) looks a lot more like Giraph (see [4]).

I have not run any experiments yet and the prototype has some ugly hacks, but if you think any of this makes sense, then I'd be willing to follow up and try to optimize it. If we see that it performs well, we can consider either replacing Spargel or adding it as an alternative.

Thanks for reading this long e-mail and looking forward to your input!

Cheers,
-Vasia.

[1]: https://kowshik.github.io/JPregel/pregel_paper.pdf
[2]: https://github.com/vasia/flink/tree/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew
[3]: https://github.com/vasia/flink/blob/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew/example/SSSPCompute.java
[4]: https://github.com/grafos-ml/okapi/blob/master/src/main/java/ml/grafos/okapi/graphs/SingleSourceShortestPaths.java
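To make the difference concrete, here is a rough SSSP sketch of the two APIs. Signatures are simplified and partly from memory, and the single-UDF variant is only modeled on the prototype's SSSPComputeFunction [3], so treat the exact class and method names as assumptions rather than the actual code (imports omitted):

// Spargel today: two UDFs, split across the messaging and update phases.
public static final class MinDistanceMessenger
        extends MessagingFunction<Long, Double, Double, Double> {
    @Override
    public void sendMessages(Vertex<Long, Double> vertex) {
        for (Edge<Long, Double> edge : getEdges()) {
            sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
        }
    }
}

public static final class DistanceUpdater
        extends VertexUpdateFunction<Long, Double, Double> {
    @Override
    public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> messages) {
        double minDistance = Double.MAX_VALUE;
        for (double msg : messages) {
            minDistance = Math.min(minDistance, msg);
        }
        if (minDistance < vertex.getValue()) {
            setNewVertexValue(minDistance);
        }
    }
}

// Pregel-style alternative: one UDF that sees both the vertex state and the
// incoming messages, and can update the value and send messages in one place.
public static final class SSSPComputeFunction
        extends ComputeFunction<Long, Double, Double, Double> {
    @Override
    public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) {
        double minDistance = Double.MAX_VALUE;
        for (double msg : messages) {
            minDistance = Math.min(minDistance, msg);
        }
        if (minDistance < vertex.getValue()) {
            setNewVertexValue(minDistance);
            for (Edge<Long, Double> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), minDistance + edge.getValue());
            }
        }
    }
}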
Hi,
At our group, we also moved several algorithms from Giraph to Gelly and ran into some confusing issues (first in understanding, then during implementation) caused by the conceptual differences you described.

If there are no concrete advantages (mainly performance) to the Spargel implementation, we would be very happy to see the Gelly API aligned with Pregel-like systems.

Your SSSP example speaks for itself: straightforward, if the reader is familiar with Pregel/Giraph/...

Best,
Martin
I'll try to have a look at the proposal from a performance point of view in
the next few days. Please ping me if I don't follow up on this thread.

Cheers, Fabian
@Martin: thanks for your input! If you ran into any other issues that I
didn't mention, please let us know. Obviously, even with my proposal, there are still features we cannot support, e.g. updating edge values and graph mutations. We'll need to re-think the underlying iteration and/or the graph representation for those.

@Fabian: thanks a lot, no rush :)
Let me give you some more information that might make it easier to reason about performance:

Currently, in Spargel the solution set (SS) keeps the vertex state and the workset (WS) keeps the active vertices. The iteration is composed of 2 coGroups. The first one takes the WS and the edges and produces messages. The second one takes the messages and the SS and produces the new WS and the SS-delta.

In my proposal, the SS has the vertex state and the WS has <vertexId, MessageIterator> pairs, i.e. the inbox of each vertex. The plan is more complicated because compute() needs to have two iterators: over the edges and over the messages.
First, I join the SS and the WS to get the active vertices (those that have received a message) and their current state. Then I coGroup the result with the edges to access the neighbors. Now the main problem is that this coGroup needs to have 2 outputs: the new messages and the new vertex value. I couldn't really find a nice way to do this, so I'm emitting a Tuple that contains both types plus a flag to separate them later with 2 flatMaps (see the sketch at the end of this mail). From the vertex flatMap, I create the SS-delta, and from the message flatMap I apply a reduce to group the messages by vertex and send them to the new WS. One optimization would be to expose a combiner here to reduce the message size.

tl;dr:
1. 2 coGroups vs. join + coGroup + flatMap + reduce
2. how can we efficiently emit 2 different types of records from a coGroup?
3. does it make any difference if we group/combine the messages before updating the workset or after?

Cheers,
-Vasia.
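To make question 2 concrete, this is roughly what the tagged-output workaround looks like. It is only a sketch: the wrapper class names (ComputeCoGroup, ExtractMessages), the dataset names, and the tuple layout are made up for illustration, not the actual prototype code.

// The coGroup emits a single record type that carries either a vertex update
// (flag = true) or a message (flag = false).
DataSet<Tuple3<Vertex<Long, Double>, Tuple2<Long, Double>, Boolean>> superstep =
        verticesWithMessages.coGroup(edges)
                .where("f0.f0").equalTo(0)
                .with(new ComputeCoGroup());   // hypothetical wrapper that calls the user's compute()

// SS-delta: keep only the vertex-update records.
DataSet<Vertex<Long, Double>> delta = superstep.flatMap(
        new FlatMapFunction<Tuple3<Vertex<Long, Double>, Tuple2<Long, Double>, Boolean>,
                            Vertex<Long, Double>>() {
            @Override
            public void flatMap(Tuple3<Vertex<Long, Double>, Tuple2<Long, Double>, Boolean> value,
                                Collector<Vertex<Long, Double>> out) {
                if (value.f2) {
                    out.collect(value.f0);
                }
            }
        });

// New workset: a symmetric flatMap keeps the message records (flag = false);
// a grouping step on the target id then builds the per-vertex inbox.
DataSet<Tuple2<Long, Double>> messages = superstep.flatMap(new ExtractMessages());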
Hi Vasia,
I had a look at your new implementation and have a few ideas for improvements.

1) Sending out the input iterator as you do in the last GroupReduce is quite dangerous and does not give a benefit compared to collecting all elements. Even though it is an iterator, it needs to be completely materialized in-memory whenever the record is touched by Flink or user code.
I would propose to skip the reduce step completely, handle all messages separately, and only collect them in the CoGroup function before giving them to the VertexComputeFunction. Be careful to only do that with objectReuse disabled, or take care to properly copy the messages. If you collect the messages in the CoGroup, you don't need the GroupReduce, you have smaller records, and you can remove the MessageIterator class completely.

2) Add this annotation to the AppendVertexState function: @ForwardedFieldsFirst("*->f0"). This indicates that the complete element of the first input becomes the first field of the output. Since the input is partitioned on "f0" (it comes out of the partitioned solution set), the result of AppendVertexState will be partitioned on "f0.f0", which is (accidentally :-D) the join key of the following coGroup function -> no partitioning :-) (see the sketch at the end of this mail)

3) Adding the two flatMap functions behind the CoGroup prevents chaining and therefore causes some serialization overhead, but it shouldn't be too bad.

So in total I would make this program as follows:

iVertices<K,VV>
iMessages<K, Message> = iVertices.map(new InitWorkSet());

iteration = iVertices.iterateDelta(iMessages, maxIt, 0)
verticesWithMessage<Vertex, Message> = iteration.getSolutionSet()
  .join(iteration.getWorkset())
  .where(0)   // solution set is local and build side
  .equalTo(0) // workset is shuffled and probe side of the hash join
superstepComp<Vertex, Tuple2<K, Message>, Bool> =
  verticesWithMessage.coGroup(edgesWithValue)
  .where("f0.f0") // vwm is locally forwarded and sorted
  .equalTo(0)     // edges are already partitioned and sorted (if cached correctly)
  .with(...)      // the coGroup collects all messages in a collection and gives it to the ComputeFunction
delta<Vertex> = superstepComp.flatMap(...)        // partitioned when merged into the solution set
workSet<K, Message> = superstepComp.flatMap(...)  // partitioned for the join
iteration.closeWith(delta, workSet)

So, if I am correct, the program will
- partition the workset
- sort the vertices with messages
- partition the delta

One observation I have is that this program requires that all messages fit into memory. Was that also the case before?

Cheers,
Fabian
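Regarding point 2, the annotation could look roughly like the following. The join function itself is illustrative only; the actual AppendVertexState in the prototype may have a different signature.

// Declares that the complete first input (the solution-set vertex) is forwarded
// unchanged into field f0 of the output, so its partitioning on f0 carries over
// as a partitioning on f0.f0 of the result.
@ForwardedFieldsFirst("*->f0")
public static final class AppendVertexState<K, VV, Message>
        implements JoinFunction<Vertex<K, VV>, Tuple2<K, Message>, Tuple2<Vertex<K, VV>, Message>> {

    @Override
    public Tuple2<Vertex<K, VV>, Message> join(Vertex<K, VV> vertex, Tuple2<K, Message> message) {
        return new Tuple2<>(vertex, message.f1);
    }
}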
Hi Fabian,
thanks so much for looking into this so quickly :-)

One update I have to make is that I tried running a few experiments with this on a 6-node cluster. The current implementation gets stuck at "Rebuilding Workset Properties" and never finishes a single iteration. Running the plan of one superstep without a delta iteration terminates fine. I didn't have access to the cluster today, so I couldn't debug this further, but I will as soon as I have access again.

The rest of my comments are inline:

On 30 October 2015 at 17:53, Fabian Hueske <[hidden email]> wrote:

> 1) Sending out the input iterator as you do in the last GroupReduce is
> quite dangerous [...] If you collect the messages in the CoGroup, you
> don't need the GroupReduce, you have smaller records, and you can remove
> the MessageIterator class completely.

I see. The idea was to expose a message combiner that the user could implement if the messages are combinable, e.g. min or sum. This is a common case and reduces the message load significantly. Is there a way I could do something similar before the coGroup? (See the sketch at the end of this mail.)

> 2) Add this annotation to the AppendVertexState function:
> @ForwardedFieldsFirst("*->f0"). [...]

Great! I totally missed that ;)

> One observation I have is that this program requires that all messages
> fit into memory. Was that also the case before?

I believe not.
The current Spargel plan has one coGroup that produces the messages and a following coGroup that groups them by the message's target ID and consumes them in an iterator. That doesn't require them to fit in memory, right?

I'm also working on a version where the graph is represented as an adjacency list, instead of two separate datasets of vertices and edges. The disadvantage is that the graph has to fit in memory, but I think the advantages are many. We'll be able to support edge value updates, edge mutations, and different edge access order guarantees. I'll get back to this thread when I have a working prototype.

Thanks again!

Cheers,
-Vasia.
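To make the combiner idea above concrete, something like the following min combiner for SSSP-style messages is what is meant. This is just a sketch with made-up names, not code from the prototype.

// Hypothetical min-combiner for SSSP messages of type Tuple2<targetId, candidateDistance>.
// A plain ReduceFunction is combinable by definition, so Flink can pre-aggregate
// messages on the sending side before they are shuffled to the target vertex.
public static final class MinDistanceCombiner
        implements ReduceFunction<Tuple2<Long, Double>> {

    @Override
    public Tuple2<Long, Double> reduce(Tuple2<Long, Double> first, Tuple2<Long, Double> second) {
        return first.f1 <= second.f1 ? first : second;
    }
}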
We can of course inject an optional ReduceFunction (or GroupReduce, or
combinable GroupReduce) to reduce the size of the workset.
I suggested removing the GroupReduce function because it only collected all messages into a single record by emitting the input iterator, which is quite dangerous. Applying a combinable reduce function could improve the performance considerably (see the sketch below for where it would slot into the plan).

The good news is that it would come "for free", because the necessary partitioning and sorting can be reused (given the ForwardedFields annotations are correctly set):
- The partitioning of the reduce can be reused for the join with the solution set.
- The sort of the reduce is preserved by the join with the in-memory hash table of the solution set and can be reused for the coGroup.

Best,
Fabian
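In terms of the plan sketched earlier in this thread, the optional combiner would slot in roughly like this (sketch only, same pseudocode style as before; userCombiner is a placeholder for a user-provided combinable function such as the min combiner above):

superstepComp<Vertex, Tuple2<K, Message>, Bool> = ...   // as before
delta<Vertex>        = superstepComp.flatMap(...)       // vertex updates
messages<K, Message> = superstepComp.flatMap(...)       // raw messages
workSet<K, Message>  = messages
    .groupBy(0)                                         // by target vertex id
    .reduce(userCombiner)                               // optional, combinable by definition
iteration.closeWith(delta, workSet)

The grouping on the target id is then the same partitioning that the next superstep's join with the solution set needs, which is the reuse described above.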
When creating the original version of Spargel I was pretty much thinking in
GSA terms, more than in Pregel terms. There are some fundamental differences between Spargel and Pregel. Spargel is in between GSA and Pregel in some way; that is how I have always thought about it.

The main reason for that form is that it fits the dataflow paradigm more easily:

- If one function emits both the new state of the vertex and the messages, it has two different return types, which means you need a union type and a filter/split kind of operation on the result, which also adds overhead. In the current model, each function has one return type, which makes it easy.

- The workset is also the feedback channel, which is materialized at the superstep boundaries, so keeping it small, at O(vertices) rather than O(edges), is a win for performance.

There is no reason not to add a Pregel model, but I would not kill Spargel for it. It will be tough to get the Pregel variant to the same efficiency. Unless you want to say: for efficiency, go with GSA; for convenience, go with Pregel.

There are some nice things about the Spargel model. The fact that messages are first generated and then consumed makes the generation of initial messages simpler in many cases, I think. It was always a bit weird to me in Pregel that you had to check whether you are in superstep one, in which case you would expect no messages, and generate initial-value messages.
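For reference, that superstep-one pattern looks roughly like this in a Pregel-style compute() for SSSP. This is only a sketch: getSuperstepNumber(), srcVertexId, and the surrounding API are assumptions, not the prototype's actual interface.

@Override
public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) {
    if (getSuperstepNumber() == 1) {
        // no messages have been sent yet, so the source vertex has to seed the computation
        if (vertex.getId().equals(srcVertexId)) {
            setNewVertexValue(0.0);
            for (Edge<Long, Double> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), edge.getValue());
            }
        }
    } else {
        // normal case: react to the incoming messages
        double minDistance = Double.MAX_VALUE;
        for (double msg : messages) {
            minDistance = Math.min(minDistance, msg);
        }
        if (minDistance < vertex.getValue()) {
            setNewVertexValue(minDistance);
            for (Edge<Long, Double> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), minDistance + edge.getValue());
            }
        }
    }
}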
|
Actually GAS was not known when we did the iterations work (and Spargel),
but the intuition that led to Spargel is similar to the intuition that led to GAS.
|
I tried out Spargel during my work with Spotify and have implemented several algorithms using it. In all implementations I ended up storing additional data and flags on the vertex to carry them over from one UDF to the next. It definitely makes the code harder to write and maintain.
I wonder how much overhead these additional constructs cost in computation and memory consumption. Maybe a less optimized single-UDF version would not be that much of a performance hit for most applications.
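As an illustration of the workaround described above, here is a small sketch with made-up names; it does not use the actual Spargel UDF signatures, only plain Java, and shows the vertex value being widened with fields whose sole purpose is to carry data and flags from one phase to the other.

/**
 * Sketch of the carry-over workaround (hypothetical names, not the real Spargel API):
 * the vertex value bundles the real algorithm state with bookkeeping fields that only
 * exist so the messaging logic can see what the update logic learned, and vice versa.
 */
public class CarryOverSketch {

    /** Vertex value = real algorithm state plus carried-over bookkeeping. */
    public static class VertexState {
        public double distance = Double.POSITIVE_INFINITY; // the value the algorithm cares about
        public long bestSenderId = -1L;                     // carried over: sender of the best message
        public boolean improved = false;                    // carried over: did the last update help?

        public VertexState() {} // Flink POJOs need a public no-arg constructor
    }

    /** Logic that would live in the update UDF: record metadata the messaging UDF will need. */
    public static VertexState update(VertexState state, double candidateDistance, long senderId) {
        if (candidateDistance < state.distance) {
            state.distance = candidateDistance;
            state.bestSenderId = senderId; // stash it, otherwise the other UDF never sees it
            state.improved = true;
        } else {
            state.improved = false;
        }
        return state;
    }

    /** Logic that would live in the messaging UDF: its behavior depends on the stashed fields. */
    public static boolean shouldForwardTo(VertexState state, long neighborId) {
        // e.g. avoid sending the value straight back to the vertex it came from
        return state.improved && neighborId != state.bestSenderId;
    }
}

With a single compute() UDF these extra fields would be unnecessary, because the same function sees both the incoming messages and the vertex state.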
|
Thanks for the detailed explanation, Stephan!
Seeing Spargel as a Gather-Scatter model makes much more sense :) I think we should be more careful not to present it as a "Pregel equivalent", to avoid confusing users coming from systems like Giraph. Maybe I could put together a Pregel-Spargel-GSA comparison table to make clear what each model can support and what its advantages and disadvantages are.
I wouldn't kill Spargel either, but I would like to add to Gelly a more generic iteration model that could support algorithms beyond simplistic graph propagation. Pregel could be a start, even though it also has its issues. Having to return 2 different types and the size of the workset are the two problems I also faced when trying to map Pregel to a Flink plan. Also, I am mostly convinced that representing the graph as two separate datasets of vertices and edges is not optimal for the Pregel model. I'll share my findings as soon as I have an adjacency-list version running.
Thanks again! Cheers, -Vasia.
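To illustrate the adjacency-list direction mentioned above, here is a hedged sketch, an assumed construction rather than the actual prototype, that bundles each vertex with its out-edges through a coGroup on the vertex id, using Gelly's Vertex and Edge types.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector;

public class AdjacencyListSketch {

    /** Build one record per vertex, carrying the vertex together with all of its out-edges. */
    public static DataSet<Tuple2<Vertex<Long, Double>, List<Edge<Long, Double>>>> toAdjacencyList(
            DataSet<Vertex<Long, Double>> vertices,
            DataSet<Edge<Long, Double>> edges) {

        return vertices.coGroup(edges)
            .where(0)    // vertex id
            .equalTo(0)  // edge source id
            .with(new CoGroupFunction<Vertex<Long, Double>, Edge<Long, Double>,
                    Tuple2<Vertex<Long, Double>, List<Edge<Long, Double>>>>() {
                @Override
                public void coGroup(Iterable<Vertex<Long, Double>> vertexIt,
                                    Iterable<Edge<Long, Double>> edgeIt,
                                    Collector<Tuple2<Vertex<Long, Double>, List<Edge<Long, Double>>>> out) {
                    // collect the (possibly empty) group of out-edges for this vertex id
                    List<Edge<Long, Double>> outEdges = new ArrayList<>();
                    for (Edge<Long, Double> e : edgeIt) {
                        outEdges.add(e);
                    }
                    for (Vertex<Long, Double> v : vertexIt) {
                        out.collect(new Tuple2<>(v, outEdges));
                    }
                }
            });
    }
}

The trade-off, as noted above, is that each vertex's complete edge list is materialized in a single record, so the graph effectively has to fit in memory.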
|
In reply to this post by Stephan Ewen
I also think a Giraph-like model could be added, but we shouldn't remove
Spargel in favour of it!

On Tue, Nov 3, 2015 at 2:35 AM, Stephan Ewen <[hidden email]> wrote:

> When creating the original version of Spargel I was pretty much thinking in
> GSA terms, more than in Pregel terms. There are some fundamental
> differences between Spargel and Pregel. Spargel is in between GAS and
> Pregel in some way, that is how I have always thought about it.
>
> The main reason for the form is that it fits the dataflow paradigm more
> easily:
>
> - If one function emits the new state of the vertex and the messages, it
> has two different return types, which means you need a union type and a
> filter/split type of operation on the result, which also adds overhead. In
> the current model, each function has one return type, which makes it easy.
>
> - The workset is also the feedback channel, which is materialized at the
> superstep boundaries, so keeping it small at O(vertices), rather than
> O(edges), is a win for performance.
>
> There is no reason not to add a Pregel model, but I would not kill Spargel
> for it. It will be tough to get the Pregel variant to the same efficiency.
> Unless you want to say: for efficiency, go with GSA; for convenience, with
> Pregel.
>
> There are some nice things about the Spargel model. The fact that messages
> are first generated and then consumed makes the generation of initial
> messages simpler in many cases, I think. It was always a bit weird to me in
> Pregel that you had to check whether you are in superstep one, in which
> case you would expect no message, and generate initial value messages.
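The union-type overhead Stephan describes is the same workaround Vasia sketched earlier in the thread: emit one record type that can carry either a new vertex value or a message, plus a flag, and split the output later with two flatMaps. Below is a minimal, self-contained illustration of such a tagged-union record; the class and field names are invented for this sketch and are not part of the Flink or Gelly API.

// Toy tagged-union record: a single operator output type that carries either
// a new vertex value or an outgoing message, distinguished by a flag and
// separated downstream by two flatMap/filter operators.
public class VertexValueOrMessage<VV, M> {

    public boolean isVertexUpdate; // true -> newVertexValue is set, false -> message is set
    public long vertexId;          // updated vertex id, or message target id
    public VV newVertexValue;
    public M message;

    public static <VV, M> VertexValueOrMessage<VV, M> ofVertexUpdate(long id, VV value) {
        VertexValueOrMessage<VV, M> r = new VertexValueOrMessage<>();
        r.isVertexUpdate = true;
        r.vertexId = id;
        r.newVertexValue = value;
        return r;
    }

    public static <VV, M> VertexValueOrMessage<VV, M> ofMessage(long targetId, M msg) {
        VertexValueOrMessage<VV, M> r = new VertexValueOrMessage<>();
        r.isVertexUpdate = false;
        r.vertexId = targetId;
        r.message = msg;
        return r;
    }
}

Every record then pays for the flag and for the later filter/split hop, which is exactly the overhead the two-function Spargel design avoids.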
|
The problem with having many different graph models in Gelly is that it
might get quite confusing for a user. Maybe this can be fixed with good
documentation, so that it's clear how each model works and what its benefits
are (and maybe when it's better to use it over a different model).

On Tue, Nov 3, 2015 at 3:29 PM, Andra Lungu <[hidden email]> wrote:

> I also think a Giraph-like model could be added, but we shouldn't remove
> Spargel in favour of it!
|
Sounds good. I like the idea of presenting it as a spectrum:
Pregel -> Scatter/Gather (Spargel) -> GAS/GSA/SGA
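To make the spectrum concrete, here is a rough, self-contained sketch of how the three models split the same per-vertex work across user functions. These are toy interfaces written only for this comparison; the names and signatures are simplified and do not match the actual Gelly, Giraph, or PowerGraph APIs.

import java.util.List;

// Toy interfaces contrasting the user-facing functions of the three models.
public class VertexCentricSpectrum {

    interface MessageSender<M> { void send(long targetVertexId, M message); }

    // Pregel: one compute() sees the vertex state AND the incoming messages,
    // and may both update the value and send new messages.
    // (The real model also gives compute() an iterator over the vertex's edges.)
    interface PregelCompute<VV, M> {
        VV compute(VV currentValue, List<M> inbox, MessageSender<M> out);
    }

    // Spargel / scatter-gather: the same work is split into two functions;
    // neither of them sees the state change and the inbox at the same time.
    interface ScatterMessaging<VV, M> {
        void sendMessages(VV currentValue, MessageSender<M> out); // no inbox here
    }
    interface GatherUpdate<VV, M> {
        VV updateVertex(VV currentValue, List<M> inbox);          // no edges here
    }

    // GSA: gather a value over each edge, combine with an associative sum,
    // then apply the combined value to the vertex.
    interface GsaGather<VV, EV, M> { M gather(VV neighborValue, EV edgeValue); }
    interface GsaSum<M>            { M sum(M left, M right); }
    interface GsaApply<VV, M>      { VV apply(VV currentValue, M combined); }
}

Roughly, moving left to right the user function sees less at once (full state plus inbox, then state or inbox, then only an associatively combined neighbor value), which is part of what makes the right end cheaper to execute as a dataflow.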
On Tue, Nov 3, 2015 at 5:55 PM, Martin Neumann <[hidden email]> wrote:

> The problem with having many different graph models in Gelly is that it
> might get quite confusing for a user. Maybe this can be fixed with good
> documentation, so that it's clear how each model works and what its
> benefits are (and maybe when it's better to use it over a different model).
|
In reply to this post by Fabian Hueske-2
@Fabian
Is there any advantage in putting the reducer-combiner before updating the
workset vs. after (i.e. right before the join with the solution set)?

If it helps, here are the plans of these 2 alternatives:
https://drive.google.com/file/d/0BzQJrI2eGlyYcFV2RFo5dUFNXzg/view?usp=sharing
https://drive.google.com/file/d/0BzQJrI2eGlyYN014NXp6OEZUdGs/view?usp=sharing

Thanks a lot for the help!
-Vasia.
Since the input > is > > > partitioned on "f0" (it comes out of the partitioned solution set) the > > > result of ApplyVertexState will be partitioned on "f0.f0" which is > > > (accidentially :-D) the join key of the following coGroup function -> > no > > > partitioning :-) > > > > > > > Great! I totally missed that ;) > > > > > > > > > 3) Adding the two flatMap functions behind the CoGroup prevents > chaining > > > and causes therefore some serialization overhead but shouldn't be too > > bad. > > > > > > So in total I would make this program as follows: > > > > > > iVertices<K,VV> > > > iMessage<K, Message> = iVertices.map(new InitWorkSet()); > > > > > > iteration = iVertices.iterateDelta(iMessages, maxIt, 0) > > > verticesWithMessage<Vertex, Message> = iteration.getSolutionSet() > > > .join(iteration.workSet()) > > > .where(0) // solution set is local and build side > > > .equalTo(0) // workset is shuffled and probe side of hashjoin > > > superstepComp<Vertex,Tuple2<K, Message>,Bool> = > > > verticesWithMessage.coGroup(edgessWithValue) > > > .where("f0.f0") // vwm is locally forward and sorted > > > .equalTo(0) // edges are already partitioned and sorted (if cached > > > correctly) > > > .with(...) // The coGroup collects all messages in a collection and > > gives > > > it to the ComputeFunction > > > delta<Vertex> = superStepComp.flatMap(...) // partitioned when merged > > into > > > solution set > > > workSet<K, Message> = superStepComp.flatMap(...) // partitioned for > join > > > iteration.closeWith(delta, workSet) > > > > > > So, if I am correct, the program will > > > - partition the workset > > > - sort the vertices with messages > > > - partition the delta > > > > > > One observation I have is that this program requires that all messages > > fit > > > into memory. Was that also the case before? > > > > > > > I believe not. The plan has one coGroup that produces the messages and a > > following coGroup that groups by the messages "target ID" and consumes > > them in an iterator. That doesn't require them to fit in memory, right? > > > > > > I'm also working on a version where the graph is represented as an > > adjacency list, instead of two separate datasets of vertices and edges. > The > > disadvantage is that the graph has to fit in memory, but I think the > > advantages are many. We'll be able to support edge value updates, edge > > mutations and different edge access order guarantees. I'll get back to > this > > thread when I have a working prototype. > > > > > > > > > > Cheers, > > > Fabian > > > > > > > Thanks again! > > > > Cheers, > > -Vasia. > > > > > > > > > > > > > > > 2015-10-27 19:10 GMT+01:00 Vasiliki Kalavri <[hidden email] > >: > > > > > > > @Martin: thanks for your input! If you ran into any other issues > that I > > > > didn't mention, please let us know. Obviously, even with my proposal, > > > there > > > > are still features we cannot support, e.g. updating edge values and > > graph > > > > mutations. We'll need to re-think the underlying iteration and/or > graph > > > > representation for those. > > > > > > > > @Fabian: thanks a lot, no rush :) > > > > Let me give you some more information that might make it easier to > > reason > > > > about performance: > > > > > > > > Currently, in Spargel the SolutionSet (SS) keeps the vertex state and > > the > > > > workset (WS) keeps the active vertices. The iteration is composed of > 2 > > > > coGroups. The first one takes the WS and the edges and produces > > messages. 
> > > > The second one takes the messages and the SS and produced the new WS > > and > > > > the SS-delta. > > > > > > > > In my proposal, the SS has the vertex state and the WS has <vertexId, > > > > MessageIterator> pairs, i.e. the inbox of each vertex. The plan is > more > > > > complicated because compute() needs to have two iterators: over the > > edges > > > > and over the messages. > > > > First, I join SS and WS to get the active vertices (have received a > > msg) > > > > and their current state. Then I coGroup the result with the edges to > > > access > > > > the neighbors. Now the main problem is that this coGroup needs to > have > > 2 > > > > outputs: the new messages and the new vertex value. I couldn't really > > > find > > > > a nice way to do this, so I'm emitting a Tuple that contains both > types > > > and > > > > I have a flag to separate them later with 2 flatMaps. From the vertex > > > > flatMap, I crete the SS-delta and from the messaged flatMap I apply a > > > > reduce to group the messages by vertex and send them to the new WS. > One > > > > optimization would be to expose a combiner here to reduce message > size. > > > > > > > > tl;dr: > > > > 1. 2 coGroups vs. Join + coGroup + flatMap + reduce > > > > 2. how can we efficiently emit 2 different types of records from a > > > coGroup? > > > > 3. does it make any difference if we group/combine the messages > before > > > > updating the workset or after? > > > > > > > > Cheers, > > > > -Vasia. > > > > > > > > > > > > On 27 October 2015 at 18:39, Fabian Hueske <[hidden email]> > wrote: > > > > > > > > > I'll try to have a look at the proposal from a performance point of > > > view > > > > in > > > > > the next days. > > > > > Please ping me, if I don't follow up this thread. > > > > > > > > > > Cheers, Fabian > > > > > > > > > > 2015-10-27 18:28 GMT+01:00 Martin Junghanns < > [hidden email] > > >: > > > > > > > > > > > Hi, > > > > > > > > > > > > At our group, we also moved several algorithms from Giraph to > Gelly > > > and > > > > > > ran into some confusing issues (first in understanding, second > > during > > > > > > implementation) caused by the conceptional differences you > > described. > > > > > > > > > > > > If there are no concrete advantages (performance mainly) in the > > > Spargel > > > > > > implementation, we would be very happy to see the Gelly API be > > > aligned > > > > to > > > > > > Pregel-like systems. > > > > > > > > > > > > Your SSSP example speaks for itself. Straightforward, if the > reader > > > is > > > > > > familiar with Pregel/Giraph/... > > > > > > > > > > > > Best, > > > > > > Martin > > > > > > > > > > > > > > > > > > On 27.10.2015 17:40, Vasiliki Kalavri wrote: > > > > > > > > > > > >> Hello squirrels, > > > > > >> > > > > > >> I want to discuss with you a few concerns I have about our > current > > > > > >> vertex-centric model implementation, Spargel, now fully subsumed > > by > > > > > Gelly. > > > > > >> > > > > > >> Spargel is our implementation of Pregel [1], but it violates > some > > > > > >> fundamental properties of the model, as described in the paper > and > > > as > > > > > >> implemented in e.g. Giraph, GPS, Hama. I often find myself > > confused > > > > both > > > > > >> when trying to explain it to current Giraph users and when > porting > > > my > > > > > >> Giraph algorithms to it. > > > > > >> > > > > > >> More specifically: > > > > > >> - in the Pregel model, messages produced in superstep n, are > > > received > > > > in > > > > > >> superstep n+1. 
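For illustration, point 2) in the quoted exchange above could look roughly like the following sketch of an annotated join function. The class name and the concrete types (Long vertex ids, Double vertex values and Double messages) are assumptions for the example, not the prototype's actual code:

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Vertex;

// The complete first input (the vertex coming out of the partitioned solution
// set) is forwarded unchanged into field f0 of the output, so the partitioning
// on f0 carries over to f0.f0 of the result.
@ForwardedFieldsFirst("*->f0")
public class AppendVertexState implements
        JoinFunction<Vertex<Long, Double>, Tuple2<Long, Double>, Tuple2<Vertex<Long, Double>, Double>> {

    @Override
    public Tuple2<Vertex<Long, Double>, Double> join(Vertex<Long, Double> vertex, Tuple2<Long, Double> message) {
        return new Tuple2<Vertex<Long, Double>, Double>(vertex, message.f1);
    }
}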
|
Hi Vasia,
sorry for the late reply.
I don't think there is a big difference. In both cases, the partitioning and sorting happen at the end of the iteration. If the groupReduce is applied before the workset is returned, the sorting happens on the filtered result (after the flatMap), which might be a little bit more efficient (depending on the ratio of messages to solution set updates). Also, it does not require that the initial workset is sorted for the first groupReduce.

I would put it at the end.

Cheers, Fabian
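For a combinable message type, a minimal sketch of such a combiner, applied to the new workset right before it is fed back (e.g. workSet.groupBy(0).reduce(new MinMessageCombiner()) ahead of iteration.closeWith(delta, workSet)), could look like this. The class name and the Long/Double types (a min-combiner as in SSSP) are illustrative assumptions, not the prototype's actual code:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Combines all messages addressed to the same target vertex (field 0) into a
// single record, keeping only the smallest value, so the workset shrinks
// before it is partitioned and joined with the solution set.
public class MinMessageCombiner implements ReduceFunction<Tuple2<Long, Double>> {

    @Override
    public Tuple2<Long, Double> reduce(Tuple2<Long, Double> first, Tuple2<Long, Double> second) {
        return first.f1 <= second.f1 ? first : second;
    }
}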
|
Hi,
after running a few experiments, I can confirm that putting the combiner after the flatMap is indeed more efficient.

I ran SSSP and Connected Components with Spargel, GSA, and the Pregel model, and the results are the following:
- for SSSP, Spargel is always the slowest, GSA is ~1.2x faster, and Pregel is ~1.1x faster without a combiner and ~1.3x faster with a combiner.
- for Connected Components, Spargel and GSA perform similarly, while Pregel is 1.4-1.6x slower.

To start with, this is much better than I expected :)
However, there is a main shortcoming in my current implementation that negatively impacts performance. Since the compute-function coGroup needs to output both new vertex values and new messages, I emit a wrapping tuple that contains both the vertex state and the messages and then filter them out based on a boolean field. The problem is that, since I cannot emit null fields, I emit a dummy message for each new vertex state and a dummy vertex state for each new message. That essentially means that the intermediate messages result doubles in size if, say, the vertex values are of the same type as the messages (it can be worse if the vertex values are more complex).
So my question is, is there a way to avoid this redundancy, either by emitting null fields or by creating an operator that could emit 2 different types of tuples?

Thanks!
-Vasia.
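A rough sketch of the flag-based split described above could look like the following two flatMap functions. The Tuple3 layout, the class names and the Long/Double types are assumptions for the example, not the actual prototype code; the dummy fields in f1/f2 are exactly the redundancy in question:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector;

// Wrapper layout: f0 = flag (true -> vertex-state update, false -> message),
// f1 = new vertex state (dummy if f0 is false),
// f2 = message as <targetId, value> (dummy if f0 is true).
class ExtractDelta implements
        FlatMapFunction<Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>>, Vertex<Long, Double>> {

    @Override
    public void flatMap(Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>> value,
                        Collector<Vertex<Long, Double>> out) {
        if (value.f0) {
            out.collect(value.f1);
        }
    }
}

class ExtractMessages implements
        FlatMapFunction<Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>>, Tuple2<Long, Double>> {

    @Override
    public void flatMap(Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>> value,
                        Collector<Tuple2<Long, Double>> out) {
        if (!value.f0) {
            out.collect(value.f2);
        }
    }
}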
|
You could implement a Java Either type (similar to Scala's Either) that holds either a Message or the VertexState, plus a corresponding TypeInformation and TypeSerializer that serializes a byte flag to indicate which of the two types is used.

It might actually make sense to add a generic Either type to the Java API in general (similar to the Java Tuples, which resemble the Scala Tuples).

Cheers, Fabian
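For illustration, a minimal hand-rolled sketch of such a type could look as follows (the names are illustrative, and the custom TypeInformation/TypeSerializer with the byte tag mentioned above is left out here):

// Holds exactly one of two values; only the Java-side API is sketched.
public abstract class Either<L, R> {

    public abstract boolean isLeft();

    public L left() {
        throw new IllegalStateException("not a Left");
    }

    public R right() {
        throw new IllegalStateException("not a Right");
    }

    public static <L, R> Either<L, R> left(final L value) {
        return new Either<L, R>() {
            @Override
            public boolean isLeft() {
                return true;
            }

            @Override
            public L left() {
                return value;
            }
        };
    }

    public static <L, R> Either<L, R> right(final R value) {
        return new Either<L, R>() {
            @Override
            public boolean isLeft() {
                return false;
            }

            @Override
            public R right() {
                return value;
            }
        };
    }
}

The compute coGroup could then emit Either<Vertex, Message> records, and the two downstream flatMaps would only check isLeft(), without any dummy fields in the intermediate result.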
Applying a combinable reduce function is could > improve > > > the > > > > performance considerably. > > > > > > > > The good news is that it would come "for free" because the necessary > > > > partitioning and sorting can be reused (given the forwardField > > > annotations > > > > are correctly set): > > > > - The partitioning of the reduce can be reused for the join with the > > > > solution set > > > > - The sort of the reduce is preserved by the join with the in-memory > > > > hash-table of the solution set and can be reused for the coGroup. > > > > > > > > Best, > > > > Fabian > > > > > > > > 2015-10-30 18:38 GMT+01:00 Vasiliki Kalavri < > [hidden email] > > >: > > > > > > > > > Hi Fabian, > > > > > > > > > > thanks so much for looking into this so quickly :-) > > > > > > > > > > One update I have to make is that I tried running a few experiments > > > with > > > > > this on a 6-node cluster. The current implementation gets stuck at > > > > > "Rebuilding Workset Properties" and never finishes a single > > iteration. > > > > > Running the plan of one superstep without a delta iteration > > terminates > > > > > fine. I didn't have access to the cluster today, so I couldn't > debug > > > this > > > > > further, but I will do as soon as I have access again. > > > > > > > > > > The rest of my comments are inline: > > > > > > > > > > On 30 October 2015 at 17:53, Fabian Hueske <[hidden email]> > > wrote: > > > > > > > > > > > Hi Vasia, > > > > > > > > > > > > I had a look at your new implementation and have a few ideas for > > > > > > improvements. > > > > > > 1) Sending out the input iterator as you do in the last > GroupReduce > > > is > > > > > > quite dangerous and does not give a benefit compared to > collecting > > > all > > > > > > elements. Even though it is an iterator, it needs to be > completely > > > > > > materialized in-memory whenever the record is touched by Flink or > > > user > > > > > > code. > > > > > > I would propose to skip the reduce step completely and handle all > > > > > messages > > > > > > separates and only collect them in the CoGroup function before > > giving > > > > > them > > > > > > into the VertexComputeFunction. Be careful, to only do that with > > > > > > objectReuse disabled or take care to properly copy the messages. > If > > > you > > > > > > collect the messages in the CoGroup, you don't need the > > GroupReduce, > > > > have > > > > > > smaller records and you can remove the MessageIterator class > > > > completely. > > > > > > > > > > > > > > > > I see. The idea was to expose to message combiner that user could > > > > > implement if the messages are combinable, e.g. min, sum. This is a > > > > common > > > > > case and reduces the message load significantly. Is there a way I > > could > > > > do > > > > > something similar before the coGroup? > > > > > > > > > > > > > > > > > > > > > 2) Add this annotation to the AppendVertexState function: > > > > > > @ForwardedFieldsFirst("*->f0"). This indicates that the complete > > > > element > > > > > of > > > > > > the first input becomes the first field of the output. Since the > > > input > > > > is > > > > > > partitioned on "f0" (it comes out of the partitioned solution > set) > > > the > > > > > > result of ApplyVertexState will be partitioned on "f0.f0" which > is > > > > > > (accidentially :-D) the join key of the following coGroup > function > > -> > > > > no > > > > > > partitioning :-) > > > > > > > > > > > > > > > > Great! 
I totally missed that ;) > > > > > > > > > > > > > > > > > > > > > 3) Adding the two flatMap functions behind the CoGroup prevents > > > > chaining > > > > > > and causes therefore some serialization overhead but shouldn't be > > too > > > > > bad. > > > > > > > > > > > > So in total I would make this program as follows: > > > > > > > > > > > > iVertices<K,VV> > > > > > > iMessage<K, Message> = iVertices.map(new InitWorkSet()); > > > > > > > > > > > > iteration = iVertices.iterateDelta(iMessages, maxIt, 0) > > > > > > verticesWithMessage<Vertex, Message> = iteration.getSolutionSet() > > > > > > .join(iteration.workSet()) > > > > > > .where(0) // solution set is local and build side > > > > > > .equalTo(0) // workset is shuffled and probe side of hashjoin > > > > > > superstepComp<Vertex,Tuple2<K, Message>,Bool> = > > > > > > verticesWithMessage.coGroup(edgessWithValue) > > > > > > .where("f0.f0") // vwm is locally forward and sorted > > > > > > .equalTo(0) // edges are already partitioned and sorted (if > > cached > > > > > > correctly) > > > > > > .with(...) // The coGroup collects all messages in a collection > > and > > > > > gives > > > > > > it to the ComputeFunction > > > > > > delta<Vertex> = superStepComp.flatMap(...) // partitioned when > > merged > > > > > into > > > > > > solution set > > > > > > workSet<K, Message> = superStepComp.flatMap(...) // partitioned > for > > > > join > > > > > > iteration.closeWith(delta, workSet) > > > > > > > > > > > > So, if I am correct, the program will > > > > > > - partition the workset > > > > > > - sort the vertices with messages > > > > > > - partition the delta > > > > > > > > > > > > One observation I have is that this program requires that all > > > messages > > > > > fit > > > > > > into memory. Was that also the case before? > > > > > > > > > > > > > > > > I believe not. The plan has one coGroup that produces the messages > > > and a > > > > > following coGroup that groups by the messages "target ID" and > > consumes > > > > > them in an iterator. That doesn't require them to fit in memory, > > > right? > > > > > > > > > > > > > > > I'm also working on a version where the graph is represented as an > > > > > adjacency list, instead of two separate datasets of vertices and > > edges. > > > > The > > > > > disadvantage is that the graph has to fit in memory, but I think > the > > > > > advantages are many. We'll be able to support edge value updates, > > edge > > > > > mutations and different edge access order guarantees. I'll get back > > to > > > > this > > > > > thread when I have a working prototype. > > > > > > > > > > > > > > > > > > > > > > Cheers, > > > > > > Fabian > > > > > > > > > > > > > > > > Thanks again! > > > > > > > > > > Cheers, > > > > > -Vasia. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 2015-10-27 19:10 GMT+01:00 Vasiliki Kalavri < > > > [hidden email] > > > > >: > > > > > > > > > > > > > @Martin: thanks for your input! If you ran into any other > issues > > > > that I > > > > > > > didn't mention, please let us know. Obviously, even with my > > > proposal, > > > > > > there > > > > > > > are still features we cannot support, e.g. updating edge values > > and > > > > > graph > > > > > > > mutations. We'll need to re-think the underlying iteration > and/or > > > > graph > > > > > > > representation for those. 
> > > > > > > > > > > > > > @Fabian: thanks a lot, no rush :) > > > > > > > Let me give you some more information that might make it easier > > to > > > > > reason > > > > > > > about performance: > > > > > > > > > > > > > > Currently, in Spargel the SolutionSet (SS) keeps the vertex > state > > > and > > > > > the > > > > > > > workset (WS) keeps the active vertices. The iteration is > composed > > > of > > > > 2 > > > > > > > coGroups. The first one takes the WS and the edges and produces > > > > > messages. > > > > > > > The second one takes the messages and the SS and produced the > new > > > WS > > > > > and > > > > > > > the SS-delta. > > > > > > > > > > > > > > In my proposal, the SS has the vertex state and the WS has > > > <vertexId, > > > > > > > MessageIterator> pairs, i.e. the inbox of each vertex. The plan > > is > > > > more > > > > > > > complicated because compute() needs to have two iterators: over > > the > > > > > edges > > > > > > > and over the messages. > > > > > > > First, I join SS and WS to get the active vertices (have > > received a > > > > > msg) > > > > > > > and their current state. Then I coGroup the result with the > edges > > > to > > > > > > access > > > > > > > the neighbors. Now the main problem is that this coGroup needs > to > > > > have > > > > > 2 > > > > > > > outputs: the new messages and the new vertex value. I couldn't > > > really > > > > > > find > > > > > > > a nice way to do this, so I'm emitting a Tuple that contains > both > > > > types > > > > > > and > > > > > > > I have a flag to separate them later with 2 flatMaps. From the > > > vertex > > > > > > > flatMap, I crete the SS-delta and from the messaged flatMap I > > > apply a > > > > > > > reduce to group the messages by vertex and send them to the new > > WS. > > > > One > > > > > > > optimization would be to expose a combiner here to reduce > message > > > > size. > > > > > > > > > > > > > > tl;dr: > > > > > > > 1. 2 coGroups vs. Join + coGroup + flatMap + reduce > > > > > > > 2. how can we efficiently emit 2 different types of records > from > > a > > > > > > coGroup? > > > > > > > 3. does it make any difference if we group/combine the messages > > > > before > > > > > > > updating the workset or after? > > > > > > > > > > > > > > Cheers, > > > > > > > -Vasia. > > > > > > > > > > > > > > > > > > > > > On 27 October 2015 at 18:39, Fabian Hueske <[hidden email]> > > > > wrote: > > > > > > > > > > > > > > > I'll try to have a look at the proposal from a performance > > point > > > of > > > > > > view > > > > > > > in > > > > > > > > the next days. > > > > > > > > Please ping me, if I don't follow up this thread. > > > > > > > > > > > > > > > > Cheers, Fabian > > > > > > > > > > > > > > > > 2015-10-27 18:28 GMT+01:00 Martin Junghanns < > > > > [hidden email] > > > > > >: > > > > > > > > > > > > > > > > > Hi, > > > > > > > > > > > > > > > > > > At our group, we also moved several algorithms from Giraph > to > > > > Gelly > > > > > > and > > > > > > > > > ran into some confusing issues (first in understanding, > > second > > > > > during > > > > > > > > > implementation) caused by the conceptional differences you > > > > > described. > > > > > > > > > > > > > > > > > > If there are no concrete advantages (performance mainly) in > > the > > > > > > Spargel > > > > > > > > > implementation, we would be very happy to see the Gelly API > > be > > > > > > aligned > > > > > > > to > > > > > > > > > Pregel-like systems. > > > > > > > > > > > > > > > > > > Your SSSP example speaks for itself. 
Thanks Fabian! I'll try that :)
On 10 November 2015 at 22:31, Fabian Hueske <[hidden email]> wrote:

> You could implement a Java Either type (similar to Scala's Either) that
> either has a Message or the VertexState, and a corresponding TypeInformation
> and TypeSerializer that serializes a byte flag to indicate which of the two
> types is used.
> It might actually make sense to add a generic Either type to the Java API
> in general (similar to the Java Tuples, which resemble the Scala Tuples).
>
> Cheers, Fabian
>
> 2015-11-10 22:16 GMT+01:00 Vasiliki Kalavri <[hidden email]>:
>
> > Hi,
> >
> > after running a few experiments, I can confirm that putting the combiner
> > after the flatMap is indeed more efficient.
> >
> > I ran SSSP and Connected Components with Spargel, GSA, and the Pregel
> > model, and the results are the following:
> >
> > - for SSSP, Spargel is always the slowest, GSA is ~1.2x faster, and Pregel
> > is ~1.1x faster without a combiner and ~1.3x faster with a combiner.
> > - for Connected Components, Spargel and GSA perform similarly, while
> > Pregel is 1.4-1.6x slower.
> >
> > To start with, this is much better than I expected :)
> > However, there is one main shortcoming in my current implementation that
> > negatively impacts performance:
> > Since the compute function coGroup needs to output both new vertex values
> > and new messages, I emit a wrapping tuple that contains both vertex state
> > and messages, and then filter them out based on a boolean field. The
> > problem is that since I cannot emit null fields, I emit a dummy message
> > for each new vertex state and a dummy vertex state for each new message.
> > That essentially means that the intermediate messages result is double in
> > size if, say, the vertex values are of the same type as the messages (it
> > can be worse if the vertex values are more complex).
> > So my question is, is there a way to avoid this redundancy, by either
> > emitting null fields or by creating an operator that could emit 2
> > different types of tuples?
> >
> > Thanks!
> > -Vasia.
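(For illustration only, a minimal sketch of the kind of Either type described above; the names and details here are assumptions, not the actual class that would end up in the Java API. Its serializer would only need to write a one-byte flag plus whichever value is actually present, so no dummy fields are required.)

// Holds exactly one of two values; a TypeSerializer for it writes a single
// byte flag (left/right) followed by the corresponding value.
public abstract class Either<L, R> {

    public static <L, R> Either<L, R> left(L value)  { return new Left<>(value); }
    public static <L, R> Either<L, R> right(R value) { return new Right<>(value); }

    public abstract boolean isLeft();
    public boolean isRight() { return !isLeft(); }

    public abstract L left();
    public abstract R right();

    private static final class Left<L, R> extends Either<L, R> {
        private final L value;
        Left(L value) { this.value = value; }
        public boolean isLeft() { return true; }
        public L left() { return value; }
        public R right() { throw new IllegalStateException("Left holds no right value"); }
    }

    private static final class Right<L, R> extends Either<L, R> {
        private final R value;
        Right(R value) { this.value = value; }
        public boolean isLeft() { return false; }
        public L left() { throw new IllegalStateException("Right holds no left value"); }
        public R right() { return value; }
    }
}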
"Either" an "Optional" types are quite useful.
Let's add them to the core Java API.

On Wed, Nov 11, 2015 at 10:00 AM, Vasiliki Kalavri <[hidden email]> wrote:

> Thanks Fabian! I'll try that :)
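(As a rough sketch of how the two flatMaps that split the compute coGroup's output could consume such a type instead of the flag-plus-dummy-fields tuple. This is not the prototype's actual code; it assumes SSSP-style Long vertex ids, Double vertex values and messages, and the hypothetical Either class sketched above.)

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector;

// Emits only the solution-set delta (new vertex values) from the mixed coGroup output.
class DeltaSplitter implements FlatMapFunction<Either<Vertex<Long, Double>, Tuple2<Long, Double>>, Vertex<Long, Double>> {
    @Override
    public void flatMap(Either<Vertex<Long, Double>, Tuple2<Long, Double>> value,
                        Collector<Vertex<Long, Double>> out) {
        if (value.isLeft()) {
            out.collect(value.left());
        }
    }
}

// Emits only the new workset of (target id, message) pairs.
class MessageSplitter implements FlatMapFunction<Either<Vertex<Long, Double>, Tuple2<Long, Double>>, Tuple2<Long, Double>> {
    @Override
    public void flatMap(Either<Vertex<Long, Double>, Tuple2<Long, Double>> value,
                        Collector<Tuple2<Long, Double>> out) {
        if (value.isRight()) {
            out.collect(value.right());
        }
    }
}

Because each record carries only the payload that is actually used, no dummy vertex states or dummy messages need to be serialized.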