[gelly] Spargel model rework

[gelly] Spargel model rework

Vasiliki Kalavri
Hello squirrels,

I want to discuss with you a few concerns I have about our current
vertex-centric model implementation, Spargel, now fully subsumed by Gelly.

Spargel is our implementation of Pregel [1], but it violates some
fundamental properties of the model, as described in the paper and as
implemented in e.g. Giraph, GPS, Hama. I often find myself confused both
when trying to explain it to current Giraph users and when porting my
Giraph algorithms to it.

More specifically:
- in the Pregel model, messages produced in superstep n are received in
superstep n+1. In Spargel, they are produced and consumed in the same
iteration.
- in Pregel, vertices are active during a superstep if they have received
a message in the previous superstep. In Spargel, a vertex is active during
a superstep if it has changed its value.

These two differences require a lot of rethinking when porting applications
and can easily cause bugs.

The most important problem, however, is that we require the user to split the
computation into 2 phases (2 UDFs):
- messaging: has access to the vertex state and can produce messages
- update: has access to incoming messages and can update the vertex value

Pregel/Giraph only expose one UDF to the user:
- compute: has access to both the vertex state and the incoming messages,
can produce messages and update the vertex value.
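
To make the contrast concrete, here is a rough sketch of the two API shapes.
The class and method names only approximate the actual Spargel and Giraph
signatures, so treat this as an illustration rather than the real interfaces:

// Spargel today: two separate UDFs per superstep (signatures simplified)
public abstract class MessagingFunction<K, VV, Message> {
    // sees the vertex state and can send messages,
    // but has no access to the incoming messages
    public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;
}

public abstract class VertexUpdateFunction<K, VV, Message> {
    // sees the incoming messages and can set a new vertex value,
    // but cannot send messages
    public abstract void updateVertex(Vertex<K, VV> vertex,
            MessageIterator<Message> inMessages) throws Exception;
}

// Pregel/Giraph style: a single UDF with access to both (signature simplified)
public abstract class ComputeFunction<K, VV, EV, Message> {
    // sees the vertex state and the incoming messages,
    // can update the value and send messages in one place
    public abstract void compute(Vertex<K, VV> vertex,
            Iterable<Message> messages) throws Exception;
}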

This might not seem like a big deal, but apart from forcing the user to
split their program logic into 2 phases, Spargel also makes some common
computation patterns non-intuitive or impossible to write. A very simple
example is propagating a message based on its value or sender ID. To do
this with Spargel, one has to store all the incoming messages in the vertex
value (which might be of a different type, btw) during the update phase, so
that they can be accessed during the next messaging phase.

So, my first question is: when implementing Spargel, were other
alternatives considered and maybe rejected, for performance or for some
other reason? If someone knows, I would love to hear about them!

Second, I wrote a prototype implementation [2] that only exposes one UDF,
compute(), by keeping the vertex state in the solution set and the messages
in the workset. This way all previously mentioned limitations go away and
the API (see "SSSPComputeFunction" in the example [3]) looks a lot more
like Giraph (see [4]).
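
To give a flavour of the API, here is a minimal sketch of what such an SSSP
compute function could look like. This is not the exact code of [2]/[3]; the
helper methods (getEdges(), sendMessageTo(), setNewVertexValue()) and the
isSource() check are assumptions in the spirit of the Giraph-style API:

public static final class SSSPComputeFunction
        extends ComputeFunction<Long, Double, Double, Double> {

    @Override
    public void compute(Vertex<Long, Double> vertex, Iterable<Double> messages) {
        // candidate distance: 0 for the source, otherwise the smallest incoming message
        double minDistance = isSource(vertex) ? 0d : Double.MAX_VALUE;
        for (Double msg : messages) {
            minDistance = Math.min(minDistance, msg);
        }

        // update and propagate only if we found a shorter path
        if (minDistance < vertex.getValue()) {
            setNewVertexValue(minDistance);
            for (Edge<Long, Double> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), minDistance + edge.getValue());
            }
        }
    }
}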

I have not run any experiments yet and the prototype has some ugly hacks,
but if you think any of this makes sense, then I'd be willing to follow up
and try to optimize it. If we see that it performs well, we can consider
either replacing Spargel or adding it as an alternative.

Thanks for reading this long e-mail and looking forward to your input!

Cheers,
-Vasia.

[1]: https://kowshik.github.io/JPregel/pregel_paper.pdf
[2]:
https://github.com/vasia/flink/tree/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew
[3]:
https://github.com/vasia/flink/blob/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew/example/SSSPCompute.java
[4]:
https://github.com/grafos-ml/okapi/blob/master/src/main/java/ml/grafos/okapi/graphs/SingleSourceShortestPaths.java

Re: [gelly] Spargel model rework

Martin Junghanns
Hi,

At our group, we also moved several algorithms from Giraph to Gelly and
ran into some confusing issues (first in understanding, second during
implementation) caused by the conceptual differences you described.

If there are no concrete advantages (mainly performance) in the Spargel
implementation, we would be very happy to see the Gelly API aligned
with Pregel-like systems.

Your SSSP example speaks for itself. It is straightforward if the reader is
familiar with Pregel/Giraph/...

Best,
Martin


Re: [gelly] Spargel model rework

Fabian Hueske-2
I'll try to have a look at the proposal from a performance point of view in
the next few days.
Please ping me if I don't follow up on this thread.

Cheers, Fabian


Re: [gelly] Spargel model rework

Vasiliki Kalavri
@Martin: thanks for your input! If you ran into any other issues that I
didn't mention, please let us know. Obviously, even with my proposal, there
are still features we cannot support, e.g. updating edge values and graph
mutations. We'll need to re-think the underlying iteration and/or graph
representation for those.

@Fabian: thanks a lot, no rush :)
Let me give you some more information that might make it easier to reason
about performance:

Currently, in Spargel the SolutionSet (SS) keeps the vertex state and the
workset (WS) keeps the active vertices. The iteration is composed of 2
coGroups. The first one takes the WS and the edges and produces messages.
The second one takes the messages and the SS and produces the new WS and
the SS-delta.

In my proposal, the SS has the vertex state and the WS has <vertexId,
MessageIterator> pairs, i.e. the inbox of each vertex. The plan is more
complicated because compute() needs to have two iterators: over the edges
and over the messages.
First, I join SS and WS to get the active vertices (those that have received
a msg) and their current state. Then I coGroup the result with the edges to
access the neighbors. Now the main problem is that this coGroup needs to have
2 outputs: the new messages and the new vertex value. I couldn't really find
a nice way to do this, so I'm emitting a Tuple that contains both types and
a flag to separate them later with 2 flatMaps. From the vertex flatMap, I
create the SS-delta, and from the message flatMap I apply a reduce to group
the messages by vertex and send them to the new WS. One optimization would
be to expose a combiner here to reduce the message size.
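
For reference, the flag-based split looks roughly like this; the record
layout (f0 = isVertex flag, f1 = updated vertex, f2 = message) and all names
are illustrative, not the actual prototype code. The message side is
symmetric and feeds the reduce that builds the new WS:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector;

public static DataSet<Vertex<Long, Double>> extractDelta(
        DataSet<Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>>> superstepOutput) {
    return superstepOutput.flatMap(
        new FlatMapFunction<Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>>,
                Vertex<Long, Double>>() {
            @Override
            public void flatMap(
                    Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>> value,
                    Collector<Vertex<Long, Double>> out) {
                if (value.f0) {
                    out.collect(value.f1); // vertex records -> SS-delta
                }
                // records with f0 == false are the messages; a symmetric flatMap
                // emits value.f2 and is followed by the per-vertex reduce
            }
        });
}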

tl;dr:
1. 2 coGroups vs. Join + coGroup + flatMap + reduce
2. how can we efficiently emit 2 different types of records from a coGroup?
3. does it make any difference if we group/combine the messages before
updating the workset or after?

Cheers,
-Vasia.



Re: [gelly] Spargel model rework

Fabian Hueske-2
Hi Vasia,

I had a look at your new implementation and have a few ideas for
improvements.
1) Sending out the input iterator as you do in the last GroupReduce is
quite dangerous and does not give a benefit compared to collecting all
elements. Even though it is an iterator, it needs to be completely
materialized in memory whenever the record is touched by Flink or user
code.
I would propose to skip the reduce step completely, handle all messages
separately, and only collect them in the CoGroup function before handing
them to the VertexComputeFunction. Be careful to only do that with
objectReuse disabled, or take care to properly copy the messages. If you
collect the messages in the CoGroup, you don't need the GroupReduce, you
have smaller records, and you can remove the MessageIterator class completely.
2) Add this annotation to the AppendVertexState function:
@ForwardedFieldsFirst("*->f0"). This indicates that the complete element of
the first input becomes the first field of the output. Since the input is
partitioned on "f0" (it comes out of the partitioned solution set), the
result of AppendVertexState will be partitioned on "f0.f0", which is
(accidentally :-D) the join key of the following coGroup function -> no
partitioning :-) (a short sketch of the annotated function follows this list)
3) Adding the two flatMap functions behind the CoGroup prevents chaining
and therefore causes some serialization overhead, but it shouldn't be too bad.
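
A minimal sketch of point 2, assuming AppendVertexState is a join function
that wraps the solution-set vertex into field 0 of its output (the concrete
types in the prototype may differ):

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Vertex;

// the whole first input (the vertex) is forwarded unchanged into field f0,
// so the partitioning on the vertex id is preserved as "f0.f0"
@ForwardedFieldsFirst("*->f0")
public class AppendVertexState<K, VV, Message> implements
        JoinFunction<Vertex<K, VV>, Tuple2<K, Message>, Tuple2<Vertex<K, VV>, Message>> {

    @Override
    public Tuple2<Vertex<K, VV>, Message> join(Vertex<K, VV> vertex,
            Tuple2<K, Message> message) {
        return new Tuple2<>(vertex, message.f1);
    }
}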

So in total I would make this program as follows:

iVertices<K, VV>
iMessages<K, Message> = iVertices.map(new InitWorkSet());

iteration = iVertices.iterateDelta(iMessages, maxIt, 0)
verticesWithMessage<Vertex, Message> = iteration.getSolutionSet()
  .join(iteration.getWorkset())
  .where(0)   // solution set is local and build side
  .equalTo(0) // workset is shuffled and probe side of the hash join
superstepComp<Vertex, Tuple2<K, Message>, Bool> =
  verticesWithMessage.coGroup(edgesWithValue)
  .where("f0.f0") // vwm is locally forwarded and sorted
  .equalTo(0)     // edges are already partitioned and sorted (if cached correctly)
  .with(...)      // the coGroup collects all messages in a collection
                  // and gives them to the ComputeFunction
delta<Vertex> = superstepComp.flatMap(...)       // partitioned when merged into solution set
workSet<K, Message> = superstepComp.flatMap(...) // partitioned for the join
iteration.closeWith(delta, workSet)

So, if I am correct, the program will
- partition the workset
- sort the vertices with messages
- partition the delta

One observation I have is that this program requires that all messages fit
into memory. Was that also the case before?

Cheers,
Fabian



Re: [gelly] Spargel model rework

Vasiliki Kalavri
Hi Fabian,

thanks so much for looking into this so quickly :-)

One update I have to make is that I tried running a few experiments with
this on a 6-node cluster. The current implementation gets stuck at
"Rebuilding Workset Properties" and never finishes a single iteration.
Running the plan of one superstep without a delta iteration terminates
fine. I didn't have access to the cluster today, so I couldn't debug this
further, but I will do as soon as I have access again.

The rest of my comments are inline:

On 30 October 2015 at 17:53, Fabian Hueske <[hidden email]> wrote:

> Hi Vasia,
>
> I had a look at your new implementation and have a few ideas for
> improvements.
> 1) Sending out the input iterator as you do in the last GroupReduce is
> quite dangerous and does not give a benefit compared to collecting all
> elements. Even though it is an iterator, it needs to be completely
> materialized in-memory whenever the record is touched by Flink or user
> code.
> I would propose to skip the reduce step completely and handle all messages
> separately and only collect them in the CoGroup function before handing them
> to the VertexComputeFunction. Be careful to only do that with
> objectReuse disabled or take care to properly copy the messages. If you
> collect the messages in the CoGroup, you don't need the GroupReduce, have
> smaller records and you can remove the MessageIterator class completely.
>

I see. The idea was to expose a message combiner that the user could
implement if the messages are combinable, e.g. min or sum. This is a common
case and reduces the message load significantly. Is there a way I could do
something similar before the coGroup?



> 2) Add this annotation to the AppendVertexState function:
> @ForwardedFieldsFirst("*->f0"). This indicates that the complete element of
> the first input becomes the first field of the output. Since the input is
> partitioned on "f0" (it comes out of the partitioned solution set) the
> result of ApplyVertexState will be partitioned on "f0.f0" which is
> (accidentally :-D) the join key of the following coGroup function -> no
> partitioning :-)
>

Great! I totally missed that ;)



> 3) Adding the two flatMap functions behind the CoGroup prevents chaining
> and causes therefore some serialization overhead but shouldn't be too bad.
>
> So in total I would make this program as follows:
>
> iVertices<K,VV>
> iMessages<K, Message> = iVertices.map(new InitWorkSet());
>
> iteration = iVertices.iterateDelta(iMessages, maxIt, 0)
> verticesWithMessage<Vertex, Message> = iteration.getSolutionSet()
>   .join(iteration.workSet())
>   .where(0) // solution set is local and build side
>   .equalTo(0) // workset is shuffled and probe side of hashjoin
> superstepComp<Vertex,Tuple2<K, Message>,Bool> =
> verticesWithMessage.coGroup(edgesWithValue)
>   .where("f0.f0") // vwm is locally forward and sorted
>   .equalTo(0) //  edges are already partitioned and sorted (if cached
> correctly)
>   .with(...) // The coGroup collects all messages in a collection and gives
> it to the ComputeFunction
> delta<Vertex> = superStepComp.flatMap(...) // partitioned when merged into
> solution set
> workSet<K, Message> = superStepComp.flatMap(...) // partitioned for join
> iteration.closeWith(delta, workSet)
>
> So, if I am correct, the program will
> - partition the workset
> - sort the vertices with messages
> - partition the delta
>
> One observation I have is that this program requires that all messages fit
> into memory. Was that also the case before?
>

I believe not. The plan has one coGroup that produces the messages and a
following coGroup that groups the messages by their target ID and consumes
them in an iterator. That doesn't require them to fit in memory, right?


I'm also working on a version where the graph is represented as an
adjacency list, instead of two separate datasets of vertices and edges. The
disadvantage is that the graph has to fit in memory, but I think the
advantages are many. We'll be able to support edge value updates, edge
mutations, and different edge access order guarantees. I'll get back to this
thread when I have a working prototype.


>
> Cheers,
> Fabian
>

Thanks again!

Cheers,
-Vasia.




Re: [gelly] Spargel model rework

Fabian Hueske-2
We can of course inject an optional ReduceFunction (or GroupReduce, or
combinable GroupReduce) to reduce the size of the work set.
I suggested removing the GroupReduce function because it only collected
all messages into a single record by emitting the input iterator, which is
quite dangerous. Applying a combinable reduce function could improve the
performance considerably.
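
As a concrete example, a minimal sketch of such a combinable reduce for
SSSP-style messages, assuming the workset records are Tuple2<targetId,
candidateDistance> (types and names are illustrative):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// keeps only the smallest candidate distance per target vertex,
// shrinking the workset before it is joined with the solution set
public class MinDistanceCombiner implements ReduceFunction<Tuple2<Long, Double>> {

    @Override
    public Tuple2<Long, Double> reduce(Tuple2<Long, Double> msg1, Tuple2<Long, Double> msg2) {
        // both records share the same target id (field 0)
        return msg1.f1 <= msg2.f1 ? msg1 : msg2;
    }
}

// usage sketch: messages.groupBy(0).reduce(new MinDistanceCombiner())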

The good news is that it would come "for free" because the necessary
partitioning and sorting can be reused (given the ForwardedFields annotations
are correctly set):
- The partitioning of the reduce can be reused for the join with the
solution set
- The sort of the reduce is preserved by the join with the in-memory
hash-table of the solution set and can be reused for the coGroup.

Best,
Fabian


Re: [gelly] Spargel model rework

Stephan Ewen
When creating the original version of Spargel I was pretty much thinking in
GSA terms more than in Pregel terms. There are some fundamental
differences between Spargel and Pregel. Spargel sits somewhere in between
GSA and Pregel; that is how I have always thought about it.

The main reason for the form is that it fits the dataflow paradigm more easily:

  - If one function emits the new state of the vertex and the messages, it
has two different return types, which means you need a union type and a
filter/split kind of operation on the result, which also adds overhead. In
the current model, each function has one return type, which makes it easy.

  - The workset is also the feedback channel, which is materialized at the
superstep boundaries, so keeping it small, at O(vertices) rather than
O(edges), is a win for performance.

There is no reason not to add a Pregel model, but I would not kill Spargel
for it. It will be tough to get the Pregel variant to the same efficiency,
unless you want to say: for efficiency, go with GSA; for convenience, go
with Pregel.

There are some nice things about the Spargel model. The fact that messages
are first generated and then consumed makes the generation of initial
messages simpler in many cases, I think. It always seemed a bit weird to me
in Pregel that you had to check whether you are in superstep one (in which
case you would expect no messages) and generate the initial messages there.




Re: [gelly] Spargel model rework

Stephan Ewen
Actually, GAS was not known when we did the iterations work (and Spargel),
but the intuition that led to Spargel is similar to the intuition that led
to GAS.


Re: [gelly] Spargel model rework

Martin Neumann-2
I tried out Spargel during my work with Spotify and have implemented
several algorithms with it. In all implementations I ended up storing
additional data and flags on the vertex value to carry state over from one
UDF to the next. It definitely makes the code harder to write and maintain.

I wonder how much overhead these additional constructs cost in computation
and memory consumption. Maybe going for a less optimized single-UDF version
would not be much of a performance hit for most applications.
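
To illustrate what I mean, here is a minimal sketch of the kind of "padded"
vertex value I ended up with (all names invented for the example, this is
not actual production code):

public class PaddedVertexValue implements java.io.Serializable {
  // the value the algorithm actually cares about
  public double rank;

  // extra baggage, only there to smuggle information from the messaging
  // UDF to the update UDF (or across supersteps)
  public boolean hasConverged;        // flag checked in the other UDF
  public long lastSenderId;           // remember where a message came from
  public double[] bufferedMessages;   // stash raw messages for the next phase

  public PaddedVertexValue() {}       // default constructor, POJO style
}

Every one of those extra fields gets serialized and shipped with the vertex
in every superstep, which is the overhead I am wondering about.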




Re: [gelly] Spargel model rework

Vasiliki Kalavri
Thanks for the detailed explanation Stephan!
Seeing Spargel as a Gather-Scatter model makes much more sense :)

I think we should be more careful not to present it as a "Pregel
equivalent", to avoid confusing users coming from systems like Giraph.
Maybe I could put together a Pregel-Spargel-GSA comparison table to make
clear what each model can support and what its (dis-)advantages are.

I wouldn't kill Spargel either, but I would like to add to Gelly a more
generic iteration model that could support algorithms beyond simplistic
graph propagation. Pregel could be a start, even though it also has its
issues.

Having to return 2 different types and the size of the workset are the two
problems I also faced when trying to map Pregel onto a Flink plan; a rough
sketch of the flag-based workaround I'm currently using for the former is
included below. Also, I am mostly convinced that representing the graph as
two separate datasets of vertices and edges is not optimal for the Pregel
model. I'll share my findings as soon as I have an adjacency list version
running.
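
For reference, the splitting currently looks roughly like this (a
simplified sketch with placeholder names; in the real code the generic
types also need explicit TypeInformation hints):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector;

// Tagged-union record emitted by the compute coGroup:
// f0 == true  -> f1 holds the updated vertex (solution-set delta)
// f0 == false -> f2 holds an outgoing <targetId, message> pair (new workset)
public class OutputSplitters {

  public static class DeltaSplitter<K, VV, Message> implements
      FlatMapFunction<Tuple3<Boolean, Vertex<K, VV>, Tuple2<K, Message>>, Vertex<K, VV>> {
    @Override
    public void flatMap(Tuple3<Boolean, Vertex<K, VV>, Tuple2<K, Message>> value,
        Collector<Vertex<K, VV>> out) {
      if (value.f0) {
        out.collect(value.f1);   // only the vertex-update records
      }
    }
  }

  public static class MessageSplitter<K, VV, Message> implements
      FlatMapFunction<Tuple3<Boolean, Vertex<K, VV>, Tuple2<K, Message>>, Tuple2<K, Message>> {
    @Override
    public void flatMap(Tuple3<Boolean, Vertex<K, VV>, Tuple2<K, Message>> value,
        Collector<Tuple2<K, Message>> out) {
      if (!value.f0) {
        out.collect(value.f2);   // only the message records
      }
    }
  }
}

The two flatMaps then feed the solution-set delta and the new workset,
respectively, as in the plan Fabian sketched.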

Thanks again!

Cheers,
-Vasia.

On 3 November 2015 at 12:59, Martin Neumann <[hidden email]> wrote:

> I tried out Spargel during my work with Spotify and have implemented
> several algorithms using it. In all implementations I ended up storing
> additional Data and Flags on the Vertex to carry them over from one UDF to
> the next one. It definitely makes the code harder to write and maintain.
>
> I wonder how much overhead these additional constructs cost in computation
> and memory consumption. Maybe going for a less optimized 1 UDF version will
> be not so much of a performance hit for most applications.
>
>
>
> On Tue, Nov 3, 2015 at 8:43 AM, Stephan Ewen <[hidden email]> wrote:
>
> > Actually GAS was not known when we did the iterations work (and Spargel),
> > but the intuition that led to Spargel is similar then the intuition that
> > led to GAS.
> >
> > On Mon, Nov 2, 2015 at 4:35 PM, Stephan Ewen <[hidden email]> wrote:
> >
> > > When creating the original version of Spargel I was pretty much
> thinking
> > > in GSA terms, more than in Pregel terms. There are some fundamental
> > > differences between Spargel and Pregel. Spargel is in between GAS and
> > > Pregel in some way, that is how I have always thought about it.
> > >
> > > The main reason for the form is that it fits the dataflow paradigm
> > easier:
> > >
> > >   - If one function emits the new state of the vertex and the messages,
> > it
> > > has two different return types, which means you need a union type and
> > > filer/split type of operation on the result, which also adds overhead.
> In
> > > the current model, each function has one return type, which makes it
> > easy.
> > >
> > >  - The workset is also the feedback channel, which is materialized at
> the
> > > superstep boundaries, so keeping it small at O(vertices), rather than
> > > O(edges) is a win for performance.
> > >
> > > There is no reason to not add a Pregel model, but I would not kill
> > Spargel
> > > for it. It will be tough to get the Pregel variant to the same
> > efficiency.
> > > Unless you want to say, for efficiency, go with GSA, for convenience
> with
> > > Pregel.
> > >
> > > There are some nice things about the Spargel model. The fact that
> > messages
> > > are first generated then consumes makes the generation of initial
> > messages
> > > simpler in many cases, I think. It was always a bit weird to me in
> Pregel
> > > that you had to check whether you are in superstep one, in which case
> you
> > > would expect no message, and generate initial value messages.
> > >
> > >
> > >
> > > On Fri, Oct 30, 2015 at 1:28 PM, Fabian Hueske <[hidden email]>
> > wrote:
> > >
> > >> We can of course inject an optional ReduceFunction (or GroupReduce, or
> > >> combinable GroupReduce) to reduce the size of the work set.
> > >> I suggested to remove the GroupReduce function, because it did only
> > >> collect
> > >> all messages into a single record by emitting the input iterator which
> > is
> > >> quite dangerous. Applying a combinable reduce function is could
> improve
> > >> the
> > >> performance considerably.
> > >>
> > >> The good news is that it would come "for free" because the necessary
> > >> partitioning and sorting can be reused (given the forwardField
> > annotations
> > >> are correctly set):
> > >> - The partitioning of the reduce can be reused for the join with the
> > >> solution set
> > >> - The sort of the reduce is preserved by the join with the in-memory
> > >> hash-table of the solution set and can be reused for the coGroup.
> > >>
> > >> Best,
> > >> Fabian
> > >>
> > >> 2015-10-30 18:38 GMT+01:00 Vasiliki Kalavri <
> [hidden email]
> > >:
> > >>
> > >> > Hi Fabian,
> > >> >
> > >> > thanks so much for looking into this so quickly :-)
> > >> >
> > >> > One update I have to make is that I tried running a few experiments
> > with
> > >> > this on a 6-node cluster. The current implementation gets stuck at
> > >> > "Rebuilding Workset Properties" and never finishes a single
> iteration.
> > >> > Running the plan of one superstep without a delta iteration
> terminates
> > >> > fine. I didn't have access to the cluster today, so I couldn't debug
> > >> this
> > >> > further, but I will do as soon as I have access again.
> > >> >
> > >> > The rest of my comments are inline:
> > >> >
> > >> > On 30 October 2015 at 17:53, Fabian Hueske <[hidden email]>
> wrote:
> > >> >
> > >> > > Hi Vasia,
> > >> > >
> > >> > > I had a look at your new implementation and have a few ideas for
> > >> > > improvements.
> > >> > > 1) Sending out the input iterator as you do in the last
> GroupReduce
> > is
> > >> > > quite dangerous and does not give a benefit compared to collecting
> > all
> > >> > > elements. Even though it is an iterator, it needs to be completely
> > >> > > materialized in-memory whenever the record is touched by Flink or
> > user
> > >> > > code.
> > >> > > I would propose to skip the reduce step completely and handle all
> > >> > messages
> > >> > > separates and only collect them in the CoGroup function before
> > giving
> > >> > them
> > >> > > into the VertexComputeFunction. Be careful, to only do that with
> > >> > > objectReuse disabled or take care to properly copy the messages.
> If
> > >> you
> > >> > > collect the messages in the CoGroup, you don't need the
> GroupReduce,
> > >> have
> > >> > > smaller records and you can remove the MessageIterator class
> > >> completely.
> > >> > >
> > >> >
> > >> > ​I see. The idea was to expose to message combiner that user could
> > >> > ​implement if the messages are combinable, e.g. min, sum. This is a
> > >> common
> > >> > case and reduces the message load significantly. Is there a way I
> > could
> > >> do
> > >> > something similar before the coGroup?
> > >> >
> > >> >
> > >> >
> > >> > > 2) Add this annotation to the AppendVertexState function:
> > >> > > @ForwardedFieldsFirst("*->f0"). This indicates that the complete
> > >> element
> > >> > of
> > >> > > the first input becomes the first field of the output. Since the
> > >> input is
> > >> > > partitioned on "f0" (it comes out of the partitioned solution set)
> > the
> > >> > > result of ApplyVertexState will be partitioned on "f0.f0" which is
> > >> > > (accidentially :-D) the join key of the following coGroup function
> > ->
> > >> no
> > >> > > partitioning :-)
> > >> > >
> > >> >
> > >> > ​Great! I totally missed that ;)​
> > >> >
> > >> >
> > >> >
> > >> > > 3) Adding the two flatMap functions behind the CoGroup prevents
> > >> chaining
> > >> > > and causes therefore some serialization overhead but shouldn't be
> > too
> > >> > bad.
> > >> > >
> > >> > > So in total I would make this program as follows:
> > >> > >
> > >> > > iVertices<K,VV>
> > >> > > iMessage<K, Message> = iVertices.map(new InitWorkSet());
> > >> > >
> > >> > > iteration = iVertices.iterateDelta(iMessages, maxIt, 0)
> > >> > > verticesWithMessage<Vertex, Message> = iteration.getSolutionSet()
> > >> > >   .join(iteration.workSet())
> > >> > >   .where(0) // solution set is local and build side
> > >> > >   .equalTo(0) // workset is shuffled and probe side of hashjoin
> > >> > > superstepComp<Vertex,Tuple2<K, Message>,Bool> =
> > >> > > verticesWithMessage.coGroup(edgessWithValue)
> > >> > >   .where("f0.f0") // vwm is locally forward and sorted
> > >> > >   .equalTo(0) //  edges are already partitioned and sorted (if
> > cached
> > >> > > correctly)
Re: [gelly] Spargel model rework

Andra Lungu
In reply to this post by Stephan Ewen
I also think a Giraph-like model could be added, but we shouldn't remove
Spargel in favour of it!

On Tue, Nov 3, 2015 at 2:35 AM, Stephan Ewen <[hidden email]> wrote:

> When creating the original version of Spargel I was pretty much thinking in
> GSA terms, more than in Pregel terms. There are some fundamental
> differences between Spargel and Pregel. Spargel is in between GAS and
> Pregel in some way, that is how I have always thought about it.
>
> The main reason for the form is that it fits the dataflow paradigm more easily:
>
>   - If one function emits the new state of the vertex and the messages, it
> has two different return types, which means you need a union type and a
> filter/split type of operation on the result, which also adds overhead. In
> the current model, each function has one return type, which makes it easy.
>
>  - The workset is also the feedback channel, which is materialized at the
> superstep boundaries, so keeping it small at O(vertices), rather than
> O(edges) is a win for performance.
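To see the workset/feedback mechanics in isolation, here is a minimal, self-contained sketch of a Flink delta iteration; the Tuple2<Long, Double> element type and the halving step are made-up placeholders for illustration, not anything from Spargel or the prototype:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;

public class DeltaIterationSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // (vertex id, value); used both as initial solution set and initial workset
        DataSet<Tuple2<Long, Double>> vertices =
                env.fromElements(new Tuple2<>(1L, 8.0), new Tuple2<>(2L, 4.0));

        // the workset is the feedback channel that is materialized at superstep
        // boundaries; the solution set is keyed on field 0
        DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration =
                vertices.iterateDelta(vertices, 10, 0);

        // a trivial "superstep": halve every workset value
        DataSet<Tuple2<Long, Double>> delta = iteration.getWorkset()
                .map(new MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>>() {
                    @Override
                    public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
                        return new Tuple2<>(value.f0, value.f1 / 2);
                    }
                });

        // the delta updates the solution set and also feeds the next workset
        iteration.closeWith(delta, delta).print();
    }
}

Everything placed in the workset here is shipped again in every superstep, which is why keeping it at O(vertices) rather than O(edges) matters.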
>
> There is no reason to not add a Pregel model, but I would not kill Spargel
> for it. It will be tough to get the Pregel variant to the same efficiency.
> Unless you want to say, for efficiency, go with GSA, for convenience with
> Pregel.
>
> There are some nice things about the Spargel model. The fact that messages
> are first generated and then consumed makes the generation of initial messages
> simpler in many cases, I think. It was always a bit weird to me in Pregel
> that you had to check whether you are in superstep one, in which case you
> would expect no message, and generate initial value messages.
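To make that last point concrete for readers who have not used Pregel-style systems: the same compute() runs in every superstep, so superstep 0 has to be special-cased to seed the messages. A schematic, self-contained sketch follows; none of these names correspond to an actual Giraph or Gelly API:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SuperstepZeroSketch {

    // Returns the messages a vertex sends in the current superstep
    // (here: an SSSP-like "propagate the smallest value seen so far").
    static List<Double> compute(long superstep, double vertexValue, List<Double> inbox) {
        if (superstep == 0) {
            // No messages can have arrived yet, so the first superstep has to
            // generate the initial-value messages explicitly.
            return Arrays.asList(vertexValue);
        }
        double min = vertexValue;
        for (double m : inbox) {          // messages produced in superstep n-1
            min = Math.min(min, m);
        }
        return Arrays.asList(min);
    }

    public static void main(String[] args) {
        System.out.println(compute(0, 3.0, Collections.<Double>emptyList())); // [3.0]
        System.out.println(compute(1, 3.0, Arrays.asList(1.0, 2.0)));         // [1.0]
    }
}

In Spargel's messaging/update split this special case disappears, because messages are produced before they are consumed within the same superstep.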
>
>
>
> On Fri, Oct 30, 2015 at 1:28 PM, Fabian Hueske <[hidden email]> wrote:
>
> > We can of course inject an optional ReduceFunction (or GroupReduce, or
> > combinable GroupReduce) to reduce the size of the work set.
> > I suggested to remove the GroupReduce function because it only collected
> > all messages into a single record by emitting the input iterator, which is
> > quite dangerous. Applying a combinable reduce function could improve the
> > performance considerably.
> >
> > The good news is that it would come "for free" because the necessary
> > partitioning and sorting can be reused (given the forwardField annotations
> > are correctly set):
> > - The partitioning of the reduce can be reused for the join with the
> > solution set
> > - The sort of the reduce is preserved by the join with the in-memory
> > hash-table of the solution set and can be reused for the coGroup.
> >
> > Best,
> > Fabian
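As a concrete illustration of the combinable reduce Fabian suggests, here is a minimal, self-contained sketch; the (target vertex id, candidate distance) message layout is an assumption for an SSSP-style job, not the prototype's actual types:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class CombinableMessageReduce {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // (target vertex id, candidate distance) -- assumed message layout
        DataSet<Tuple2<Long, Double>> messages = env.fromElements(
                new Tuple2<>(1L, 4.0), new Tuple2<>(1L, 2.5), new Tuple2<>(2L, 7.0));

        // A grouped reduce is executed with a combiner, so the per-target message
        // volume shrinks before the records are shipped to the next operator.
        DataSet<Tuple2<Long, Double>> combined = messages
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<Long, Double>>() {
                    @Override
                    public Tuple2<Long, Double> reduce(Tuple2<Long, Double> a,
                                                       Tuple2<Long, Double> b) {
                        return a.f1 <= b.f1 ? a : b;   // keep the smaller distance
                    }
                });

        combined.print();   // prints (1, 2.5) and (2, 7.0)
    }
}

Whether such a combiner runs before or after updating the workset is exactly question 3 in Vasia's tl;dr further down.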
> >
> > 2015-10-30 18:38 GMT+01:00 Vasiliki Kalavri <[hidden email]>:
> >
> > > Hi Fabian,
> > >
> > > thanks so much for looking into this so quickly :-)
> > >
> > > One update I have to make is that I tried running a few experiments with
> > > this on a 6-node cluster. The current implementation gets stuck at
> > > "Rebuilding Workset Properties" and never finishes a single iteration.
> > > Running the plan of one superstep without a delta iteration terminates
> > > fine. I didn't have access to the cluster today, so I couldn't debug this
> > > further, but I will as soon as I have access again.
> > >
> > > The rest of my comments are inline:
> > >
> > > On 30 October 2015 at 17:53, Fabian Hueske <[hidden email]> wrote:
> > >
> > > > Hi Vasia,
> > > >
> > > > I had a look at your new implementation and have a few ideas for
> > > > improvements.
> > > > 1) Sending out the input iterator as you do in the last GroupReduce is
> > > > quite dangerous and does not give a benefit compared to collecting all
> > > > elements. Even though it is an iterator, it needs to be completely
> > > > materialized in-memory whenever the record is touched by Flink or user
> > > > code.
> > > > I would propose to skip the reduce step completely, handle all messages
> > > > separately, and only collect them in the CoGroup function before giving
> > > > them to the VertexComputeFunction. Be careful to only do that with
> > > > objectReuse disabled, or take care to properly copy the messages. If you
> > > > collect the messages in the CoGroup, you don't need the GroupReduce, have
> > > > smaller records, and you can remove the MessageIterator class completely.
> > > >
> > >
> > > I see. The idea was to expose a message combiner that the user could
> > > implement if the messages are combinable, e.g. min or sum. This is a common
> > > case and reduces the message load significantly. Is there a way I could do
> > > something similar before the coGroup?
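A possible shape for Fabian's point 1, materializing the inbox inside the coGroup instead of forwarding the iterator, is sketched below; the (id, value) and (target id, payload) tuple layouts are placeholders, not the prototype's real classes:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class CollectInboxInCoGroup {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // (vertex id, value) and (target vertex id, message payload) -- placeholders
        DataSet<Tuple2<Long, Double>> vertices =
                env.fromElements(new Tuple2<>(1L, 10.0), new Tuple2<>(2L, 20.0));
        DataSet<Tuple2<Long, Double>> messages =
                env.fromElements(new Tuple2<>(1L, 4.0), new Tuple2<>(1L, 2.5));

        vertices.coGroup(messages)
                .where(0).equalTo(0)
                .with(new CoGroupFunction<Tuple2<Long, Double>, Tuple2<Long, Double>,
                        Tuple2<Long, Double>>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<Long, Double>> vertexSide,
                                        Iterable<Tuple2<Long, Double>> inbox,
                                        Collector<Tuple2<Long, Double>> out) {
                        // materialize the messages once (copy them if object reuse is on)
                        List<Double> collected = new ArrayList<Double>();
                        for (Tuple2<Long, Double> msg : inbox) {
                            collected.add(msg.f1);
                        }
                        for (Tuple2<Long, Double> vertex : vertexSide) {
                            // this is where a compute function would see the vertex
                            // together with its complete inbox
                            double min = vertex.f1;
                            for (double m : collected) {
                                min = Math.min(min, m);
                            }
                            out.collect(new Tuple2<Long, Double>(vertex.f0, min));
                        }
                    }
                })
                .print();
    }
}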
> > >
> > >
> > >
> > > > 2) Add this annotation to the AppendVertexState function:
> > > > @ForwardedFieldsFirst("*->f0"). This indicates that the complete element
> > > > of the first input becomes the first field of the output. Since the input
> > > > is partitioned on "f0" (it comes out of the partitioned solution set), the
> > > > result of AppendVertexState will be partitioned on "f0.f0", which is
> > > > (accidentally :-D) the join key of the following coGroup function ->
> > > > no partitioning :-)
> > > >
> > >
> > > Great! I totally missed that ;)
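For reference, the annotation Fabian mentions could look roughly like this on an AppendVertexState-style join function; the tuple types are assumptions for illustration, not the prototype's actual ones:

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.tuple.Tuple2;

// "*->f0": the whole first input (the vertex) becomes field 0 of the output, so a
// partitioning on the vertex id (f0 of the first input) carries over to "f0.f0".
@ForwardedFieldsFirst("*->f0")
public class AppendVertexStateSketch implements
        JoinFunction<Tuple2<Long, Double>,                    // vertex: (id, value)
                     Tuple2<Long, Double>,                    // message: (target id, payload)
                     Tuple2<Tuple2<Long, Double>, Double>> {  // (vertex, message payload)

    @Override
    public Tuple2<Tuple2<Long, Double>, Double> join(Tuple2<Long, Double> vertex,
                                                     Tuple2<Long, Double> message) {
        return new Tuple2<Tuple2<Long, Double>, Double>(vertex, message.f1);
    }
}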
> > >
> > >
> > >
> > > > 3) Adding the two flatMap functions behind the CoGroup prevents chaining
> > > > and therefore causes some serialization overhead, but shouldn't be too
> > > > bad.
> > > >
> > > > So in total I would make this program as follows:
> > > >
> > > > iVertices<K, VV>
> > > > iMessages<K, Message> = iVertices.map(new InitWorkSet());
> > > >
> > > > iteration = iVertices.iterateDelta(iMessages, maxIt, 0)
> > > > verticesWithMessage<Vertex, Message> = iteration.getSolutionSet()
> > > >   .join(iteration.getWorkset())
> > > >   .where(0) // solution set is local and build side
> > > >   .equalTo(0) // workset is shuffled and probe side of the hash join
> > > > superstepComp<Vertex, Tuple2<K, Message>, Bool> =
> > > >   verticesWithMessage.coGroup(edgesWithValue)
> > > >   .where("f0.f0") // verticesWithMessage is locally forwarded and sorted
> > > >   .equalTo(0) // edges are already partitioned and sorted (if cached correctly)
> > > >   .with(...) // the coGroup collects all messages in a collection and
> > > >              // gives it to the ComputeFunction
> > > > delta<Vertex> = superstepComp.flatMap(...) // partitioned when merged into
> > > >                                            // the solution set
> > > > workSet<K, Message> = superstepComp.flatMap(...) // partitioned for the join
> > > > iteration.closeWith(delta, workSet)
> > > >
> > > > So, if I am correct, the program will
> > > > - partition the workset
> > > > - sort the vertices with messages
> > > > - partition the delta
> > > >
> > > > One observation I have is that this program requires that all messages fit
> > > > into memory. Was that also the case before?
> > > >
> > >
> > > I believe not. The plan has one coGroup that produces the messages and a
> > > following coGroup that groups the messages by their "target ID" and consumes
> > > them in an iterator. That doesn't require them to fit in memory, right?
> > >
> > >
> > > I'm also working on a version where the graph is represented as an
> > > adjacency list, instead of two separate datasets of vertices and edges. The
> > > disadvantage is that the graph has to fit in memory, but I think the
> > > advantages are many. We'll be able to support edge value updates, edge
> > > mutations and different edge access order guarantees. I'll get back to this
> > > thread when I have a working prototype.
> > >
> > >
> > > >
> > > > Cheers,
> > > > Fabian
> > > >
> > >
> > > Thanks again!
> > >
> > > Cheers,
> > > -Vasia.
> > >
> > >
> > >
> > > >
> > > >
> > > > 2015-10-27 19:10 GMT+01:00 Vasiliki Kalavri <[hidden email]>:
> > > >
> > > > > @Martin: thanks for your input! If you ran into any other issues that I
> > > > > didn't mention, please let us know. Obviously, even with my proposal,
> > > > > there are still features we cannot support, e.g. updating edge values and
> > > > > graph mutations. We'll need to re-think the underlying iteration and/or
> > > > > graph representation for those.
> > > > >
> > > > > @Fabian: thanks a lot, no rush :)
> > > > > Let me give you some more information that might make it easier to
> > > > > reason about performance:
> > > > >
> > > > > Currently, in Spargel the SolutionSet (SS) keeps the vertex state and the
> > > > > workset (WS) keeps the active vertices. The iteration is composed of 2
> > > > > coGroups. The first one takes the WS and the edges and produces messages.
> > > > > The second one takes the messages and the SS and produces the new WS and
> > > > > the SS-delta.
> > > > >
> > > > > In my proposal, the SS has the vertex state and the WS has <vertexId,
> > > > > MessageIterator> pairs, i.e. the inbox of each vertex. The plan is more
> > > > > complicated because compute() needs to have two iterators: over the
> > > > > edges and over the messages.
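To make the shape of that single UDF concrete, it could look roughly like the following; this is a hand-written sketch based on the description above, not the actual code in the prototype branch:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector;

// Illustrative only: one UDF that sees the vertex state, its edges and its inbox,
// and emits the updated vertex plus new (target id, message) pairs.
public abstract class ComputeFunctionSketch<K, VV, EV, Message> implements java.io.Serializable {

    public abstract void compute(Vertex<K, VV> vertex,
                                 Iterable<Edge<K, EV>> edges,
                                 Iterable<Message> messages,
                                 Collector<Vertex<K, VV>> updatedVertex,
                                 Collector<Tuple2<K, Message>> outgoingMessages)
            throws Exception;
}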
> > > > >
> > > > > First, I join the SS and the WS to get the active vertices (those that
> > > > > have received a msg) and their current state. Then I coGroup the result
> > > > > with the edges to access the neighbors. Now the main problem is that this
> > > > > coGroup needs to have 2 outputs: the new messages and the new vertex
> > > > > value. I couldn't really find a nice way to do this, so I'm emitting a
> > > > > Tuple that contains both types and I have a flag to separate them later
> > > > > with 2 flatMaps. From the vertex flatMap, I create the SS-delta and from
> > > > > the messages flatMap I apply a reduce to group the messages by vertex and
> > > > > send them to the new WS. One optimization would be to expose a combiner
> > > > > here to reduce message size.
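The "tuple with a flag, split by two flatMaps" workaround could look roughly like the sketch below; the concrete types (a (Long, Double) vertex, (Long, Double) messages) and the dummy values for the unused side are assumptions for illustration:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

public class TaggedUnionSplitSketch {

    // f0 = flag (true: vertex update, false: message),
    // f1 = updated vertex as (id, value), f2 = message as (target id, payload);
    // the unused side just carries a dummy value.
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple3<Boolean, Tuple2<Long, Double>, Tuple2<Long, Double>>> superstep =
                env.fromElements(
                        new Tuple3<>(true, new Tuple2<>(1L, 3.0), new Tuple2<>(-1L, 0.0)),
                        new Tuple3<>(false, new Tuple2<>(-1L, 0.0), new Tuple2<>(2L, 4.0)));

        // one flatMap keeps the vertex updates (the solution-set delta) ...
        DataSet<Tuple2<Long, Double>> delta = superstep.flatMap(
                new FlatMapFunction<Tuple3<Boolean, Tuple2<Long, Double>, Tuple2<Long, Double>>,
                        Tuple2<Long, Double>>() {
                    @Override
                    public void flatMap(
                            Tuple3<Boolean, Tuple2<Long, Double>, Tuple2<Long, Double>> value,
                            Collector<Tuple2<Long, Double>> out) {
                        if (value.f0) {
                            out.collect(value.f1);
                        }
                    }
                });

        // ... and the other keeps the messages (the next workset)
        DataSet<Tuple2<Long, Double>> workset = superstep.flatMap(
                new FlatMapFunction<Tuple3<Boolean, Tuple2<Long, Double>, Tuple2<Long, Double>>,
                        Tuple2<Long, Double>>() {
                    @Override
                    public void flatMap(
                            Tuple3<Boolean, Tuple2<Long, Double>, Tuple2<Long, Double>> value,
                            Collector<Tuple2<Long, Double>> out) {
                        if (!value.f0) {
                            out.collect(value.f2);
                        }
                    }
                });

        delta.print();
        workset.print();
    }
}

This works, but it is exactly the union type plus filter/split step that Stephan describes as the overhead the two-function Spargel model avoids.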
> > > > >
> > > > > tl;dr:
> > > > > 1. 2 coGroups vs. Join + coGroup + flatMap + reduce
> > > > > 2. how can we efficiently emit 2 different types of records from a coGroup?
> > > > > 3. does it make any difference if we group/combine the messages before
> > > > > updating the workset or after?
> > > > >
> > > > > Cheers,
> > > > > -Vasia.
> > > > >
> > > > >
> > > > > On 27 October 2015 at 18:39, Fabian Hueske <[hidden email]> wrote:
> > > > >
> > > > > > I'll try to have a look at the proposal from a performance point of view
> > > > > > in the next days.
> > > > > > Please ping me if I don't follow up on this thread.
> > > > > >
> > > > > > Cheers, Fabian
> > > > > >
Re: [gelly] Spargel model rework

Martin Neumann-2
The problem with having many different graph models in Gelly is that it
might get quite confusing for a user.
Maybe this can be fixed with good documentation, so that it's clear how each
model works, what its benefits are, and maybe when it's better to use it
over a different model.

Re: [gelly] Spargel model rework

Stephan Ewen
Sounds good. I like the idea of presenting it as a spectrum:

Pregel -> Scatter/Gather (Spargel) -> GAS/GSA/SGA

> > > > > > > > >>
> > > > > > > > >> Thanks for reading this long e-mail and looking forward to
> > > your
> > > > > > input!
> > > > > > > > >>
> > > > > > > > >> Cheers,
> > > > > > > > >> -Vasia.
> > > > > > > > >>
> > > > > > > > >> [1]: https://kowshik.github.io/JPregel/pregel_paper.pdf
> > > > > > > > >> [2]:
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/vasia/flink/tree/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew
> > > > > > > > >> [3]:
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/vasia/flink/blob/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew/example/SSSPCompute.java
> > > > > > > > >> [4]:
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/grafos-ml/okapi/blob/master/src/main/java/ml/grafos/okapi/graphs/SingleSourceShortestPaths.java
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [gelly] Spargel model rework

Vasiliki Kalavri
In reply to this post by Fabian Hueske-2
@Fabian

Is there any advantage in putting the reducer-combiner before updating the
workset vs. after (i.e. right before the join with the solution set)?

If it helps, here are the plans of these 2 alternatives:

https://drive.google.com/file/d/0BzQJrI2eGlyYcFV2RFo5dUFNXzg/view?usp=sharing
https://drive.google.com/file/d/0BzQJrI2eGlyYN014NXp6OEZUdGs/view?usp=sharing

Thanks a lot for the help!

-Vasia.


Re: [gelly] Spargel model rework

Fabian Hueske-2
Hi Vasia,

sorry for the late reply.
I don't think there is a big difference. In both cases, the partitioning
and sorting happen at the end of the iteration.
If the groupReduce is applied before the workset is returned, the sorting
happens on the filtered result (after the flatMap), which might be a little
more efficient (depending on the ratio of messages to solution set updates).
It also does not require that the initial workset is sorted for the first
groupReduce.

I would put it at the end.
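
In plan terms, and reusing the (hypothetical) names from the outline quoted
earlier in this thread, that would look roughly like this; ExtractMessages
and MessageCombiner are placeholder classes, not existing code:

workSet<K, Message> = superstepComp
    .flatMap(new ExtractMessages())   // keep only the message records
    .groupBy(0)                       // group messages by target vertex id
    .reduce(new MessageCombiner());   // combinable reduce, e.g. min or sum
iteration.closeWith(delta, workSet)

This way the combiner runs on the filtered message stream only, and, as noted
before, its partitioning can be reused for the join with the solution set in
the next superstep.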

Cheers, Fabian


Re: [gelly] Spargel model rework

Vasiliki Kalavri
Hi,

after running a few experiments, I can confirm that putting the combiner
after the flatMap is indeed more efficient.

I ran SSSP and Connected Components with Spargel, GSA, and the Pregel model
and the results are the following:

- for SSSP, Spargel is always the slowest, GSA is ~1.2x faster, and Pregel
is ~1.1x faster without a combiner and ~1.3x faster with one.
- for Connected Components, Spargel and GSA perform similarly, while Pregel
is 1.4-1.6x slower.

To start with, this is much better than I expected :)
However, there is one main shortcoming in my current implementation that
negatively impacts performance:
Since the compute-function coGroup needs to output both new vertex values
and new messages, I emit a wrapping tuple that contains both a vertex state
and a message and then separate the two with 2 flatMaps based on a boolean
field (sketched below). The problem is that, since I cannot emit null fields,
I emit a dummy message for each new vertex state and a dummy vertex state for
each new message. That essentially means the intermediate result is double
the size, assuming the vertex values are of the same type as the messages
(it can be worse if the vertex values are more complex).
So my question is: is there a way to avoid this redundancy, either by
emitting null fields or by creating an operator that can emit 2 different
types of tuples?
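
For reference, here is a minimal sketch of the two splitting flatMaps,
following the superstepComp<Vertex, Tuple2<K, Message>, Bool> layout from the
plan quoted earlier; the class names, the concrete types (K = Long, VV and
Message = Double) and the convention that f2 == true marks a vertex update
are just placeholders for illustration:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector;

// Emits only the vertex-update half of the wrapped record (the solution set delta).
class ExtractVertexDelta implements
        FlatMapFunction<Tuple3<Vertex<Long, Double>, Tuple2<Long, Double>, Boolean>, Vertex<Long, Double>> {
    @Override
    public void flatMap(Tuple3<Vertex<Long, Double>, Tuple2<Long, Double>, Boolean> value,
            Collector<Vertex<Long, Double>> out) {
        if (value.f2) {
            out.collect(value.f0); // real vertex value; value.f1 is only a dummy message
        }
    }
}

// Emits only the message half of the wrapped record (the new workset).
class ExtractMessages implements
        FlatMapFunction<Tuple3<Vertex<Long, Double>, Tuple2<Long, Double>, Boolean>, Tuple2<Long, Double>> {
    @Override
    public void flatMap(Tuple3<Vertex<Long, Double>, Tuple2<Long, Double>, Boolean> value,
            Collector<Tuple2<Long, Double>> out) {
        if (!value.f2) {
            out.collect(value.f1); // real message; value.f0 is only a dummy vertex state
        }
    }
}

The dummy halves carried in f0/f1 are exactly the redundancy I would like to
get rid of.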

Thanks!
-Vasia.


Re: [gelly] Spargel model rework

Fabian Hueske-2
You could implement a Java Either type (similar to Scala's Either) that
holds either a Message or the VertexState, plus a corresponding
TypeInformation and TypeSerializer that serialize a byte flag to indicate
which of the two types is present.
It might actually make sense to add a generic Either type to the Java API
in general (similar to the Java Tuples, which resemble the Scala Tuples).
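
A rough sketch of such a type in plain Java (just an illustration, not
existing Flink code; the corresponding TypeSerializer would write a single
byte flag followed by whichever value is actually present):

public abstract class Either<L, R> {

    public static <L, R> Either<L, R> left(L value) { return new Left<>(value); }
    public static <L, R> Either<L, R> right(R value) { return new Right<>(value); }

    public abstract boolean isLeft();
    public abstract L left();
    public abstract R right();

    // e.g. Either<Vertex<K, VV>, Tuple2<K, Message>> as the superstep output type

    private static final class Left<L, R> extends Either<L, R> {
        private final L value;
        Left(L value) { this.value = value; }
        @Override public boolean isLeft() { return true; }
        @Override public L left() { return value; }
        @Override public R right() { throw new IllegalStateException("Left has no right value"); }
    }

    private static final class Right<L, R> extends Either<L, R> {
        private final R value;
        Right(R value) { this.value = value; }
        @Override public boolean isLeft() { return false; }
        @Override public L left() { throw new IllegalStateException("Right has no left value"); }
        @Override public R right() { return value; }
    }
}

Since only one of the two payloads is ever serialized, the dummy fields from
the current workaround would disappear.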

Cheers, Fabian

2015-11-10 22:16 GMT+01:00 Vasiliki Kalavri <[hidden email]>:

> Hi,
>
> after running a few experiments, I can confirm that putting the combiner
> after the flatMap is indeed more efficient.
>
> I ran SSSP and Connected Components with Spargel, GSA, and the Pregel model
> and the results are the following:
>
> - for SSSP, Spargel is always the slowest, GSA is a ~1.2x faster and Pregel
> is ~1.1x faster without combiner, ~1.3x faster with combiner.
> - for Connected Components, Spargel and GSA perform similarly, while Pregel
> is 1.4-1.6x slower.
>
> To start with, this is much better than I expected :)
> However, there is a main shortcoming in my current implementation that
> negatively impacts performance:
> Since the compute function coGroup needs to output both new vertex values
> and new messages, I emit a wrapping tuple that contains both vertex state
> and messages and then filter them out based on a boolean field. The problem
> is that since I cannot emit null fields, I emit a dummy message for each
> new vertex state and a dummy vertex state for each new message. That
> essentially means that the intermediate messages result is double in size,
> if say the vertex values are of the same type as the messages (can be worse
> if the vertex values are more complex).
> So my question is, is there a way to avoid this redundancy, by either
> emitting null fields or by creating an operator that could emit 2 different
> types of tuples?
>
> Thanks!
> -Vasia.

Re: [gelly] Spargel model rework

Vasiliki Kalavri
Thanks Fabian! I'll try that :)

On 10 November 2015 at 22:31, Fabian Hueske <[hidden email]> wrote:

> You could implement a Java Either type (similar to Scala's Either) that
> holds either a Message or the VertexState, together with a corresponding
> TypeInformation and TypeSerializer that serializes a byte flag to indicate
> which of the two types is used.
> It might actually make sense to add a generic Either type to the Java API
> in general (similar to the Java Tuples, which resemble the Scala Tuples).
>
> Cheers, Fabian

Re: [gelly] Spargel model rework

Stephan Ewen
"Either" an "Optional" types are quite useful.

Let's add them to the core Java API.
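
For context, a tiny usage sketch assuming an Either type along the lines of the one sketched earlier in this thread (illustrative names, not a committed API):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Vertex;

public class EitherUsageSketch {

    public static void main(String[] args) {
        // A vertex-state update and a message, each without a dummy counterpart.
        Either<Vertex<Long, Double>, Tuple2<Long, Double>> update =
                Either.left(new Vertex<Long, Double>(1L, 0.0));
        Either<Vertex<Long, Double>, Tuple2<Long, Double>> message =
                Either.right(new Tuple2<Long, Double>(2L, 1.0));

        System.out.println(update.isLeft());  // prints true
        System.out.println(message.isLeft()); // prints false
    }
}

With such a type (or a Flink-aware Optional), the dummy padding discussed earlier in the thread would no longer be necessary.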

On Wed, Nov 11, 2015 at 10:00 AM, Vasiliki Kalavri <
[hidden email]> wrote:

> Thanks Fabian! I'll try that :)
>
> On 10 November 2015 at 22:31, Fabian Hueske <[hidden email]> wrote:
>
> > You could implement a Java Either type (similar to Scala's Either) that
> > either has a Message or the VertexState and a corresponding
> TypeInformation
> > and TypeSerializer that serializes a byte flag to indicate which both
> types
> > is used.
> > It might actually make sense, to add a generic Either type to the Java
> API
> > in general (similar to the Java Tuples with resemble the Scala Tuples).
> >
> > Cheers, Fabian
> >
> > 2015-11-10 22:16 GMT+01:00 Vasiliki Kalavri <[hidden email]>:
> >
> > > Hi,
> > >
> > > after running a few experiments, I can confirm that putting the
> combiner
> > > after the flatMap is indeed more efficient.
> > >
> > > I ran SSSP and Connected Components with Spargel, GSA, and the Pregel
> > model
> > > and the results are the following:
> > >
> > > - for SSSP, Spargel is always the slowest, GSA is a ~1.2x faster and
> > Pregel
> > > is ~1.1x faster without combiner, ~1.3x faster with combiner.
> > > - for Connected Components, Spargel and GSA perform similarly, while
> > Pregel
> > > is 1.4-1.6x slower.
> > >
> > > To start with, this is much better than I expected :)
> > > However, there is a main shortcoming in my current implementation that
> > > negatively impacts performance:
> > > Since the compute function coGroup needs to output both new vertex
> values
> > > and new messages, I emit a wrapping tuple that contains both vertex
> state
> > > and messages and then filter them out based on a boolean field. The
> > problem
> > > is that since I cannot emit null fields, I emit a dummy message for
> each
> > > new vertex state and a dummy vertex state for each new message. That
> > > essentially means that the intermediate messages result is double in
> > size,
> > > if say the vertex values are of the same type as the messages (can be
> > worse
> > > if the vertex values are more complex).
> > > So my question is, is there a way to avoid this redundancy, by either
> > > emitting null fields or by creating an operator that could emit 2
> > different
> > > types of tuples?
> > >
> > > Thanks!
> > > -Vasia.
> > >
> > > On 9 November 2015 at 15:20, Fabian Hueske <[hidden email]> wrote:
> > >
> > > > Hi Vasia,
> > > >
> > > > sorry for the late reply.
> > > > I don't think there is a big difference. In both cases, the
> > partitioning
> > > > and sorting happens at the end of the iteration.
> > > > If the groupReduce is applied before the workset is returned, the
> > sorting
> > > > happens on the filtered result (after the flatMap) which might be a
> > > little
> > > > bit more efficient (depending on the ratio of messages and solution
> set
> > > > updates). Also it does not require that the initial workset is sorted
> > for
> > > > the first groupReduce.
> > > >
> > > > I would put it at the end.
> > > >
> > > > Cheers, Fabian
> > > >
> > > > 2015-11-05 17:19 GMT+01:00 Vasiliki Kalavri <
> [hidden email]
> > >:
> > > >
> > > > > @Fabian
> > > > >
> > > > > Is there any advantage in putting the reducer-combiner before
> > updating
> > > > the
> > > > > workset vs. after (i.e. right before the join with the solution
> set)?
> > > > >
> > > > > If it helps, here are the plans of these 2 alternatives:
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> https://drive.google.com/file/d/0BzQJrI2eGlyYcFV2RFo5dUFNXzg/view?usp=sharing
> > > > >
> > > > >
> > > >
> > >
> >
> https://drive.google.com/file/d/0BzQJrI2eGlyYN014NXp6OEZUdGs/view?usp=sharing
> > > > >
> > > > > Thanks a lot for the help!
> > > > >
> > > > > -Vasia.
> > > > >
> > > > > On 30 October 2015 at 21:28, Fabian Hueske <[hidden email]>
> > wrote:
> > > > >
> > > > > > We can of course inject an optional ReduceFunction (or
> GroupReduce,
> > > or
> > > > > > combinable GroupReduce) to reduce the size of the work set.
> > > > > > I suggested to remove the GroupReduce function, because it did
> only
> > > > > collect
> > > > > > all messages into a single record by emitting the input iterator
> > > which
> > > > is
> > > > > > quite dangerous. Applying a combinable reduce function is could
> > > improve
> > > > > the
> > > > > > performance considerably.
> > > > > >
> > > > > > The good news is that it would come "for free" because the
> > necessary
> > > > > > partitioning and sorting can be reused (given the forwardField
> > > > > annotations
> > > > > > are correctly set):
> > > > > > - The partitioning of the reduce can be reused for the join with
> > the
> > > > > > solution set
> > > > > > - The sort of the reduce is preserved by the join with the
> > in-memory
> > > > > > hash-table of the solution set and can be reused for the coGroup.
> > > > > >
> > > > > > Best,
> > > > > > Fabian
> > > > > >
> > > > > > 2015-10-30 18:38 GMT+01:00 Vasiliki Kalavri <
> > > [hidden email]
> > > > >:
> > > > > >
> > > > > > > Hi Fabian,
> > > > > > >
> > > > > > > thanks so much for looking into this so quickly :-)
> > > > > > >
> > > > > > > One update I have to make is that I tried running a few
> > experiments
> > > > > with
> > > > > > > this on a 6-node cluster. The current implementation gets stuck
> > at
> > > > > > > "Rebuilding Workset Properties" and never finishes a single
> > > > iteration.
> > > > > > > Running the plan of one superstep without a delta iteration
> > > > terminates
> > > > > > > fine. I didn't have access to the cluster today, so I couldn't
> > > debug
> > > > > this
> > > > > > > further, but I will do as soon as I have access again.
> > > > > > >
> > > > > > > The rest of my comments are inline:
> > > > > > >
> > > > > > > On 30 October 2015 at 17:53, Fabian Hueske <[hidden email]>
> > > > wrote:
> > > > > > >
> > > > > > > > Hi Vasia,
> > > > > > > >
> > > > > > > > I had a look at your new implementation and have a few ideas
> > for
> > > > > > > > improvements.
> > > > > > > > 1) Sending out the input iterator as you do in the last
> > > GroupReduce
> > > > > is
> > > > > > > > quite dangerous and does not give a benefit compared to
> > > collecting
> > > > > all
> > > > > > > > elements. Even though it is an iterator, it needs to be
> > > completely
> > > > > > > > materialized in-memory whenever the record is touched by
> Flink
> > or
> > > > > user
> > > > > > > > code.
> > > > > > > > I would propose to skip the reduce step completely and handle
> > all
> > > > > > > messages
> > > > > > > > separates and only collect them in the CoGroup function
> before
> > > > giving
> > > > > > > them
> > > > > > > > into the VertexComputeFunction. Be careful, to only do that
> > with
> > > > > > > > objectReuse disabled or take care to properly copy the
> > messages.
> > > If
> > > > > you
> > > > > > > > collect the messages in the CoGroup, you don't need the
> > > > GroupReduce,
> > > > > > have
> > > > > > > > smaller records and you can remove the MessageIterator class
> > > > > > completely.
> > > > > > > >
> > > > > > >
> > > > > > > ​I see. The idea was to expose to message combiner that user
> > could
> > > > > > > ​implement if the messages are combinable, e.g. min, sum. This
> > is a
> > > > > > common
> > > > > > > case and reduces the message load significantly. Is there a
> way I
> > > > could
> > > > > > do
> > > > > > > something similar before the coGroup?
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > > 2) Add this annotation to the AppendVertexState function:
> > > > > > > > @ForwardedFieldsFirst("*->f0"). This indicates that the
> > complete
> > > > > > element
> > > > > > > of
> > > > > > > > the first input becomes the first field of the output. Since
> > the
> > > > > input
> > > > > > is
> > > > > > > > partitioned on "f0" (it comes out of the partitioned solution
> > > set)
> > > > > the
> > > > > > > > result of ApplyVertexState will be partitioned on "f0.f0"
> which
> > > is
> > > > > > > > (accidentially :-D) the join key of the following coGroup
> > > function
> > > > ->
> > > > > > no
> > > > > > > > partitioning :-)
> > > > > > > >
> > > > > > >
> > > > > > > ​Great! I totally missed that ;)​
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > > 3) Adding the two flatMap functions behind the CoGroup
> prevents
> > > > > > chaining
> > > > > > > > and causes therefore some serialization overhead but
> shouldn't
> > be
> > > > too
> > > > > > > bad.
> > > > > > > >
> > > > > > > > So in total I would make this program as follows:
> > > > > > > >
> > > > > > > > iVertices<K,VV>
> > > > > > > > iMessage<K, Message> = iVertices.map(new InitWorkSet());
> > > > > > > >
> > > > > > > > iteration = iVertices.iterateDelta(iMessages, maxIt, 0)
> > > > > > > > verticesWithMessage<Vertex, Message> =
> > iteration.getSolutionSet()
> > > > > > > >   .join(iteration.workSet())
> > > > > > > >   .where(0) // solution set is local and build side
> > > > > > > >   .equalTo(0) // workset is shuffled and probe side of
> hashjoin
> > > > > > > > superstepComp<Vertex,Tuple2<K, Message>,Bool> =
> > > > > > > > verticesWithMessage.coGroup(edgessWithValue)
> > > > > > > >   .where("f0.f0") // vwm is locally forward and sorted
> > > > > > > >   .equalTo(0) //  edges are already partitioned and sorted
> (if
> > > > cached
> > > > > > > > correctly)
> > > > > > > >   .with(...) // The coGroup collects all messages in a
> > collection
> > > > and
> > > > > > > gives
> > > > > > > > it to the ComputeFunction
> > > > > > > > delta<Vertex> = superStepComp.flatMap(...) // partitioned
> when
> > > > merged
> > > > > > > into
> > > > > > > > solution set
> > > > > > > > workSet<K, Message> = superStepComp.flatMap(...) //
> partitioned
> > > for
> > > > > > join
> > > > > > > > iteration.closeWith(delta, workSet)
> > > > > > > >
> > > > > > > > So, if I am correct, the program will
> > > > > > > > - partition the workset
> > > > > > > > - sort the vertices with messages
> > > > > > > > - partition the delta
> > > > > > > >
> > > > > > > > One observation I have is that this program requires that all
> > > > > messages
> > > > > > > fit
> > > > > > > > into memory. Was that also the case before?
> > > > > > > >
> > > > > > >
> > > > > > > ​I believe not. The plan has one coGroup that produces the
> > messages
> > > > > and a
> > > > > > > following coGroup that groups by the messages "target ID" and
> > > > consumes
> > > > > > > them​ in an iterator. That doesn't require them to fit in
> memory,
> > > > > right?
> > > > > > >
> > > > > > >
> > > > > > > ​I'm also working on a version where the graph is represented
> as
> > an
> > > > > > > adjacency list, instead of two separate datasets of vertices
> and
> > > > edges.
> > > > > > The
> > > > > > > disadvantage is that the graph has to fit in memory, but I
> think
> > > the
> > > > > > > advantages are many​. We'll be able to support edge value
> > updates,
> > > > edge
> > > > > > > mutations and different edge access order guarantees. I'll get
> > back
> > > > to
> > > > > > this
> > > > > > > thread when I have a working prototype.
> > > > > > >
> > > > > > >
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Fabian
> > > > > > > >
> > > > > > >
> > > > > > > ​Thanks again!
> > > > > > >
> > > > > > > Cheers,
> > > > > > > -Vasia.
> > > > > > > ​
> > > > > > >
> > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > 2015-10-27 19:10 GMT+01:00 Vasiliki Kalavri <
> > > > > [hidden email]
> > > > > > >:
> > > > > > > >
> > > > > > > > > @Martin: thanks for your input! If you ran into any other
> > > issues
> > > > > > that I
> > > > > > > > > didn't mention, please let us know. Obviously, even with my
> > > > > proposal,
> > > > > > > > there
> > > > > > > > > are still features we cannot support, e.g. updating edge
> > values
> > > > and
> > > > > > > graph
> > > > > > > > > mutations. We'll need to re-think the underlying iteration
> > > and/or
> > > > > > graph
> > > > > > > > > representation for those.
> > > > > > > > >
> > > > > > > > > @Fabian: thanks a lot, no rush :)
> > > > > > > > > Let me give you some more information that might make it
> > easier
> > > > to
> > > > > > > reason
> > > > > > > > > about performance:
> > > > > > > > >
> > > > > > > > > Currently, in Spargel the SolutionSet (SS) keeps the vertex
> > > state
> > > > > and
> > > > > > > the
> > > > > > > > > workset (WS) keeps the active vertices. The iteration is
> > > composed
> > > > > of
> > > > > > 2
> > > > > > > > > coGroups. The first one takes the WS and the edges and
> > produces
> > > > > > > messages.
> > > > > > > > > The second one takes the messages and the SS and produced
> the
> > > new
> > > > > WS
> > > > > > > and
> > > > > > > > > the SS-delta.
> > > > > > > > >
> > > > > > > > > In my proposal, the SS has the vertex state and the WS has
> > > > > <vertexId,
> > > > > > > > > MessageIterator> pairs, i.e. the inbox of each vertex. The
> > plan
> > > > is
> > > > > > more
> > > > > > > > > complicated because compute() needs to have two iterators:
> > over
> > > > the
> > > > > > > edges
> > > > > > > > > and over the messages.
> > > > > > > > > First, I join SS and WS to get the active vertices (have
> > > > received a
> > > > > > > msg)
> > > > > > > > > and their current state. Then I coGroup the result with the
> > > edges
> > > > > to
> > > > > > > > access
> > > > > > > > > the neighbors. Now the main problem is that this coGroup
> > needs
> > > to
> > > > > > have
> > > > > > > 2
> > > > > > > > > outputs: the new messages and the new vertex value. I
> > couldn't
> > > > > really
> > > > > > > > find
> > > > > > > > > a nice way to do this, so I'm emitting a Tuple that
> contains
> > > both
> > > > > > types
> > > > > > > > and
> > > > > > > > > I have a flag to separate them later with 2 flatMaps. From
> > the
> > > > > vertex
> > > > > > > > > flatMap, I crete the SS-delta and from the messaged
> flatMap I
> > > > > apply a
> > > > > > > > > reduce to group the messages by vertex and send them to the
> > new
> > > > WS.
> > > > > > One
> > > > > > > > > optimization would be to expose a combiner here to reduce
> > > message
> > > > > > size.
> > > > > > > > >
> > > > > > > > > tl;dr:
> > > > > > > > > 1. 2 coGroups vs. Join + coGroup + flatMap + reduce
> > > > > > > > > 2. how can we efficiently emit 2 different types of records
> > > from
> > > > a
> > > > > > > > coGroup?
> > > > > > > > > 3. does it make any difference if we group/combine the
> > messages
> > > > > > before
> > > > > > > > > updating the workset or after?
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > -Vasia.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On 27 October 2015 at 18:39, Fabian Hueske <
> > [hidden email]>
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > I'll try to have a look at the proposal from a
> performance
> > > > point
> > > > > of
> > > > > > > > view
> > > > > > > > > in
> > > > > > > > > > the next days.
> > > > > > > > > > Please ping me, if I don't follow up this thread.
> > > > > > > > > >
> > > > > > > > > > Cheers, Fabian
> > > > > > > > > >
> > > > > > > > > > 2015-10-27 18:28 GMT+01:00 Martin Junghanns <
> > > > > > [hidden email]
> > > > > > > >:
> > > > > > > > > >
> > > > > > > > > > > Hi,
> > > > > > > > > > >
> > > > > > > > > > > At our group, we also moved several algorithms from
> > Giraph
> > > to
> > > > > > Gelly
> > > > > > > > and
> > > > > > > > > > > ran into some confusing issues (first in understanding,
> > > > second
> > > > > > > during
> > > > > > > > > > > implementation) caused by the conceptional differences
> > you
> > > > > > > described.
> > > > > > > > > > >
> > > > > > > > > > > If there are no concrete advantages (performance
> mainly)
> > in
> > > > the
> > > > > > > > Spargel
> > > > > > > > > > > implementation, we would be very happy to see the Gelly
> > API
> > > > be
> > > > > > > > aligned
> > > > > > > > > to
> > > > > > > > > > > Pregel-like systems.
> > > > > > > > > > >
> > > > > > > > > > > Your SSSP example speaks for itself. Straightforward,
> if
> > > the
> > > > > > reader
> > > > > > > > is
> > > > > > > > > > > familiar with Pregel/Giraph/...
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Martin
> > > > > > > > > > >