Posted by till.rohrmann on Jun 17, 2014; 5:14pm
URL: http://deprecated-apache-flink-mailing-list-archive.368.s1.nabble.com/iterative-processing-tp334p348.html
Hi Yingjun,

You are right that the adjacency matrix would not fit into the memory of
one machine. Thus, the data should be distributed across several machines
and the computation should be executed in parallel as well. That is
exactly what Flink allows you to do. The catch is that you often have to
adjust your algorithms a little to make them run with Flink, and you have
to think about the data representation. If you want to implement the
PageRank algorithm based on iterative matrix-vector multiplications, you
would have to do the following:

First, think about a distributed representation of the adjacency matrix.
There are plenty of possible partitionings, but let us stick with the
simplest: a cellwise representation. Each entry then has the form (rowIndex,
columnIndex, cellEntry). We'll do the same for the PageRank vector: (index,
cellEntry).

Once this is done, we have to find a way to implement a matrix-vector
multiplication within the programming framework of Flink, namely using map,
reduce, join, cogroup and cross operations. Looking at the definition of
the matrix-vector multiplication, we see that we have to join the ith
PageRank vector entry with the ith column of the adjacency matrix. This
could be expressed as adjacencyMatrix.join(vector).where(1).equalTo(0).
In a MapFunction applied to the joined pairs, we would compute the product.
Since the succeeding reduce operation only needs the row index of each pair,
the result would have the form (rowIndex, matrixEntry*vectorEntry). Reducing
on the rowIndex, where we add up the individual products, then produces the
entries of the resulting vector. That way you can implement a matrix-vector
multiplication with Flink.
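
The join, map and reduce steps above can be sketched in plain Python. This
is only an illustration of the dataflow, not Flink code: the function name
mat_vec and the use of in-memory lists are assumptions made for the
example, and a real Flink job would express the same three steps with its
own operators on distributed data sets.

```python
from collections import defaultdict


def mat_vec(matrix_cells, vector_cells):
    """Multiply a cellwise matrix by a cellwise vector.

    matrix_cells: iterable of (row_index, column_index, value)
    vector_cells: iterable of (index, value)
    Returns a list of (row_index, value), sorted by row_index.
    """
    # Join: match matrix field 1 (column index) with vector field 0 (index),
    # mirroring adjacencyMatrix.join(vector).where(1).equalTo(0).
    vector_by_index = defaultdict(list)
    for index, value in vector_cells:
        vector_by_index[index].append((index, value))
    joined = [
        ((row, col, m), v_cell)
        for row, col, m in matrix_cells
        for v_cell in vector_by_index.get(col, [])
    ]

    # Map: keep only the row index and the product of the two entries.
    products = [(m_cell[0], m_cell[2] * v_cell[1]) for m_cell, v_cell in joined]

    # Reduce: group by row index and add up the individual products.
    sums = defaultdict(float)
    for row, product in products:
        sums[row] += product
    return sorted(sums.items())
```

For the 2x2 matrix with cells (0, 1, 0.5), (1, 0, 1.0), (1, 1, 0.5) and the
vector (0, 0.5), (1, 0.5), this returns [(0, 0.25), (1, 0.75)]. Note that a
row without any non-zero cell produces no output entry in this sketch.
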
I guess that the missing vector-vector addition is now easy to implement.
The last thing to do is to use this dataflow as a building block for your
iteration, and then you are done. If I have missed your point, please
let me know.
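
As a rough sketch of how the pieces could fit together, the following plain
Python loop combines a matrix-vector multiplication with a vector-vector
addition. It is not Flink code. The names mat_vec, vec_add and pagerank, the
damping factor of 0.85, the uniform teleport vector and the fixed number of
iterations are all assumptions chosen for illustration, and the matrix is
assumed to be column-stochastic.

```python
from collections import defaultdict


def mat_vec(matrix_cells, vector_cells):
    # Join on column index == vector index, multiply, then sum per row.
    vector = dict(vector_cells)
    sums = defaultdict(float)
    for row, col, value in matrix_cells:
        if col in vector:
            sums[row] += value * vector[col]
    return list(sums.items())


def vec_add(a_cells, b_cells):
    # Union both cellwise vectors and add the entries that share an index.
    sums = defaultdict(float)
    for index, value in list(a_cells) + list(b_cells):
        sums[index] += value
    return sorted(sums.items())


def pagerank(matrix_cells, n, damping=0.85, iterations=20):
    # Start from a uniform rank vector (assumed initialization).
    ranks = [(i, 1.0 / n) for i in range(n)]
    # Teleport term that is added in every step (assumed uniform).
    teleport = [(i, (1.0 - damping) / n) for i in range(n)]
    # Fold the damping factor into the matrix entries once.
    scaled = [(row, col, damping * value) for row, col, value in matrix_cells]
    for _ in range(iterations):
        ranks = vec_add(mat_vec(scaled, ranks), teleport)
    return ranks
```

Calling pagerank([(1, 0, 1.0), (0, 1, 1.0)], n=2) on a two-page graph whose
pages link to each other keeps both ranks at roughly 0.5. In Flink, the loop
body would be the dataflow described above, plugged into one of Flink's
iteration operators.
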
Best,
Till
On Tue, Jun 17, 2014 at 3:39 PM, Yingjun Wu <[hidden email]> wrote:
> Hi Stephan,
>
> Thanks for your quick reply.
>
> Let's just consider a simple iterative processing algorithm, say, PageRank.
> If we want to compute the PageRank value for 100k webpages, then the
> internal state, if we represent the graph as a matrix, should be at least
> 100k * 100k * 8 bytes = 80 GB (about 74.5 GiB), which obviously exceeds the
> memory size of a single commodity machine. GraphLab does not suffer from
> this problem as it stores the graph in distributed memory. So in this case,
> do you think it is necessary to distribute the internal state to multiple
> machines?
>
> Regards,
> Yingjun
>
>
> On Tue, Jun 17, 2014 at 9:27 PM, Stephan Ewen <[hidden email]> wrote:
>
> > Hi Yingjun!
> >
> > Thanks for pointing this out. I would like to learn a bit more about the
> > problem you have, so let me ask you a few questions to make sure I
> > understand the matter in more detail...
> >
> > I assume you are thinking about programs that run a single reduce function
> > in the end, an "all-reduce", rather than a "by-key-reduce"? And the size of
> > the data in that "all-reduce" is too large for the main memory?
> >
> > In many cases, you can do a "combine" step (like a pre-reduce) before the
> > final reduce function. Because there are many parallel instances of the
> > "combine", that step has much more memory available, and is often able to
> > shrink the data such that the final reduce function works on a data set
> > small enough to fit in memory. In Flink, you get that "combine" step
> > automatically when you use the ReduceFunction, or when you annotate a
> > GroupReduceFunction with "@Combinable".
> >
> > Sometimes, such a "combine" behavior cannot be applied to a reduce
> > function, because the operation cannot be executed on a sub-part of the
> > data. Is the case you are mentioning such a case?
> >
> > We have not looked into maintaining a distributed state that can be
> > accessed across machines. Sometimes that may be helpful, I agree. It would
> > mean that some data accesses have considerable latency if they cross the
> > network, but that may be acceptable for certain applications. What would
> > you imagine such a feature to look like? Could you give us an example of
> > how you would design it?
> >
> > Greetings,
> > Stephan
> >
>