iterative processing

Yingjun Wu
Dear all,

In the general case, an iterative processing job usually contains one reduce
task and multiple parallel processing tasks. In some cases, the state size in
the reduce task may exceed the memory size, and it seems that Flink then goes
directly to out-of-core mode. I am wondering whether it would be meaningful to
support distributed shared memory access in order to maintain large states
across multiple nodes? Thanks.

Regards,
Yingjun

Re: iterative processing

Stephan Ewen
Hi Yingjun!

Thanks for pointing this out. I would like to learn a bit more about the
problem you have, so let me ask you a few questions to make sure I
understand the matter in more detail...

I assume you are thinking about programs that run a single reduce function
at the end, an "all-reduce", rather than a "by-key-reduce"? And the size of
the data in that "all-reduce" is too large for the main memory?

In many cases, you can do a "combine" step (like a pre-reduce) before the
final reduce function. Because there are many parallel instances of the
"combine", that step has much more memory available in total, and it is often
able to shrink the data such that the final reduce function works on a
data set small enough to fit in memory. In Flink, you get that "combine"
step automatically when you use the ReduceFunction, or when you annotate a
GroupReduceFunction with "@Combinable".
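
To make the combine idea concrete, here is a minimal plain-Python sketch. It is not the Flink API: the names `combine` and `final_reduce` and the sample data are made up for illustration, and the reduce operation is assumed to be a sum, which is associative and commutative and can therefore be applied to sub-parts of the data.

```python
from collections import defaultdict

def combine(partition):
    """Pre-reduce one partition locally: sum the values of each key."""
    partial = defaultdict(int)
    for key, value in partition:
        partial[key] += value
    return list(partial.items())

def final_reduce(partials):
    """Single final reduce over the (smaller) combined outputs."""
    total = defaultdict(int)
    for partial in partials:
        for key, value in partial:
            total[key] += value
    return dict(total)

# Three partitions, as they might live on three parallel workers (hypothetical data).
partitions = [
    [("a", 1), ("b", 2), ("a", 3)],
    [("b", 4), ("b", 5)],
    [("a", 6), ("c", 7)],
]
combined = [combine(p) for p in partitions]
result = final_reduce(combined)
print(result)
```

In this sketch the final reduce receives at most one record per key and partition instead of every input record, which is the effect described above.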

Sometimes, such a "combine" behavior cannot be applied to a reduce
function, namely when an operation cannot be executed on a sub-part of the
data. Is the case you are mentioning such a case?

We have not looked into maintaining a distributed state that can be
accessed across machines. Sometimes that may be helpful, I agree. It would
mean that some data accesses have considerable latency if they cross the
network, but that may be acceptable for certain applications. What would you
imagine such a feature to look like? Could you give us an example of how
you would design it?

Greetings,
Stephan

Re: iterative processing

Yingjun Wu-2
Hi Stephan,

Thanks for your quick reply.

Let's just consider a simple iterative processing algorithm, say, PageRank.
If we want to compute the PageRank value for 100k web pages, then the internal
state, if we represent the graph as a dense matrix, should be at least
100k * 100k * 8 bytes = 80 GB (about 74.5 GiB), which obviously exceeds the
memory size of a single commodity machine. GraphLab does not suffer from this
problem as it stores the graph in distributed memory. So in this case, do you
think it is necessary to distribute the internal state to multiple machines?

Regards,
Yingjun


On Tue, Jun 17, 2014 at 9:27 PM, Stephan Ewen <[hidden email]> wrote:


Re: iterative processing

Kostas Tzoumas
Hi Yingjun,

What you describe actually does happen: The website ranks and the
transition probabilities between websites are indeed partitioned across
machines and the operators in the Stratosphere program that implements
pagerank operate on partitioned data sets. The individual partitions of the
state in each machine will be in memory unless they are too big for the
memory of the machine.

The partitioned data set in Stratosphere's case is not mutable, in the
sense that the user cannot update it in place, but can only use functional
primitives to manipulate it.

You can check out the PageRank Java implementation here:
https://github.com/apache/incubator-flink/blob/master/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/PageRankBasic.java
and an implementation using Spargel (Stratosphere's vertex-centric
front-end) here:
https://github.com/apache/incubator-flink/blob/master/stratosphere-addons/spargel/src/main/java/eu/stratosphere/spargel/java/examples/SpargelPageRank.java

Did I miss something?

Kostas



On Tue, Jun 17, 2014 at 3:39 PM, Yingjun Wu <[hidden email]>
wrote:


Re: iterative processing

Sebastian Schelter-2
In reply to this post by Yingjun Wu-2
The internal state of pagerank is just the rank vector which would be 100k
times 1.
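
As a quick check of the sizes discussed in this thread, the following sketch compares a dense 100k x 100k matrix with the 100k x 1 rank vector. It assumes 8-byte (double-precision) values, as in the earlier estimate; the helper name `human` is made up for illustration.

```python
n = 100_000            # number of web pages
bytes_per_value = 8    # one double-precision float (assumption)

dense_matrix_bytes = n * n * bytes_per_value
rank_vector_bytes = n * bytes_per_value

def human(num_bytes):
    """Format a byte count in decimal (GB) and binary (GiB) units."""
    return f"{num_bytes / 1e9:.4f} GB = {num_bytes / 2**30:.4f} GiB"

print("dense matrix:", human(dense_matrix_bytes))
print("rank vector: ", human(rank_vector_bytes))
```

The dense matrix comes to 80 GB (about 74.5 GiB), while the rank vector needs only 800,000 bytes.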

--sebastian
On 17.06.2014 at 17:56, "Yingjun Wu" <[hidden email]> wrote:


Re: iterative processing

till.rohrmann
In reply to this post by Yingjun Wu-2
Hi Yingjun,

You are right that the adjacency matrix would not fit into the memory of
one machine. Thus, the data should be distributed across several machines
and the computation should also be executed in parallel. And that is
exactly what Flink allows you to do. The thing is that you often have to
adjust your algorithms a little bit to make them run with Flink.
Furthermore, one has to think about data representation. If you want to
implement the PageRank algorithm based on iterative matrix-vector
multiplications, you would have to do the following:

First, think about a distributed representation of the adjacency matrix.
There are plenty of possible partitionings, but let us stick with the
simplest: a cell-wise representation. Thus, each entry has the form (rowIndex,
columnIndex, cellEntry). We'll do the same for the PageRank vector: (index,
cellEntry).

Once this is done, we have to conceive a way to implement a matrix vector
multiplication within the programming framework of Flink, namely using map,
reduce, join, cogroup and cross operations. Looking at the definition of
the matrix vector multiplication, we see that we have to join the ith
PageRank vector entry with the ith column of the adjacency matrix. This
could be expressed like adjacencyMatrix.join(vector).where(1).equalTo(0).
In the MapFunction, we would have to compute the product. Since we only
need the row index information of the pair for the succeeding reduce
operation, the result would have the form (rowIndex,
matrixEntry*vectorEntry). Reducing now on the rowIndex, where we add the
individual products, produces the entries of the resulting vector. That way
you can implement a matrix vector multiplication with Flink.
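
The join, map, and reduce steps described above can be sketched in plain Python as follows. This is not Flink code: the function name `mat_vec` is made up, and a local dictionary lookup stands in for the distributed join on column index (matrix field 1) and vector index (vector field 0).

```python
from collections import defaultdict

def mat_vec(matrix_cells, vector_cells):
    """Multiply a cell-wise matrix by a cell-wise vector.

    matrix_cells: iterable of (row_index, column_index, value)
    vector_cells: iterable of (index, value)
    """
    # "Join": match each matrix cell's column index (field 1)
    # with the vector entry's index (field 0).
    vector_by_index = dict(vector_cells)
    joined = (
        (row, value, vector_by_index[col])
        for row, col, value in matrix_cells
        if col in vector_by_index
    )
    # "Map": keep only the row index and the product.
    products = ((row, m * v) for row, m, v in joined)
    # "Reduce" on the row index: add up the products of each row.
    result = defaultdict(float)
    for row, product in products:
        result[row] += product
    return sorted(result.items())

# 2x2 example: [[1, 2], [0, 3]] times [10, 20]; zero cells are omitted.
matrix = [(0, 0, 1.0), (0, 1, 2.0), (1, 1, 3.0)]
vector = [(0, 10.0), (1, 20.0)]
print(mat_vec(matrix, vector))  # [(0, 50.0), (1, 60.0)]
```

Because zero cells are simply left out of `matrix_cells`, the same sketch covers sparse matrices without any change.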

I guess that the missing vector-vector addition is now easy to implement.
The last thing to do is to use this dataflow as a building block for your
iteration, and then you are done. If I have missed your point, then
let me know.
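
A minimal sketch of the complete iteration, again in plain Python rather than Flink. It assumes the common PageRank formulation r = d * M r + (1 - d) / n with a damping factor d = 0.85; neither the formula nor the damping value is stated in this thread, and the function names are made up for illustration.

```python
from collections import defaultdict

def mat_vec(matrix_cells, vector):
    """Cell-wise matrix times vector; vector is a dict index -> value."""
    result = defaultdict(float)
    for row, col, value in matrix_cells:
        result[row] += value * vector.get(col, 0.0)
    return result

def pagerank(transition_cells, n, damping=0.85, iterations=50):
    """Power iteration: r = damping * M r + (1 - damping) / n."""
    ranks = {i: 1.0 / n for i in range(n)}
    teleport = (1.0 - damping) / n
    for _ in range(iterations):
        product = mat_vec(transition_cells, ranks)
        # The vector-vector addition: scaled product plus teleport vector.
        ranks = {i: damping * product.get(i, 0.0) + teleport for i in range(n)}
    return ranks

# Column-stochastic transition matrix of a 3-page cycle 0 -> 1 -> 2 -> 0.
# Cell (row, col, value) means: page `col` passes `value` of its rank to `row`.
cycle = [(1, 0, 1.0), (2, 1, 1.0), (0, 2, 1.0)]
ranks = pagerank(cycle, n=3)
print(ranks)
```

For the symmetric cycle used here, every page ends up with a rank of one third, which makes the result easy to check by hand.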

Best,

Till



Re: iterative processing

Yingjun Wu-2
Dear all,

Thanks for your replies. Having thought this problem over, I think that if we
model the PageRank problem as a matrix computation, then it is true that
the internal state can be easily partitioned. The partitioned states can
remain inconsistent and be updated independently until the final results have
to be reported. Let's say that we have an n*n transition matrix and an n*1
PageRank array. In each iteration, we multiply the transition matrix with the
PageRank array. If the transition matrix is too large, then we simply
partition the matrix by rows and replicate the PageRank array to all the
involved nodes. In this case, the computation can be made local and we
accumulate the PageRank arrays from the different machines once the PageRank
arrays converge.
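
The row-partitioning scheme described above can be sketched as follows. This is a single-process illustration under stated assumptions: rows are assigned round-robin, the "workers" are plain dictionaries, and all function names are hypothetical. Note that in this sketch the gathered vector has to be replicated to the workers again before the next iteration can start.

```python
def partition_by_rows(matrix_rows, num_workers):
    """Assign row i of the matrix to worker i % num_workers."""
    workers = [dict() for _ in range(num_workers)]
    for row_index, row in enumerate(matrix_rows):
        workers[row_index % num_workers][row_index] = row
    return workers

def local_multiply(worker_rows, rank_vector):
    """Each worker multiplies only its own rows with the replicated vector."""
    return {
        row_index: sum(m * r for m, r in zip(row, rank_vector))
        for row_index, row in worker_rows.items()
    }

def iterate(workers, rank_vector):
    """One iteration: local products, then gather them into the new vector."""
    new_vector = [0.0] * len(rank_vector)
    for worker_rows in workers:
        for row_index, value in local_multiply(worker_rows, rank_vector).items():
            new_vector[row_index] = value
    return new_vector

# Hypothetical 3x3 column-stochastic transition matrix, stored row by row.
matrix = [
    [0.0, 0.0, 1.0],
    [0.5, 0.0, 0.0],
    [0.5, 1.0, 0.0],
]
workers = partition_by_rows(matrix, num_workers=2)
ranks = iterate(workers, [1 / 3, 1 / 3, 1 / 3])
print(ranks)
```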

However, if we model the PageRank problem as a graph, then the problem
becomes difficult to solve if the graph size exceeds the memory size. In
this case, the graph has to be maintained on multiple machines and remote
memory access is required. This is actually the model that GraphLab
adopts. In fact, Flink adopts a model similar to GraphLab's, which also
requires a synchronization barrier after each iteration in order to maintain
consistent states. I think the case where Flink can beat GraphLab is when
network communication time is small compared with computation time.
Am I right? Please correct me if I got anything wrong.

Regards,
Yingjun


On Wed, Jun 18, 2014 at 1:14 AM, Till Rohrmann <[hidden email]>
wrote:


Re: iterative processing

Yingjun Wu-2
Btw, I think matrix computation is actually the same as graph computation
when the matrix is sparse. This is because a sparse matrix is usually
represented as an adjacency list. Spark MLlib has good documentation on
matrix computation in the cloud:
http://spark.apache.org/docs/latest/mllib-basics.html.
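
The correspondence between a sparse matrix and a graph can be illustrated with a small sketch. It assumes the cell-wise (row, column, value) form used earlier in this thread; the function names are made up for illustration.

```python
from collections import defaultdict

def to_adjacency_list(cells):
    """Group non-zero cells (row, col, value) by row.

    Row i of the matrix becomes the weighted edge list of vertex i.
    """
    adjacency = defaultdict(list)
    for row, col, value in cells:
        if value != 0:
            adjacency[row].append((col, value))
    return dict(adjacency)

def to_cells(adjacency):
    """Inverse direction: every edge is one non-zero matrix cell."""
    return sorted(
        (src, dst, weight)
        for src, edges in adjacency.items()
        for dst, weight in edges
    )

cells = [(0, 1, 0.5), (0, 2, 0.5), (1, 2, 1.0), (2, 0, 1.0)]
graph = to_adjacency_list(cells)
print(graph)
```

Converting back with `to_cells(graph)` yields the original cells, so both forms carry the same information.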


On Wed, Jun 18, 2014 at 4:17 AM, Yingjun Wu <[hidden email]>
wrote:


Re: iterative processing

Stephan Ewen
Yes, I think you can view graphs and sparse matrices as very similar.

Flink can do graphs (and sparse matrices) quite well. It has support for
"stateful" iterations, which help a lot with graphs. There are also some
custom operators that allow you to write code in a form similar to the GAS
(gather, apply, scatter) pattern that PowerGraph implements. Have a look at
the examples here:

  - https://github.com/apache/incubator-flink/blob/master/stratosphere-addons/spargel/src/main/java/eu/stratosphere/spargel/java/examples/SpargelPageRank.java
  - https://github.com/apache/incubator-flink/blob/master/stratosphere-addons/spargel/src/main/java/eu/stratosphere/spargel/java/examples/SpargelConnectedComponents.java
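
For readers without access to the linked examples, the following plain-Python sketch shows the general shape of one synchronous vertex-centric PageRank step: a messaging phase followed by an update phase. It is not the Spargel API; the function name `superstep`, the damping factor of 0.85, and the sample graph are assumptions made for illustration.

```python
from collections import defaultdict

def superstep(out_edges, ranks, damping=0.85):
    """One synchronous vertex-centric PageRank step.

    out_edges: dict vertex -> list of target vertices
    ranks:     dict vertex -> current rank
    """
    n = len(ranks)
    # Messaging phase: each vertex sends rank / out-degree to its neighbors.
    inbox = defaultdict(list)
    for vertex, targets in out_edges.items():
        for target in targets:
            inbox[target].append(ranks[vertex] / len(targets))
    # Update phase: each vertex combines its incoming messages.
    # All messages are delivered before any update runs (the barrier).
    return {
        vertex: (1.0 - damping) / n + damping * sum(inbox[vertex])
        for vertex in ranks
    }

edges = {0: [1, 2], 1: [2], 2: [0]}
ranks = {v: 1.0 / 3 for v in edges}
for _ in range(30):
    ranks = superstep(edges, ranks)
print(ranks)
```

The per-vertex state (the rank) is the only thing that changes between steps, which is the kind of state the "stateful" iterations mentioned above are meant to keep.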

Stephan