Partition problem

classic Classic list List threaded Threaded
11 messages Options
Reply | Threaded
Open this post in threaded view
|

Partition problem

Andrew Palumbo
Hi All,


I've run into a problem with empty partitions when the number of elements in a DataSet is less than the Degree of Parallelism.  I've created a gist here to describe it:


https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3


I have two 2x2 matrices, Matrix A and Matrix B and an execution environment where the degree of parallelism is 4. Both matrices   are blockified in  2 different DataSet s . In this case (the case of a 2x2 matrices with 4 partitions) this means that each row goes into a partition leaving 2 empty partitions. In Matrix A, the rows go into partitions 0, 1. However the rows of Matrix B end up in partitions 1, 2. I assign the ordinal index of the blockified matrix's partition to its block, and then join on that index.


However in this case, with differently partitioned matrices of the same geometry, the intersection of the blockified matrices' indices is 1, and partitions 0 and 2 are dropped.


I've tried explicitly defining the dop for Matrix B using the count of non-empty partitions in Matrix A, however this changes the order of the DataSet, placing partition 2 into partition 0.


Is there a way to make sure that these datasets are partitioned in the same way?


Thank you,


Andy


Reply | Threaded
Open this post in threaded view
|

Re: Partition problem

Till Rohrmann
Hi Andrew,

I think the problem is that you assume that both matrices have the same
partitioning. If you guarantee that this is the case, then you can use the
subtask index as the block index. But in the general case this is not true,
and then you have to calculate the blocks by first assigning a block index
(e.g. rows with 0-9 index are assigned to block 0, rows with 10-19 assigned
to block 1, etc.) and then create the blocks by reducing on this block
index. That's because the distribution of the individual rows in the
cluster is not necessarily the same between two matrices.

Cheers,
Till

On Mon, Apr 25, 2016 at 1:40 AM, Andrew Palumbo <[hidden email]> wrote:

> Hi All,
>
>
> I've run into a problem with empty partitions when the number of elements
> in a DataSet is less than the Degree of Parallelism.  I've created a gist
> here to describe it:
>
>
> https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3
>
>
> I have two 2x2 matrices, Matrix A and Matrix B and an execution
> environment where the degree of parallelism is 4. Both matrices   are
> blockified in  2 different DataSet s . In this case (the case of a 2x2
> matrices with 4 partitions) this means that each row goes into a partition
> leaving 2 empty partitions. In Matrix A, the rows go into partitions 0, 1.
> However the rows of Matrix B end up in partitions 1, 2. I assign the
> ordinal index of the blockified matrix's partition to its block, and then
> join on that index.
>
>
> However in this case, with differently partitioned matrices of the same
> geometry, the intersection of the blockified matrices' indices is 1, and
> partitions 0 and 2 are dropped.
>
>
> I've tried explicitly defining the dop for Matrix B using the count of
> non-empty partitions in Matrix A, however this changes the order of the
> DataSet, placing partition 2 into partition 0.
>
>
> Is there a way to make sure that these datasets are partitioned in the
> same way?
>
>
> Thank you,
>
>
> Andy
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Partition problem

Fabian Hueske-2
Hi Andrew,

I might be wrong, but I think this problem is caused by an assumption of
how Flink reads input data.
In Flink, each InputSplit is not read by a new task and a split does not
correspond to a partition. This is different from how Hadoop MR and Spark
handle InputSplits.

Instead, Flink creates as many DataSource tasks as specified by the task
parallelism and lazily assigns InputSplits to its subtasks. Idle DataSource
subtasks request InputSplits from the JobManager and the assignment happens
first-come-first-serve.
Hence, the subtask ID (or partition ID) of an InputSplit is not
deterministic and a DataSource might read more than one or also no split at
all (such as in your case).

If you need the split ID in your program, you can implement an InputFormat,
which wraps another IF and assigns the ID of the current InputSplit to the
read data, i.e., converts the DataType from T to Tuple2[Int, T].

Hope this helps,
Fabian


2016-04-25 11:27 GMT+02:00 Till Rohrmann <[hidden email]>:

> Hi Andrew,
>
> I think the problem is that you assume that both matrices have the same
> partitioning. If you guarantee that this is the case, then you can use the
> subtask index as the block index. But in the general case this is not true,
> and then you have to calculate the blocks by first assigning a block index
> (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19 assigned
> to block 1, etc.) and then create the blocks by reducing on this block
> index. That's because the distribution of the individual rows in the
> cluster is not necessarily the same between two matrices.
>
> Cheers,
> Till
>
> On Mon, Apr 25, 2016 at 1:40 AM, Andrew Palumbo <[hidden email]>
> wrote:
>
> > Hi All,
> >
> >
> > I've run into a problem with empty partitions when the number of elements
> > in a DataSet is less than the Degree of Parallelism.  I've created a gist
> > here to describe it:
> >
> >
> > https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3
> >
> >
> > I have two 2x2 matrices, Matrix A and Matrix B and an execution
> > environment where the degree of parallelism is 4. Both matrices   are
> > blockified in  2 different DataSet s . In this case (the case of a 2x2
> > matrices with 4 partitions) this means that each row goes into a
> partition
> > leaving 2 empty partitions. In Matrix A, the rows go into partitions 0,
> 1.
> > However the rows of Matrix B end up in partitions 1, 2. I assign the
> > ordinal index of the blockified matrix's partition to its block, and then
> > join on that index.
> >
> >
> > However in this case, with differently partitioned matrices of the same
> > geometry, the intersection of the blockified matrices' indices is 1, and
> > partitions 0 and 2 are dropped.
> >
> >
> > I've tried explicitly defining the dop for Matrix B using the count of
> > non-empty partitions in Matrix A, however this changes the order of the
> > DataSet, placing partition 2 into partition 0.
> >
> >
> > Is there a way to make sure that these datasets are partitioned in the
> > same way?
> >
> >
> > Thank you,
> >
> >
> > Andy
> >
> >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Partition problem

Andrew Palumbo
Thank you Fabian and Till for answering,

 I think that my explanation of the problem was a bit over simplified (I am trying to implement an operator that will pass our tests, and didn't want to throw too much code at you).  I realize that this is an odd case, a 2x2 matrix in a distributed context, but it is for a specific Unit test that we enforce.

So we have two different Distributed Matrix Representations: Row-Wise and Blockified.  The Row-Wise representation is a `DataSet[(K, Vector)]` where K is e.g., an Int Key and Vector is a row of the Matrix.  The Blockified representation is a `DataSet[Array(K), Matrix]`.  In the Gist that I posted, I was working with a Blockified Distributed dataset.  Since it was a 2x2 matrix that was Blockified into 4 partitions, the non-empty partitions actually contain a 1x2 Matrix (rather than a (Vector) "row" as i think It reads I will update that to be more clear.  

@Fabian In this case, I am using ExecutionEnvironment.FromCollection to create the original Row-Wise Matrix DataSet.  (There are other cases in which we read from HDFS directly).  But for this problem I am doing something like:

     val inCoreA = dense((1, 2), (3, 4))
     val inCoreB = dense((3, 5), (4, 6))

     val rowsA = (0 until m.nrow).map(i => (i, inCoreA(i, ::)))
     drmA = env.fromCollection(rows).partitionByRange(0)

     val rowsB = (0 until m.nrow).map(i => (i, inCoreB(i, ::)))
     drmA = env.fromCollection(rows).partitionByRange(0)

>If you need the split ID in your program, you can implement an InputFormat,
>which wraps another IF and assigns the ID of the current InputSplit to the
>read data, i.e., converts the DataType from T to Tuple2[Int, T].

I'm not sure if the partitioning at this point matters (of the row-wise Matrices)?  (In next map these into Blockified Matrices)

@Till I think that you're right in that my assumption of Identical partitioning is a problem.

The above Matrices are then mapped into Blockified Matrices currently using the method something as follows:

    val blocksA = drmA.mapPartition {
      values =>
        val (keys, vectors) = values.toIterable.unzip

        if (vectors.nonEmpty) {
          val vector = vectors.head
          val matrix: Matrix = if (vector.isDense) {
            val matrix = new DenseMatrix(vectors.size, ncolLocal)
            vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx, ::) := vec }
            matrix
          } else {
            new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
          }
          Seq((keys.toArray(classTag), matrix))
        } else {
          Seq()
        }
    }

And the same for Matrix B.

Which is where the partition index assignment begins in the gist: https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3


> I think the problem is that you assume that both matrices have the same
> partitioning. If you guarantee that this is the case, then you can use the
> subtask index as the block index.

Yes I think this is the problem- I'd assumed that when mapping into partitions, the 0 partiton would be used first and then the 1 partition and so on...  I understand what your saying now though re: lazy assignment via task Id.  So essentially the partition that the data ends up in is arbitrary based on the task ID that happens to be assigning it.  

But in the general case this is not true,
> and then you have to calculate the blocks by first assigning a block index
> (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19 assigned
> to block 1, etc.) and then create the blocks by reducing on this block
> index. That's because the distribution of the individual rows in the
> cluster is not necessarily the same between two matrices.

I suppose this would have to be done when Blockifying in the method above.  The row-wise matrix may be 2x2 or 20000000 x 2 and directly read from HDFS.  I'm not sure how to assign divide the data and partition it myself when mapping a row-wise matrix into blocks.  Eg. can I know the size of the DataSet before the computation is triggered by env.execute()? If I guess what you are saying is to hand- partition the data in the above `.asBlockified()` method.


As well is it not still possible that i may end up with the same problem when the # of matrix blocks is < the degree of parallelism?



In the end what I really need to do is be able to join the two Bockified DataSets (of any size) in the correct order.. so maybe there is an other way to do this?


Thanks again for your time.

Andy
________________________________________
From: Fabian Hueske <[hidden email]>
Sent: Monday, April 25, 2016 6:09 AM
To: [hidden email]
Subject: Re: Partition problem

Hi Andrew,

I might be wrong, but I think this problem is caused by an assumption of
how Flink reads input data.
In Flink, each InputSplit is not read by a new task and a split does not
correspond to a partition. This is different from how Hadoop MR and Spark
handle InputSplits.

Instead, Flink creates as many DataSource tasks as specified by the task
parallelism and lazily assigns InputSplits to its subtasks. Idle DataSource
subtasks request InputSplits from the JobManager and the assignment happens
first-come-first-serve.
Hence, the subtask ID (or partition ID) of an InputSplit is not
deterministic and a DataSource might read more than one or also no split at
all (such as in your case).

If you need the split ID in your program, you can implement an InputFormat,
which wraps another IF and assigns the ID of the current InputSplit to the
read data, i.e., converts the DataType from T to Tuple2[Int, T].

Hope this helps,
Fabian


2016-04-25 11:27 GMT+02:00 Till Rohrmann <[hidden email]>:

> Hi Andrew,
>
> I think the problem is that you assume that both matrices have the same
> partitioning. If you guarantee that this is the case, then you can use the
> subtask index as the block index. But in the general case this is not true,
> and then you have to calculate the blocks by first assigning a block index
> (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19 assigned
> to block 1, etc.) and then create the blocks by reducing on this block
> index. That's because the distribution of the individual rows in the
> cluster is not necessarily the same between two matrices.
>
> Cheers,
> Till
>
> On Mon, Apr 25, 2016 at 1:40 AM, Andrew Palumbo <[hidden email]>
> wrote:
>
> > Hi All,
> >
> >
> > I've run into a problem with empty partitions when the number of elements
> > in a DataSet is less than the Degree of Parallelism.  I've created a gist
> > here to describe it:
> >
> >
> > https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3
> >
> >
> > I have two 2x2 matrices, Matrix A and Matrix B and an execution
> > environment where the degree of parallelism is 4. Both matrices   are
> > blockified in  2 different DataSet s . In this case (the case of a 2x2
> > matrices with 4 partitions) this means that each row goes into a
> partition
> > leaving 2 empty partitions. In Matrix A, the rows go into partitions 0,
> 1.
> > However the rows of Matrix B end up in partitions 1, 2. I assign the
> > ordinal index of the blockified matrix's partition to its block, and then
> > join on that index.
> >
> >
> > However in this case, with differently partitioned matrices of the same
> > geometry, the intersection of the blockified matrices' indices is 1, and
> > partitions 0 and 2 are dropped.
> >
> >
> > I've tried explicitly defining the dop for Matrix B using the count of
> > non-empty partitions in Matrix A, however this changes the order of the
> > DataSet, placing partition 2 into partition 0.
> >
> >
> > Is there a way to make sure that these datasets are partitioned in the
> > same way?
> >
> >
> > Thank you,
> >
> >
> > Andy
> >
> >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Partition problem

Andrew Palumbo
sorry - just noticed below should read:

     val rowsA = (0 until inCoreA.nrow).map(i => (i, inCoreA(i, ::)))
     drmA = env.fromCollection(rowsA).partitionByRange(0)

     val rowsB = (0 until inCoreB.nrow).map(i => (i, inCoreB(i, ::)))
     drmB = env.fromCollection(rowsB).partitionByRange(0)


also:

The Blockified representation is a `DataSet[(Array(K), Matrix)]`.

Thanks

________________________________________
From: Andrew Palumbo <[hidden email]>
Sent: Monday, April 25, 2016 1:58 PM
To: [hidden email]
Subject: Re: Partition problem

Thank you Fabian and Till for answering,

 I think that my explanation of the problem was a bit over simplified (I am trying to implement an operator that will pass our tests, and didn't want to throw too much code at you).  I realize that this is an odd case, a 2x2 matrix in a distributed context, but it is for a specific Unit test that we enforce.

So we have two different Distributed Matrix Representations: Row-Wise and Blockified.  The Row-Wise representation is a `DataSet[(K, Vector)]` where K is e.g., an Int Key and Vector is a row of the Matrix.  The Blockified representation is a `DataSet[Array(K), Matrix]`.  In the Gist that I posted, I was working with a Blockified Distributed dataset.  Since it was a 2x2 matrix that was Blockified into 4 partitions, the non-empty partitions actually contain a 1x2 Matrix (rather than a (Vector) "row" as i think It reads I will update that to be more clear.

@Fabian In this case, I am using ExecutionEnvironment.FromCollection to create the original Row-Wise Matrix DataSet.  (There are other cases in which we read from HDFS directly).  But for this problem I am doing something like:

     val inCoreA = dense((1, 2), (3, 4))
     val inCoreB = dense((3, 5), (4, 6))

     val rowsA = (0 until m.nrow).map(i => (i, inCoreA(i, ::)))
     drmA = env.fromCollection(rows).partitionByRange(0)

     val rowsB = (0 until m.nrow).map(i => (i, inCoreB(i, ::)))
     drmA = env.fromCollection(rows).partitionByRange(0)

>If you need the split ID in your program, you can implement an InputFormat,
>which wraps another IF and assigns the ID of the current InputSplit to the
>read data, i.e., converts the DataType from T to Tuple2[Int, T].

I'm not sure if the partitioning at this point matters (of the row-wise Matrices)?  (In next map these into Blockified Matrices)

@Till I think that you're right in that my assumption of Identical partitioning is a problem.

The above Matrices are then mapped into Blockified Matrices currently using the method something as follows:

    val blocksA = drmA.mapPartition {
      values =>
        val (keys, vectors) = values.toIterable.unzip

        if (vectors.nonEmpty) {
          val vector = vectors.head
          val matrix: Matrix = if (vector.isDense) {
            val matrix = new DenseMatrix(vectors.size, ncolLocal)
            vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx, ::) := vec }
            matrix
          } else {
            new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
          }
          Seq((keys.toArray(classTag), matrix))
        } else {
          Seq()
        }
    }

And the same for Matrix B.

Which is where the partition index assignment begins in the gist: https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3


> I think the problem is that you assume that both matrices have the same
> partitioning. If you guarantee that this is the case, then you can use the
> subtask index as the block index.

Yes I think this is the problem- I'd assumed that when mapping into partitions, the 0 partiton would be used first and then the 1 partition and so on...  I understand what your saying now though re: lazy assignment via task Id.  So essentially the partition that the data ends up in is arbitrary based on the task ID that happens to be assigning it.

But in the general case this is not true,
> and then you have to calculate the blocks by first assigning a block index
> (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19 assigned
> to block 1, etc.) and then create the blocks by reducing on this block
> index. That's because the distribution of the individual rows in the
> cluster is not necessarily the same between two matrices.

I suppose this would have to be done when Blockifying in the method above.  The row-wise matrix may be 2x2 or 20000000 x 2 and directly read from HDFS.  I'm not sure how to assign divide the data and partition it myself when mapping a row-wise matrix into blocks.  Eg. can I know the size of the DataSet before the computation is triggered by env.execute()? If I guess what you are saying is to hand- partition the data in the above `.asBlockified()` method.


As well is it not still possible that i may end up with the same problem when the # of matrix blocks is < the degree of parallelism?



In the end what I really need to do is be able to join the two Bockified DataSets (of any size) in the correct order.. so maybe there is an other way to do this?


Thanks again for your time.

Andy
________________________________________
From: Fabian Hueske <[hidden email]>
Sent: Monday, April 25, 2016 6:09 AM
To: [hidden email]
Subject: Re: Partition problem

Hi Andrew,

I might be wrong, but I think this problem is caused by an assumption of
how Flink reads input data.
In Flink, each InputSplit is not read by a new task and a split does not
correspond to a partition. This is different from how Hadoop MR and Spark
handle InputSplits.

Instead, Flink creates as many DataSource tasks as specified by the task
parallelism and lazily assigns InputSplits to its subtasks. Idle DataSource
subtasks request InputSplits from the JobManager and the assignment happens
first-come-first-serve.
Hence, the subtask ID (or partition ID) of an InputSplit is not
deterministic and a DataSource might read more than one or also no split at
all (such as in your case).

If you need the split ID in your program, you can implement an InputFormat,
which wraps another IF and assigns the ID of the current InputSplit to the
read data, i.e., converts the DataType from T to Tuple2[Int, T].

Hope this helps,
Fabian


2016-04-25 11:27 GMT+02:00 Till Rohrmann <[hidden email]>:

> Hi Andrew,
>
> I think the problem is that you assume that both matrices have the same
> partitioning. If you guarantee that this is the case, then you can use the
> subtask index as the block index. But in the general case this is not true,
> and then you have to calculate the blocks by first assigning a block index
> (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19 assigned
> to block 1, etc.) and then create the blocks by reducing on this block
> index. That's because the distribution of the individual rows in the
> cluster is not necessarily the same between two matrices.
>
> Cheers,
> Till
>
> On Mon, Apr 25, 2016 at 1:40 AM, Andrew Palumbo <[hidden email]>
> wrote:
>
> > Hi All,
> >
> >
> > I've run into a problem with empty partitions when the number of elements
> > in a DataSet is less than the Degree of Parallelism.  I've created a gist
> > here to describe it:
> >
> >
> > https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3
> >
> >
> > I have two 2x2 matrices, Matrix A and Matrix B and an execution
> > environment where the degree of parallelism is 4. Both matrices   are
> > blockified in  2 different DataSet s . In this case (the case of a 2x2
> > matrices with 4 partitions) this means that each row goes into a
> partition
> > leaving 2 empty partitions. In Matrix A, the rows go into partitions 0,
> 1.
> > However the rows of Matrix B end up in partitions 1, 2. I assign the
> > ordinal index of the blockified matrix's partition to its block, and then
> > join on that index.
> >
> >
> > However in this case, with differently partitioned matrices of the same
> > geometry, the intersection of the blockified matrices' indices is 1, and
> > partitions 0 and 2 are dropped.
> >
> >
> > I've tried explicitly defining the dop for Matrix B using the count of
> > non-empty partitions in Matrix A, however this changes the order of the
> > DataSet, placing partition 2 into partition 0.
> >
> >
> > Is there a way to make sure that these datasets are partitioned in the
> > same way?
> >
> >
> > Thank you,
> >
> >
> > Andy
> >
> >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Partition problem

Till Rohrmann
If you don’t know the size of your matrix, then you cannot partition it
into continuous chunks of rows. The problem is that partitionByRange
samples the data set to generate a distribution and, thus, two matrices
will rarely be partitioned identically. Also if you want to provide a data
distribution, then you have to know how many rows you have. Thus, you would
first have to count the number of rows and broadcast this information to
the subsequent partitioning step (don’t use collect for that).

Alternatively, if it’s ok that the grouping just has to be consistent and
not continuous with respect to the row index, then you could use the row
index modulo the number of blocks, you want to create, to calculate a
partition id. Using a custom partitioner which partitions the data
according to this partition ID and then applying you flat map operation
should do the trick. You could take a look at the ALS (line 458)
implementation, where I did something similar.

Cheers,
Till


On Mon, Apr 25, 2016 at 8:18 PM, Andrew Palumbo <[hidden email]> wrote:

> sorry - just noticed below should read:
>
>      val rowsA = (0 until inCoreA.nrow).map(i => (i, inCoreA(i, ::)))
>      drmA = env.fromCollection(rowsA).partitionByRange(0)
>
>      val rowsB = (0 until inCoreB.nrow).map(i => (i, inCoreB(i, ::)))
>      drmB = env.fromCollection(rowsB).partitionByRange(0)
>
>
> also:
>
> The Blockified representation is a `DataSet[(Array(K), Matrix)]`.
>
> Thanks
>
> ________________________________________
> From: Andrew Palumbo <[hidden email]>
> Sent: Monday, April 25, 2016 1:58 PM
> To: [hidden email]
> Subject: Re: Partition problem
>
> Thank you Fabian and Till for answering,
>
>  I think that my explanation of the problem was a bit over simplified (I
> am trying to implement an operator that will pass our tests, and didn't
> want to throw too much code at you).  I realize that this is an odd case, a
> 2x2 matrix in a distributed context, but it is for a specific Unit test
> that we enforce.
>
> So we have two different Distributed Matrix Representations: Row-Wise and
> Blockified.  The Row-Wise representation is a `DataSet[(K, Vector)]` where
> K is e.g., an Int Key and Vector is a row of the Matrix.  The Blockified
> representation is a `DataSet[Array(K), Matrix]`.  In the Gist that I
> posted, I was working with a Blockified Distributed dataset.  Since it was
> a 2x2 matrix that was Blockified into 4 partitions, the non-empty
> partitions actually contain a 1x2 Matrix (rather than a (Vector) "row" as i
> think It reads I will update that to be more clear.
>
> @Fabian In this case, I am using ExecutionEnvironment.FromCollection to
> create the original Row-Wise Matrix DataSet.  (There are other cases in
> which we read from HDFS directly).  But for this problem I am doing
> something like:
>
>      val inCoreA = dense((1, 2), (3, 4))
>      val inCoreB = dense((3, 5), (4, 6))
>
>      val rowsA = (0 until m.nrow).map(i => (i, inCoreA(i, ::)))
>      drmA = env.fromCollection(rows).partitionByRange(0)
>
>      val rowsB = (0 until m.nrow).map(i => (i, inCoreB(i, ::)))
>      drmA = env.fromCollection(rows).partitionByRange(0)
>
> >If you need the split ID in your program, you can implement an
> InputFormat,
> >which wraps another IF and assigns the ID of the current InputSplit to the
> >read data, i.e., converts the DataType from T to Tuple2[Int, T].
>
> I'm not sure if the partitioning at this point matters (of the row-wise
> Matrices)?  (In next map these into Blockified Matrices)
>
> @Till I think that you're right in that my assumption of Identical
> partitioning is a problem.
>
> The above Matrices are then mapped into Blockified Matrices currently
> using the method something as follows:
>
>     val blocksA = drmA.mapPartition {
>       values =>
>         val (keys, vectors) = values.toIterable.unzip
>
>         if (vectors.nonEmpty) {
>           val vector = vectors.head
>           val matrix: Matrix = if (vector.isDense) {
>             val matrix = new DenseMatrix(vectors.size, ncolLocal)
>             vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx,
> ::) := vec }
>             matrix
>           } else {
>             new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
>           }
>           Seq((keys.toArray(classTag), matrix))
>         } else {
>           Seq()
>         }
>     }
>
> And the same for Matrix B.
>
> Which is where the partition index assignment begins in the gist:
> https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3
>
>
> > I think the problem is that you assume that both matrices have the same
> > partitioning. If you guarantee that this is the case, then you can use
> the
> > subtask index as the block index.
>
> Yes I think this is the problem- I'd assumed that when mapping into
> partitions, the 0 partiton would be used first and then the 1 partition and
> so on...  I understand what your saying now though re: lazy assignment via
> task Id.  So essentially the partition that the data ends up in is
> arbitrary based on the task ID that happens to be assigning it.
>
> But in the general case this is not true,
> > and then you have to calculate the blocks by first assigning a block
> index
> > (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19
> assigned
> > to block 1, etc.) and then create the blocks by reducing on this block
> > index. That's because the distribution of the individual rows in the
> > cluster is not necessarily the same between two matrices.
>
> I suppose this would have to be done when Blockifying in the method
> above.  The row-wise matrix may be 2x2 or 20000000 x 2 and directly read
> from HDFS.  I'm not sure how to assign divide the data and partition it
> myself when mapping a row-wise matrix into blocks.  Eg. can I know the size
> of the DataSet before the computation is triggered by env.execute()? If I
> guess what you are saying is to hand- partition the data in the above
> `.asBlockified()` method.
>
>
> As well is it not still possible that i may end up with the same problem
> when the # of matrix blocks is < the degree of parallelism?
>
>
>
> In the end what I really need to do is be able to join the two Bockified
> DataSets (of any size) in the correct order.. so maybe there is an other
> way to do this?
>
>
> Thanks again for your time.
>
> Andy
> ________________________________________
> From: Fabian Hueske <[hidden email]>
> Sent: Monday, April 25, 2016 6:09 AM
> To: [hidden email]
> Subject: Re: Partition problem
>
> Hi Andrew,
>
> I might be wrong, but I think this problem is caused by an assumption of
> how Flink reads input data.
> In Flink, each InputSplit is not read by a new task and a split does not
> correspond to a partition. This is different from how Hadoop MR and Spark
> handle InputSplits.
>
> Instead, Flink creates as many DataSource tasks as specified by the task
> parallelism and lazily assigns InputSplits to its subtasks. Idle DataSource
> subtasks request InputSplits from the JobManager and the assignment happens
> first-come-first-serve.
> Hence, the subtask ID (or partition ID) of an InputSplit is not
> deterministic and a DataSource might read more than one or also no split at
> all (such as in your case).
>
> If you need the split ID in your program, you can implement an InputFormat,
> which wraps another IF and assigns the ID of the current InputSplit to the
> read data, i.e., converts the DataType from T to Tuple2[Int, T].
>
> Hope this helps,
> Fabian
>
>
> 2016-04-25 11:27 GMT+02:00 Till Rohrmann <[hidden email]>:
>
> > Hi Andrew,
> >
> > I think the problem is that you assume that both matrices have the same
> > partitioning. If you guarantee that this is the case, then you can use
> the
> > subtask index as the block index. But in the general case this is not
> true,
> > and then you have to calculate the blocks by first assigning a block
> index
> > (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19
> assigned
> > to block 1, etc.) and then create the blocks by reducing on this block
> > index. That's because the distribution of the individual rows in the
> > cluster is not necessarily the same between two matrices.
> >
> > Cheers,
> > Till
> >
> > On Mon, Apr 25, 2016 at 1:40 AM, Andrew Palumbo <[hidden email]>
> > wrote:
> >
> > > Hi All,
> > >
> > >
> > > I've run into a problem with empty partitions when the number of
> elements
> > > in a DataSet is less than the Degree of Parallelism.  I've created a
> gist
> > > here to describe it:
> > >
> > >
> > > https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3
> > >
> > >
> > > I have two 2x2 matrices, Matrix A and Matrix B and an execution
> > > environment where the degree of parallelism is 4. Both matrices   are
> > > blockified in  2 different DataSet s . In this case (the case of a 2x2
> > > matrices with 4 partitions) this means that each row goes into a
> > partition
> > > leaving 2 empty partitions. In Matrix A, the rows go into partitions 0,
> > 1.
> > > However the rows of Matrix B end up in partitions 1, 2. I assign the
> > > ordinal index of the blockified matrix's partition to its block, and
> then
> > > join on that index.
> > >
> > >
> > > However in this case, with differently partitioned matrices of the same
> > > geometry, the intersection of the blockified matrices' indices is 1,
> and
> > > partitions 0 and 2 are dropped.
> > >
> > >
> > > I've tried explicitly defining the dop for Matrix B using the count of
> > > non-empty partitions in Matrix A, however this changes the order of the
> > > DataSet, placing partition 2 into partition 0.
> > >
> > >
> > > Is there a way to make sure that these datasets are partitioned in the
> > > same way?
> > >
> > >
> > > Thank you,
> > >
> > >
> > > Andy
> > >
> > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Partition problem

Andrew Palumbo
Hi Till and Fabian,
I had to come back to this problem because we're putting out a maintenance release soon. I think I overcomplicated the issue here.  I don't need equal partitions.  All that I need is to ensure that we have continuous partitions based in the 0th partion.  Ie.  if there if there are 4 partitions and 2 are empty, partition 0 will have data and partition 1 will have data.


@till, I see what you did in ALS, with a Custom partitioner,  Is there a way that I can write a custom partitioner to make sure that we have data in the 0th and 1st partition?  I don't see much documentation for custom partitioners.

Thanks.

Andy
________________________________________
From: Till Rohrmann <[hidden email]>
Sent: Tuesday, April 26, 2016 9:39:41 AM
To: [hidden email]
Subject: Re: Partition problem

If you don’t know the size of your matrix, then you cannot partition it
into continuous chunks of rows. The problem is that partitionByRange
samples the data set to generate a distribution and, thus, two matrices
will rarely be partitioned identically. Also if you want to provide a data
distribution, then you have to know how many rows you have. Thus, you would
first have to count the number of rows and broadcast this information to
the subsequent partitioning step (don’t use collect for that).

Alternatively, if it’s ok that the grouping just has to be consistent and
not continuous with respect to the row index, then you could use the row
index modulo the number of blocks, you want to create, to calculate a
partition id. Using a custom partitioner which partitions the data
according to this partition ID and then applying you flat map operation
should do the trick. You could take a look at the ALS (line 458)
implementation, where I did something similar.

Cheers,
Till


On Mon, Apr 25, 2016 at 8:18 PM, Andrew Palumbo <[hidden email]> wrote:

> sorry - just noticed below should read:
>
>      val rowsA = (0 until inCoreA.nrow).map(i => (i, inCoreA(i, ::)))
>      drmA = env.fromCollection(rowsA).partitionByRange(0)
>
>      val rowsB = (0 until inCoreB.nrow).map(i => (i, inCoreB(i, ::)))
>      drmB = env.fromCollection(rowsB).partitionByRange(0)
>
>
> also:
>
> The Blockified representation is a `DataSet[(Array(K), Matrix)]`.
>
> Thanks
>
> ________________________________________
> From: Andrew Palumbo <[hidden email]>
> Sent: Monday, April 25, 2016 1:58 PM
> To: [hidden email]
> Subject: Re: Partition problem
>
> Thank you Fabian and Till for answering,
>
>  I think that my explanation of the problem was a bit over simplified (I
> am trying to implement an operator that will pass our tests, and didn't
> want to throw too much code at you).  I realize that this is an odd case, a
> 2x2 matrix in a distributed context, but it is for a specific Unit test
> that we enforce.
>
> So we have two different Distributed Matrix Representations: Row-Wise and
> Blockified.  The Row-Wise representation is a `DataSet[(K, Vector)]` where
> K is e.g., an Int Key and Vector is a row of the Matrix.  The Blockified
> representation is a `DataSet[Array(K), Matrix]`.  In the Gist that I
> posted, I was working with a Blockified Distributed dataset.  Since it was
> a 2x2 matrix that was Blockified into 4 partitions, the non-empty
> partitions actually contain a 1x2 Matrix (rather than a (Vector) "row" as i
> think It reads I will update that to be more clear.
>
> @Fabian In this case, I am using ExecutionEnvironment.FromCollection to
> create the original Row-Wise Matrix DataSet.  (There are other cases in
> which we read from HDFS directly).  But for this problem I am doing
> something like:
>
>      val inCoreA = dense((1, 2), (3, 4))
>      val inCoreB = dense((3, 5), (4, 6))
>
>      val rowsA = (0 until m.nrow).map(i => (i, inCoreA(i, ::)))
>      drmA = env.fromCollection(rows).partitionByRange(0)
>
>      val rowsB = (0 until m.nrow).map(i => (i, inCoreB(i, ::)))
>      drmA = env.fromCollection(rows).partitionByRange(0)
>
> >If you need the split ID in your program, you can implement an
> InputFormat,
> >which wraps another IF and assigns the ID of the current InputSplit to the
> >read data, i.e., converts the DataType from T to Tuple2[Int, T].
>
> I'm not sure if the partitioning at this point matters (of the row-wise
> Matrices)?  (In next map these into Blockified Matrices)
>
> @Till I think that you're right in that my assumption of Identical
> partitioning is a problem.
>
> The above Matrices are then mapped into Blockified Matrices currently
> using the method something as follows:
>
>     val blocksA = drmA.mapPartition {
>       values =>
>         val (keys, vectors) = values.toIterable.unzip
>
>         if (vectors.nonEmpty) {
>           val vector = vectors.head
>           val matrix: Matrix = if (vector.isDense) {
>             val matrix = new DenseMatrix(vectors.size, ncolLocal)
>             vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx,
> ::) := vec }
>             matrix
>           } else {
>             new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
>           }
>           Seq((keys.toArray(classTag), matrix))
>         } else {
>           Seq()
>         }
>     }
>
> And the same for Matrix B.
>
> Which is where the partition index assignment begins in the gist:
> https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3
>
>
> > I think the problem is that you assume that both matrices have the same
> > partitioning. If you guarantee that this is the case, then you can use
> the
> > subtask index as the block index.
>
> Yes I think this is the problem- I'd assumed that when mapping into
> partitions, the 0 partiton would be used first and then the 1 partition and
> so on...  I understand what your saying now though re: lazy assignment via
> task Id.  So essentially the partition that the data ends up in is
> arbitrary based on the task ID that happens to be assigning it.
>
> But in the general case this is not true,
> > and then you have to calculate the blocks by first assigning a block
> index
> > (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19
> assigned
> > to block 1, etc.) and then create the blocks by reducing on this block
> > index. That's because the distribution of the individual rows in the
> > cluster is not necessarily the same between two matrices.
>
> I suppose this would have to be done when Blockifying in the method
> above.  The row-wise matrix may be 2x2 or 20000000 x 2 and directly read
> from HDFS.  I'm not sure how to assign divide the data and partition it
> myself when mapping a row-wise matrix into blocks.  Eg. can I know the size
> of the DataSet before the computation is triggered by env.execute()? If I
> guess what you are saying is to hand- partition the data in the above
> `.asBlockified()` method.
>
>
> As well is it not still possible that i may end up with the same problem
> when the # of matrix blocks is < the degree of parallelism?
>
>
>
> In the end what I really need to do is be able to join the two Bockified
> DataSets (of any size) in the correct order.. so maybe there is an other
> way to do this?
>
>
> Thanks again for your time.
>
> Andy
> ________________________________________
> From: Fabian Hueske <[hidden email]>
> Sent: Monday, April 25, 2016 6:09 AM
> To: [hidden email]
> Subject: Re: Partition problem
>
> Hi Andrew,
>
> I might be wrong, but I think this problem is caused by an assumption of
> how Flink reads input data.
> In Flink, each InputSplit is not read by a new task and a split does not
> correspond to a partition. This is different from how Hadoop MR and Spark
> handle InputSplits.
>
> Instead, Flink creates as many DataSource tasks as specified by the task
> parallelism and lazily assigns InputSplits to its subtasks. Idle DataSource
> subtasks request InputSplits from the JobManager and the assignment happens
> first-come-first-serve.
> Hence, the subtask ID (or partition ID) of an InputSplit is not
> deterministic and a DataSource might read more than one or also no split at
> all (such as in your case).
>
> If you need the split ID in your program, you can implement an InputFormat,
> which wraps another IF and assigns the ID of the current InputSplit to the
> read data, i.e., converts the DataType from T to Tuple2[Int, T].
>
> Hope this helps,
> Fabian
>
>
> 2016-04-25 11:27 GMT+02:00 Till Rohrmann <[hidden email]>:
>
> > Hi Andrew,
> >
> > I think the problem is that you assume that both matrices have the same
> > partitioning. If you guarantee that this is the case, then you can use
> the
> > subtask index as the block index. But in the general case this is not
> true,
> > and then you have to calculate the blocks by first assigning a block
> index
> > (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19
> assigned
> > to block 1, etc.) and then create the blocks by reducing on this block
> > index. That's because the distribution of the individual rows in the
> > cluster is not necessarily the same between two matrices.
> >
> > Cheers,
> > Till
> >
> > On Mon, Apr 25, 2016 at 1:40 AM, Andrew Palumbo <[hidden email]>
> > wrote:
> >
> > > Hi All,
> > >
> > >
> > > I've run into a problem with empty partitions when the number of
> elements
> > > in a DataSet is less than the Degree of Parallelism.  I've created a
> gist
> > > here to describe it:
> > >
> > >
> > > https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3
> > >
> > >
> > > I have two 2x2 matrices, Matrix A and Matrix B and an execution
> > > environment where the degree of parallelism is 4. Both matrices   are
> > > blockified in  2 different DataSet s . In this case (the case of a 2x2
> > > matrices with 4 partitions) this means that each row goes into a
> > partition
> > > leaving 2 empty partitions. In Matrix A, the rows go into partitions 0,
> > 1.
> > > However the rows of Matrix B end up in partitions 1, 2. I assign the
> > > ordinal index of the blockified matrix's partition to its block, and
> then
> > > join on that index.
> > >
> > >
> > > However in this case, with differently partitioned matrices of the same
> > > geometry, the intersection of the blockified matrices' indices is 1,
> and
> > > partitions 0 and 2 are dropped.
> > >
> > >
> > > I've tried explicitly defining the dop for Matrix B using the count of
> > > non-empty partitions in Matrix A, however this changes the order of the
> > > DataSet, placing partition 2 into partition 0.
> > >
> > >
> > > Is there a way to make sure that these datasets are partitioned in the
> > > same way?
> > >
> > >
> > > Thank you,
> > >
> > >
> > > Andy
> > >
> > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Partition problem

Andrew Palumbo
Also I shoud say that i doo need the data in the partitions to remain in the same order.  Ie.  partition 3 can not go into partiton 0 if the partition 0 is empty and there are 2 of 4 partitions being used.

________________________________________
From: Andrew Palumbo <[hidden email]>
Sent: Saturday, May 14, 2016 3:25:38 PM
To: [hidden email]
Subject: Re: Partition problem

Hi Till and Fabian,
I had to come back to this problem because we're putting out a maintenance release soon. I think I overcomplicated the issue here.  I don't need equal partitions.  All that I need is to ensure that we have continuous partitions based in the 0th partion.  Ie.  if there if there are 4 partitions and 2 are empty, partition 0 will have data and partition 1 will have data.


@till, I see what you did in ALS, with a Custom partitioner,  Is there a way that I can write a custom partitioner to make sure that we have data in the 0th and 1st partition?  I don't see much documentation for custom partitioners.

Thanks.

Andy
________________________________________
From: Till Rohrmann <[hidden email]>
Sent: Tuesday, April 26, 2016 9:39:41 AM
To: [hidden email]
Subject: Re: Partition problem

If you don’t know the size of your matrix, then you cannot partition it
into continuous chunks of rows. The problem is that partitionByRange
samples the data set to generate a distribution and, thus, two matrices
will rarely be partitioned identically. Also if you want to provide a data
distribution, then you have to know how many rows you have. Thus, you would
first have to count the number of rows and broadcast this information to
the subsequent partitioning step (don’t use collect for that).

Alternatively, if it’s ok that the grouping just has to be consistent and
not continuous with respect to the row index, then you could use the row
index modulo the number of blocks, you want to create, to calculate a
partition id. Using a custom partitioner which partitions the data
according to this partition ID and then applying you flat map operation
should do the trick. You could take a look at the ALS (line 458)
implementation, where I did something similar.

Cheers,
Till


On Mon, Apr 25, 2016 at 8:18 PM, Andrew Palumbo <[hidden email]> wrote:

> sorry - just noticed below should read:
>
>      val rowsA = (0 until inCoreA.nrow).map(i => (i, inCoreA(i, ::)))
>      drmA = env.fromCollection(rowsA).partitionByRange(0)
>
>      val rowsB = (0 until inCoreB.nrow).map(i => (i, inCoreB(i, ::)))
>      drmB = env.fromCollection(rowsB).partitionByRange(0)
>
>
> also:
>
> The Blockified representation is a `DataSet[(Array(K), Matrix)]`.
>
> Thanks
>
> ________________________________________
> From: Andrew Palumbo <[hidden email]>
> Sent: Monday, April 25, 2016 1:58 PM
> To: [hidden email]
> Subject: Re: Partition problem
>
> Thank you Fabian and Till for answering,
>
>  I think that my explanation of the problem was a bit over simplified (I
> am trying to implement an operator that will pass our tests, and didn't
> want to throw too much code at you).  I realize that this is an odd case, a
> 2x2 matrix in a distributed context, but it is for a specific Unit test
> that we enforce.
>
> So we have two different Distributed Matrix Representations: Row-Wise and
> Blockified.  The Row-Wise representation is a `DataSet[(K, Vector)]` where
> K is e.g., an Int Key and Vector is a row of the Matrix.  The Blockified
> representation is a `DataSet[Array(K), Matrix]`.  In the Gist that I
> posted, I was working with a Blockified Distributed dataset.  Since it was
> a 2x2 matrix that was Blockified into 4 partitions, the non-empty
> partitions actually contain a 1x2 Matrix (rather than a (Vector) "row" as i
> think It reads I will update that to be more clear.
>
> @Fabian In this case, I am using ExecutionEnvironment.FromCollection to
> create the original Row-Wise Matrix DataSet.  (There are other cases in
> which we read from HDFS directly).  But for this problem I am doing
> something like:
>
>      val inCoreA = dense((1, 2), (3, 4))
>      val inCoreB = dense((3, 5), (4, 6))
>
>      val rowsA = (0 until m.nrow).map(i => (i, inCoreA(i, ::)))
>      drmA = env.fromCollection(rows).partitionByRange(0)
>
>      val rowsB = (0 until m.nrow).map(i => (i, inCoreB(i, ::)))
>      drmA = env.fromCollection(rows).partitionByRange(0)
>
> >If you need the split ID in your program, you can implement an
> InputFormat,
> >which wraps another IF and assigns the ID of the current InputSplit to the
> >read data, i.e., converts the DataType from T to Tuple2[Int, T].
>
> I'm not sure if the partitioning at this point matters (of the row-wise
> Matrices)?  (In next map these into Blockified Matrices)
>
> @Till I think that you're right in that my assumption of Identical
> partitioning is a problem.
>
> The above Matrices are then mapped into Blockified Matrices currently
> using the method something as follows:
>
>     val blocksA = drmA.mapPartition {
>       values =>
>         val (keys, vectors) = values.toIterable.unzip
>
>         if (vectors.nonEmpty) {
>           val vector = vectors.head
>           val matrix: Matrix = if (vector.isDense) {
>             val matrix = new DenseMatrix(vectors.size, ncolLocal)
>             vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx,
> ::) := vec }
>             matrix
>           } else {
>             new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
>           }
>           Seq((keys.toArray(classTag), matrix))
>         } else {
>           Seq()
>         }
>     }
>
> And the same for Matrix B.
>
> Which is where the partition index assignment begins in the gist:
> https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3
>
>
> > I think the problem is that you assume that both matrices have the same
> > partitioning. If you guarantee that this is the case, then you can use
> the
> > subtask index as the block index.
>
> Yes I think this is the problem- I'd assumed that when mapping into
> partitions, the 0 partiton would be used first and then the 1 partition and
> so on...  I understand what your saying now though re: lazy assignment via
> task Id.  So essentially the partition that the data ends up in is
> arbitrary based on the task ID that happens to be assigning it.
>
> But in the general case this is not true,
> > and then you have to calculate the blocks by first assigning a block
> index
> > (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19
> assigned
> > to block 1, etc.) and then create the blocks by reducing on this block
> > index. That's because the distribution of the individual rows in the
> > cluster is not necessarily the same between two matrices.
>
> I suppose this would have to be done when Blockifying in the method
> above.  The row-wise matrix may be 2x2 or 20000000 x 2 and directly read
> from HDFS.  I'm not sure how to assign divide the data and partition it
> myself when mapping a row-wise matrix into blocks.  Eg. can I know the size
> of the DataSet before the computation is triggered by env.execute()? If I
> guess what you are saying is to hand- partition the data in the above
> `.asBlockified()` method.
>
>
> As well is it not still possible that i may end up with the same problem
> when the # of matrix blocks is < the degree of parallelism?
>
>
>
> In the end what I really need to do is be able to join the two Bockified
> DataSets (of any size) in the correct order.. so maybe there is an other
> way to do this?
>
>
> Thanks again for your time.
>
> Andy
> ________________________________________
> From: Fabian Hueske <[hidden email]>
> Sent: Monday, April 25, 2016 6:09 AM
> To: [hidden email]
> Subject: Re: Partition problem
>
> Hi Andrew,
>
> I might be wrong, but I think this problem is caused by an assumption of
> how Flink reads input data.
> In Flink, each InputSplit is not read by a new task and a split does not
> correspond to a partition. This is different from how Hadoop MR and Spark
> handle InputSplits.
>
> Instead, Flink creates as many DataSource tasks as specified by the task
> parallelism and lazily assigns InputSplits to its subtasks. Idle DataSource
> subtasks request InputSplits from the JobManager and the assignment happens
> first-come-first-serve.
> Hence, the subtask ID (or partition ID) of an InputSplit is not
> deterministic and a DataSource might read more than one or also no split at
> all (such as in your case).
>
> If you need the split ID in your program, you can implement an InputFormat,
> which wraps another IF and assigns the ID of the current InputSplit to the
> read data, i.e., converts the DataType from T to Tuple2[Int, T].
>
> Hope this helps,
> Fabian
>
>
> 2016-04-25 11:27 GMT+02:00 Till Rohrmann <[hidden email]>:
>
> > Hi Andrew,
> >
> > I think the problem is that you assume that both matrices have the same
> > partitioning. If you guarantee that this is the case, then you can use
> the
> > subtask index as the block index. But in the general case this is not
> true,
> > and then you have to calculate the blocks by first assigning a block
> index
> > (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19
> assigned
> > to block 1, etc.) and then create the blocks by reducing on this block
> > index. That's because the distribution of the individual rows in the
> > cluster is not necessarily the same between two matrices.
> >
> > Cheers,
> > Till
> >
> > On Mon, Apr 25, 2016 at 1:40 AM, Andrew Palumbo <[hidden email]>
> > wrote:
> >
> > > Hi All,
> > >
> > >
> > > I've run into a problem with empty partitions when the number of
> elements
> > > in a DataSet is less than the Degree of Parallelism.  I've created a
> gist
> > > here to describe it:
> > >
> > >
> > > https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3
> > >
> > >
> > > I have two 2x2 matrices, Matrix A and Matrix B and an execution
> > > environment where the degree of parallelism is 4. Both matrices   are
> > > blockified in  2 different DataSet s . In this case (the case of a 2x2
> > > matrices with 4 partitions) this means that each row goes into a
> > partition
> > > leaving 2 empty partitions. In Matrix A, the rows go into partitions 0,
> > 1.
> > > However the rows of Matrix B end up in partitions 1, 2. I assign the
> > > ordinal index of the blockified matrix's partition to its block, and
> then
> > > join on that index.
> > >
> > >
> > > However in this case, with differently partitioned matrices of the same
> > > geometry, the intersection of the blockified matrices' indices is 1,
> and
> > > partitions 0 and 2 are dropped.
> > >
> > >
> > > I've tried explicitly defining the dop for Matrix B using the count of
> > > non-empty partitions in Matrix A, however this changes the order of the
> > > DataSet, placing partition 2 into partition 0.
> > >
> > >
> > > Is there a way to make sure that these datasets are partitioned in the
> > > same way?
> > >
> > >
> > > Thank you,
> > >
> > >
> > > Andy
> > >
> > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Partition problem

Fabian Hueske-2
Hi Andrew,

I am not sure that I fully understand your requirements. Please correct me
if I some of my assumptions are not correct.

Requirements:
- Rows must be partitioned in partitions of consecutive row ids, i.e., rows
0 to 10 in partition 0, rows 11 to 20 in partition 1, etc..
- Rows of both inputs must be equally partitioned.

This can be done using range partitioning with a provided DataDistribution
(to avoid auto-generated distributions based on sampling) or a custom
partitioner. The custom partitioner would basically reimplement the logic
of the range partitioner. So I would go with the range partitioner. The
DataSetUtils class provides method to pass a DataDistribution to a range
partitioner.

The data distribution should be such that the partitioner generates equally
sized partitions. If the row ids are dense (or gaps are uniformly
distributed), you only need to know the minimum and maximum row id. In case
of skewed row id distribution, it is good to know the distribution to
compute equally sized partitions.

The DataDistribution interface has a method to return the bucket boundaries
among which the partitions are divided.

Please let me know if I didn't get the requirements right or if you have
further questions.

Best, Fabian




2016-05-14 21:28 GMT+02:00 Andrew Palumbo <[hidden email]>:

> Also I shoud say that i doo need the data in the partitions to remain in
> the same order.  Ie.  partition 3 can not go into partiton 0 if the
> partition 0 is empty and there are 2 of 4 partitions being used.
>
> ________________________________________
> From: Andrew Palumbo <[hidden email]>
> Sent: Saturday, May 14, 2016 3:25:38 PM
> To: [hidden email]
> Subject: Re: Partition problem
>
> Hi Till and Fabian,
> I had to come back to this problem because we're putting out a maintenance
> release soon. I think I overcomplicated the issue here.  I don't need equal
> partitions.  All that I need is to ensure that we have continuous
> partitions based in the 0th partion.  Ie.  if there if there are 4
> partitions and 2 are empty, partition 0 will have data and partition 1 will
> have data.
>
>
> @till, I see what you did in ALS, with a Custom partitioner,  Is there a
> way that I can write a custom partitioner to make sure that we have data in
> the 0th and 1st partition?  I don't see much documentation for custom
> partitioners.
>
> Thanks.
>
> Andy
> ________________________________________
> From: Till Rohrmann <[hidden email]>
> Sent: Tuesday, April 26, 2016 9:39:41 AM
> To: [hidden email]
> Subject: Re: Partition problem
>
> If you don’t know the size of your matrix, then you cannot partition it
> into continuous chunks of rows. The problem is that partitionByRange
> samples the data set to generate a distribution and, thus, two matrices
> will rarely be partitioned identically. Also if you want to provide a data
> distribution, then you have to know how many rows you have. Thus, you would
> first have to count the number of rows and broadcast this information to
> the subsequent partitioning step (don’t use collect for that).
>
> Alternatively, if it’s ok that the grouping just has to be consistent and
> not continuous with respect to the row index, then you could use the row
> index modulo the number of blocks, you want to create, to calculate a
> partition id. Using a custom partitioner which partitions the data
> according to this partition ID and then applying you flat map operation
> should do the trick. You could take a look at the ALS (line 458)
> implementation, where I did something similar.
>
> Cheers,
> Till
> ​
>
> On Mon, Apr 25, 2016 at 8:18 PM, Andrew Palumbo <[hidden email]>
> wrote:
>
> > sorry - just noticed below should read:
> >
> >      val rowsA = (0 until inCoreA.nrow).map(i => (i, inCoreA(i, ::)))
> >      drmA = env.fromCollection(rowsA).partitionByRange(0)
> >
> >      val rowsB = (0 until inCoreB.nrow).map(i => (i, inCoreB(i, ::)))
> >      drmB = env.fromCollection(rowsB).partitionByRange(0)
> >
> >
> > also:
> >
> > The Blockified representation is a `DataSet[(Array(K), Matrix)]`.
> >
> > Thanks
> >
> > ________________________________________
> > From: Andrew Palumbo <[hidden email]>
> > Sent: Monday, April 25, 2016 1:58 PM
> > To: [hidden email]
> > Subject: Re: Partition problem
> >
> > Thank you Fabian and Till for answering,
> >
> >  I think that my explanation of the problem was a bit over simplified (I
> > am trying to implement an operator that will pass our tests, and didn't
> > want to throw too much code at you).  I realize that this is an odd
> case, a
> > 2x2 matrix in a distributed context, but it is for a specific Unit test
> > that we enforce.
> >
> > So we have two different Distributed Matrix Representations: Row-Wise and
> > Blockified.  The Row-Wise representation is a `DataSet[(K, Vector)]`
> where
> > K is e.g., an Int Key and Vector is a row of the Matrix.  The Blockified
> > representation is a `DataSet[Array(K), Matrix]`.  In the Gist that I
> > posted, I was working with a Blockified Distributed dataset.  Since it
> was
> > a 2x2 matrix that was Blockified into 4 partitions, the non-empty
> > partitions actually contain a 1x2 Matrix (rather than a (Vector) "row"
> as i
> > think It reads I will update that to be more clear.
> >
> > @Fabian In this case, I am using ExecutionEnvironment.FromCollection to
> > create the original Row-Wise Matrix DataSet.  (There are other cases in
> > which we read from HDFS directly).  But for this problem I am doing
> > something like:
> >
> >      val inCoreA = dense((1, 2), (3, 4))
> >      val inCoreB = dense((3, 5), (4, 6))
> >
> >      val rowsA = (0 until m.nrow).map(i => (i, inCoreA(i, ::)))
> >      drmA = env.fromCollection(rows).partitionByRange(0)
> >
> >      val rowsB = (0 until m.nrow).map(i => (i, inCoreB(i, ::)))
> >      drmA = env.fromCollection(rows).partitionByRange(0)
> >
> > >If you need the split ID in your program, you can implement an
> > InputFormat,
> > >which wraps another IF and assigns the ID of the current InputSplit to
> the
> > >read data, i.e., converts the DataType from T to Tuple2[Int, T].
> >
> > I'm not sure if the partitioning at this point matters (of the row-wise
> > Matrices)?  (In next map these into Blockified Matrices)
> >
> > @Till I think that you're right in that my assumption of Identical
> > partitioning is a problem.
> >
> > The above Matrices are then mapped into Blockified Matrices currently
> > using the method something as follows:
> >
> >     val blocksA = drmA.mapPartition {
> >       values =>
> >         val (keys, vectors) = values.toIterable.unzip
> >
> >         if (vectors.nonEmpty) {
> >           val vector = vectors.head
> >           val matrix: Matrix = if (vector.isDense) {
> >             val matrix = new DenseMatrix(vectors.size, ncolLocal)
> >             vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx,
> > ::) := vec }
> >             matrix
> >           } else {
> >             new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
> >           }
> >           Seq((keys.toArray(classTag), matrix))
> >         } else {
> >           Seq()
> >         }
> >     }
> >
> > And the same for Matrix B.
> >
> > Which is where the partition index assignment begins in the gist:
> > https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3
> >
> >
> > > I think the problem is that you assume that both matrices have the same
> > > partitioning. If you guarantee that this is the case, then you can use
> > the
> > > subtask index as the block index.
> >
> > Yes I think this is the problem- I'd assumed that when mapping into
> > partitions, the 0 partiton would be used first and then the 1 partition
> and
> > so on...  I understand what your saying now though re: lazy assignment
> via
> > task Id.  So essentially the partition that the data ends up in is
> > arbitrary based on the task ID that happens to be assigning it.
> >
> > But in the general case this is not true,
> > > and then you have to calculate the blocks by first assigning a block
> > index
> > > (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19
> > assigned
> > > to block 1, etc.) and then create the blocks by reducing on this block
> > > index. That's because the distribution of the individual rows in the
> > > cluster is not necessarily the same between two matrices.
> >
> > I suppose this would have to be done when Blockifying in the method
> > above.  The row-wise matrix may be 2x2 or 20000000 x 2 and directly read
> > from HDFS.  I'm not sure how to assign divide the data and partition it
> > myself when mapping a row-wise matrix into blocks.  Eg. can I know the
> size
> > of the DataSet before the computation is triggered by env.execute()? If I
> > guess what you are saying is to hand- partition the data in the above
> > `.asBlockified()` method.
> >
> >
> > As well is it not still possible that i may end up with the same problem
> > when the # of matrix blocks is < the degree of parallelism?
> >
> >
> >
> > In the end what I really need to do is be able to join the two Bockified
> > DataSets (of any size) in the correct order.. so maybe there is an other
> > way to do this?
> >
> >
> > Thanks again for your time.
> >
> > Andy
> > ________________________________________
> > From: Fabian Hueske <[hidden email]>
> > Sent: Monday, April 25, 2016 6:09 AM
> > To: [hidden email]
> > Subject: Re: Partition problem
> >
> > Hi Andrew,
> >
> > I might be wrong, but I think this problem is caused by an assumption of
> > how Flink reads input data.
> > In Flink, each InputSplit is not read by a new task and a split does not
> > correspond to a partition. This is different from how Hadoop MR and Spark
> > handle InputSplits.
> >
> > Instead, Flink creates as many DataSource tasks as specified by the task
> > parallelism and lazily assigns InputSplits to its subtasks. Idle
> DataSource
> > subtasks request InputSplits from the JobManager and the assignment
> happens
> > first-come-first-serve.
> > Hence, the subtask ID (or partition ID) of an InputSplit is not
> > deterministic and a DataSource might read more than one or also no split
> at
> > all (such as in your case).
> >
> > If you need the split ID in your program, you can implement an
> InputFormat,
> > which wraps another IF and assigns the ID of the current InputSplit to
> the
> > read data, i.e., converts the DataType from T to Tuple2[Int, T].
> >
> > Hope this helps,
> > Fabian
> >
> >
> > 2016-04-25 11:27 GMT+02:00 Till Rohrmann <[hidden email]>:
> >
> > > Hi Andrew,
> > >
> > > I think the problem is that you assume that both matrices have the same
> > > partitioning. If you guarantee that this is the case, then you can use
> > the
> > > subtask index as the block index. But in the general case this is not
> > true,
> > > and then you have to calculate the blocks by first assigning a block
> > index
> > > (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19
> > assigned
> > > to block 1, etc.) and then create the blocks by reducing on this block
> > > index. That's because the distribution of the individual rows in the
> > > cluster is not necessarily the same between two matrices.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Mon, Apr 25, 2016 at 1:40 AM, Andrew Palumbo <[hidden email]>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > >
> > > > I've run into a problem with empty partitions when the number of
> > elements
> > > > in a DataSet is less than the Degree of Parallelism.  I've created a
> > gist
> > > > here to describe it:
> > > >
> > > >
> > > >
> https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3
> > > >
> > > >
> > > > I have two 2x2 matrices, Matrix A and Matrix B and an execution
> > > > environment where the degree of parallelism is 4. Both matrices   are
> > > > blockified in  2 different DataSet s . In this case (the case of a
> 2x2
> > > > matrices with 4 partitions) this means that each row goes into a
> > > partition
> > > > leaving 2 empty partitions. In Matrix A, the rows go into partitions
> 0,
> > > 1.
> > > > However the rows of Matrix B end up in partitions 1, 2. I assign the
> > > > ordinal index of the blockified matrix's partition to its block, and
> > then
> > > > join on that index.
> > > >
> > > >
> > > > However in this case, with differently partitioned matrices of the
> same
> > > > geometry, the intersection of the blockified matrices' indices is 1,
> > and
> > > > partitions 0 and 2 are dropped.
> > > >
> > > >
> > > > I've tried explicitly defining the dop for Matrix B using the count
> of
> > > > non-empty partitions in Matrix A, however this changes the order of
> the
> > > > DataSet, placing partition 2 into partition 0.
> > > >
> > > >
> > > > Is there a way to make sure that these datasets are partitioned in
> the
> > > > same way?
> > > >
> > > >
> > > > Thank you,
> > > >
> > > >
> > > > Andy
> > > >
> > > >
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Partition problem

Till Rohrmann
Hi Andrew,

I think in the end it boils down to counting the number of rows/finding the
maximum index in the set of rows if you want to partition your matrix into
blocks where the row indices are monotonically increasing. Without this
information none of the described methods (range partition or custom
partitioner) will work. However, both build-in mechanisms suffer from the
fact that you first need to calculate the count/max index, retrieve the
value to the client and then call the range partitioning/custom
partitioner. Thus, in the end you will execute parts of your job multiple
times.

A more efficient way would be to keep the count/max index as a `DataSet` in
your cluster and broadcast it to a map operation where you assign partition
ids. Group reducing on these partition ids will allow you to generate the
blocked representation of your matrix.

Cheers,
Till

On Tue, May 17, 2016 at 10:31 AM, Fabian Hueske <[hidden email]> wrote:

> Hi Andrew,
>
> I am not sure that I fully understand your requirements. Please correct me
> if I some of my assumptions are not correct.
>
> Requirements:
> - Rows must be partitioned in partitions of consecutive row ids, i.e., rows
> 0 to 10 in partition 0, rows 11 to 20 in partition 1, etc..
> - Rows of both inputs must be equally partitioned.
>
> This can be done using range partitioning with a provided DataDistribution
> (to avoid auto-generated distributions based on sampling) or a custom
> partitioner. The custom partitioner would basically reimplement the logic
> of the range partitioner. So I would go with the range partitioner. The
> DataSetUtils class provides method to pass a DataDistribution to a range
> partitioner.
>
> The data distribution should be such that the partitioner generates equally
> sized partitions. If the row ids are dense (or gaps are uniformly
> distributed), you only need to know the minimum and maximum row id. In case
> of skewed row id distribution, it is good to know the distribution to
> compute equally sized partitions.
>
> The DataDistribution interface has a method to return the bucket boundaries
> among which the partitions are divided.
>
> Please let me know if I didn't get the requirements right or if you have
> further questions.
>
> Best, Fabian
>
>
>
>
> 2016-05-14 21:28 GMT+02:00 Andrew Palumbo <[hidden email]>:
>
> > Also I shoud say that i doo need the data in the partitions to remain in
> > the same order.  Ie.  partition 3 can not go into partiton 0 if the
> > partition 0 is empty and there are 2 of 4 partitions being used.
> >
> > ________________________________________
> > From: Andrew Palumbo <[hidden email]>
> > Sent: Saturday, May 14, 2016 3:25:38 PM
> > To: [hidden email]
> > Subject: Re: Partition problem
> >
> > Hi Till and Fabian,
> > I had to come back to this problem because we're putting out a
> maintenance
> > release soon. I think I overcomplicated the issue here.  I don't need
> equal
> > partitions.  All that I need is to ensure that we have continuous
> > partitions based in the 0th partion.  Ie.  if there if there are 4
> > partitions and 2 are empty, partition 0 will have data and partition 1
> will
> > have data.
> >
> >
> > @till, I see what you did in ALS, with a Custom partitioner,  Is there a
> > way that I can write a custom partitioner to make sure that we have data
> in
> > the 0th and 1st partition?  I don't see much documentation for custom
> > partitioners.
> >
> > Thanks.
> >
> > Andy
> > ________________________________________
> > From: Till Rohrmann <[hidden email]>
> > Sent: Tuesday, April 26, 2016 9:39:41 AM
> > To: [hidden email]
> > Subject: Re: Partition problem
> >
> > If you don’t know the size of your matrix, then you cannot partition it
> > into continuous chunks of rows. The problem is that partitionByRange
> > samples the data set to generate a distribution and, thus, two matrices
> > will rarely be partitioned identically. Also if you want to provide a
> data
> > distribution, then you have to know how many rows you have. Thus, you
> would
> > first have to count the number of rows and broadcast this information to
> > the subsequent partitioning step (don’t use collect for that).
> >
> > Alternatively, if it’s ok that the grouping just has to be consistent and
> > not continuous with respect to the row index, then you could use the row
> > index modulo the number of blocks, you want to create, to calculate a
> > partition id. Using a custom partitioner which partitions the data
> > according to this partition ID and then applying you flat map operation
> > should do the trick. You could take a look at the ALS (line 458)
> > implementation, where I did something similar.
> >
> > Cheers,
> > Till
> > ​
> >
> > On Mon, Apr 25, 2016 at 8:18 PM, Andrew Palumbo <[hidden email]>
> > wrote:
> >
> > > sorry - just noticed below should read:
> > >
> > >      val rowsA = (0 until inCoreA.nrow).map(i => (i, inCoreA(i, ::)))
> > >      drmA = env.fromCollection(rowsA).partitionByRange(0)
> > >
> > >      val rowsB = (0 until inCoreB.nrow).map(i => (i, inCoreB(i, ::)))
> > >      drmB = env.fromCollection(rowsB).partitionByRange(0)
> > >
> > >
> > > also:
> > >
> > > The Blockified representation is a `DataSet[(Array(K), Matrix)]`.
> > >
> > > Thanks
> > >
> > > ________________________________________
> > > From: Andrew Palumbo <[hidden email]>
> > > Sent: Monday, April 25, 2016 1:58 PM
> > > To: [hidden email]
> > > Subject: Re: Partition problem
> > >
> > > Thank you Fabian and Till for answering,
> > >
> > >  I think that my explanation of the problem was a bit over simplified
> (I
> > > am trying to implement an operator that will pass our tests, and didn't
> > > want to throw too much code at you).  I realize that this is an odd
> > case, a
> > > 2x2 matrix in a distributed context, but it is for a specific Unit test
> > > that we enforce.
> > >
> > > So we have two different Distributed Matrix Representations: Row-Wise
> and
> > > Blockified.  The Row-Wise representation is a `DataSet[(K, Vector)]`
> > where
> > > K is e.g., an Int Key and Vector is a row of the Matrix.  The
> Blockified
> > > representation is a `DataSet[Array(K), Matrix]`.  In the Gist that I
> > > posted, I was working with a Blockified Distributed dataset.  Since it
> > was
> > > a 2x2 matrix that was Blockified into 4 partitions, the non-empty
> > > partitions actually contain a 1x2 Matrix (rather than a (Vector) "row"
> > as i
> > > think It reads I will update that to be more clear.
> > >
> > > @Fabian In this case, I am using ExecutionEnvironment.FromCollection to
> > > create the original Row-Wise Matrix DataSet.  (There are other cases in
> > > which we read from HDFS directly).  But for this problem I am doing
> > > something like:
> > >
> > >      val inCoreA = dense((1, 2), (3, 4))
> > >      val inCoreB = dense((3, 5), (4, 6))
> > >
> > >      val rowsA = (0 until m.nrow).map(i => (i, inCoreA(i, ::)))
> > >      drmA = env.fromCollection(rows).partitionByRange(0)
> > >
> > >      val rowsB = (0 until m.nrow).map(i => (i, inCoreB(i, ::)))
> > >      drmA = env.fromCollection(rows).partitionByRange(0)
> > >
> > > >If you need the split ID in your program, you can implement an
> > > InputFormat,
> > > >which wraps another IF and assigns the ID of the current InputSplit to
> > the
> > > >read data, i.e., converts the DataType from T to Tuple2[Int, T].
> > >
> > > I'm not sure if the partitioning at this point matters (of the row-wise
> > > Matrices)?  (In next map these into Blockified Matrices)
> > >
> > > @Till I think that you're right in that my assumption of Identical
> > > partitioning is a problem.
> > >
> > > The above Matrices are then mapped into Blockified Matrices currently
> > > using the method something as follows:
> > >
> > >     val blocksA = drmA.mapPartition {
> > >       values =>
> > >         val (keys, vectors) = values.toIterable.unzip
> > >
> > >         if (vectors.nonEmpty) {
> > >           val vector = vectors.head
> > >           val matrix: Matrix = if (vector.isDense) {
> > >             val matrix = new DenseMatrix(vectors.size, ncolLocal)
> > >             vectors.zipWithIndex.foreach { case (vec, idx) =>
> matrix(idx,
> > > ::) := vec }
> > >             matrix
> > >           } else {
> > >             new SparseRowMatrix(vectors.size, ncolLocal,
> vectors.toArray)
> > >           }
> > >           Seq((keys.toArray(classTag), matrix))
> > >         } else {
> > >           Seq()
> > >         }
> > >     }
> > >
> > > And the same for Matrix B.
> > >
> > > Which is where the partition index assignment begins in the gist:
> > > https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3
> > >
> > >
> > > > I think the problem is that you assume that both matrices have the
> same
> > > > partitioning. If you guarantee that this is the case, then you can
> use
> > > the
> > > > subtask index as the block index.
> > >
> > > Yes I think this is the problem- I'd assumed that when mapping into
> > > partitions, the 0 partiton would be used first and then the 1 partition
> > and
> > > so on...  I understand what your saying now though re: lazy assignment
> > via
> > > task Id.  So essentially the partition that the data ends up in is
> > > arbitrary based on the task ID that happens to be assigning it.
> > >
> > > But in the general case this is not true,
> > > > and then you have to calculate the blocks by first assigning a block
> > > index
> > > > (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19
> > > assigned
> > > > to block 1, etc.) and then create the blocks by reducing on this
> block
> > > > index. That's because the distribution of the individual rows in the
> > > > cluster is not necessarily the same between two matrices.
> > >
> > > I suppose this would have to be done when Blockifying in the method
> > > above.  The row-wise matrix may be 2x2 or 20000000 x 2 and directly
> read
> > > from HDFS.  I'm not sure how to assign divide the data and partition it
> > > myself when mapping a row-wise matrix into blocks.  Eg. can I know the
> > size
> > > of the DataSet before the computation is triggered by env.execute()?
> If I
> > > guess what you are saying is to hand- partition the data in the above
> > > `.asBlockified()` method.
> > >
> > >
> > > As well is it not still possible that i may end up with the same
> problem
> > > when the # of matrix blocks is < the degree of parallelism?
> > >
> > >
> > >
> > > In the end what I really need to do is be able to join the two
> Bockified
> > > DataSets (of any size) in the correct order.. so maybe there is an
> other
> > > way to do this?
> > >
> > >
> > > Thanks again for your time.
> > >
> > > Andy
> > > ________________________________________
> > > From: Fabian Hueske <[hidden email]>
> > > Sent: Monday, April 25, 2016 6:09 AM
> > > To: [hidden email]
> > > Subject: Re: Partition problem
> > >
> > > Hi Andrew,
> > >
> > > I might be wrong, but I think this problem is caused by an assumption
> of
> > > how Flink reads input data.
> > > In Flink, each InputSplit is not read by a new task and a split does
> not
> > > correspond to a partition. This is different from how Hadoop MR and
> Spark
> > > handle InputSplits.
> > >
> > > Instead, Flink creates as many DataSource tasks as specified by the
> task
> > > parallelism and lazily assigns InputSplits to its subtasks. Idle
> > DataSource
> > > subtasks request InputSplits from the JobManager and the assignment
> > happens
> > > first-come-first-serve.
> > > Hence, the subtask ID (or partition ID) of an InputSplit is not
> > > deterministic and a DataSource might read more than one or also no
> split
> > at
> > > all (such as in your case).
> > >
> > > If you need the split ID in your program, you can implement an
> > InputFormat,
> > > which wraps another IF and assigns the ID of the current InputSplit to
> > the
> > > read data, i.e., converts the DataType from T to Tuple2[Int, T].
> > >
> > > Hope this helps,
> > > Fabian
> > >
> > >
> > > 2016-04-25 11:27 GMT+02:00 Till Rohrmann <[hidden email]>:
> > >
> > > > Hi Andrew,
> > > >
> > > > I think the problem is that you assume that both matrices have the
> same
> > > > partitioning. If you guarantee that this is the case, then you can
> use
> > > the
> > > > subtask index as the block index. But in the general case this is not
> > > true,
> > > > and then you have to calculate the blocks by first assigning a block
> > > index
> > > > (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19
> > > assigned
> > > > to block 1, etc.) and then create the blocks by reducing on this
> block
> > > > index. That's because the distribution of the individual rows in the
> > > > cluster is not necessarily the same between two matrices.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Mon, Apr 25, 2016 at 1:40 AM, Andrew Palumbo <[hidden email]>
> > > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > >
> > > > > I've run into a problem with empty partitions when the number of
> > > elements
> > > > > in a DataSet is less than the Degree of Parallelism.  I've created
> a
> > > gist
> > > > > here to describe it:
> > > > >
> > > > >
> > > > >
> > https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3
> > > > >
> > > > >
> > > > > I have two 2x2 matrices, Matrix A and Matrix B and an execution
> > > > > environment where the degree of parallelism is 4. Both matrices
>  are
> > > > > blockified in  2 different DataSet s . In this case (the case of a
> > 2x2
> > > > > matrices with 4 partitions) this means that each row goes into a
> > > > partition
> > > > > leaving 2 empty partitions. In Matrix A, the rows go into
> partitions
> > 0,
> > > > 1.
> > > > > However the rows of Matrix B end up in partitions 1, 2. I assign
> the
> > > > > ordinal index of the blockified matrix's partition to its block,
> and
> > > then
> > > > > join on that index.
> > > > >
> > > > >
> > > > > However in this case, with differently partitioned matrices of the
> > same
> > > > > geometry, the intersection of the blockified matrices' indices is
> 1,
> > > and
> > > > > partitions 0 and 2 are dropped.
> > > > >
> > > > >
> > > > > I've tried explicitly defining the dop for Matrix B using the count
> > of
> > > > > non-empty partitions in Matrix A, however this changes the order of
> > the
> > > > > DataSet, placing partition 2 into partition 0.
> > > > >
> > > > >
> > > > > Is there a way to make sure that these datasets are partitioned in
> > the
> > > > > same way?
> > > > >
> > > > >
> > > > > Thank you,
> > > > >
> > > > >
> > > > > Andy
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Partition problem

Andrew Palumbo
Thank you both for your time, we've bumped it to a further version since it was an optimization of what we currently have (a working Mahout-on-flink), and we need to release Mahout 0.12.1.

Andy
________________________________________
From: Till Rohrmann <[hidden email]>
Sent: Tuesday, May 17, 2016 8:52:51 AM
To: [hidden email]
Subject: Re: Partition problem

Hi Andrew,

I think in the end it boils down to counting the number of rows/finding the
maximum index in the set of rows if you want to partition your matrix into
blocks where the row indices are monotonically increasing. Without this
information none of the described methods (range partition or custom
partitioner) will work. However, both build-in mechanisms suffer from the
fact that you first need to calculate the count/max index, retrieve the
value to the client and then call the range partitioning/custom
partitioner. Thus, in the end you will execute parts of your job multiple
times.

A more efficient way would be to keep the count/max index as a `DataSet` in
your cluster and broadcast it to a map operation where you assign partition
ids. Group reducing on these partition ids will allow you to generate the
blocked representation of your matrix.

Cheers,
Till

On Tue, May 17, 2016 at 10:31 AM, Fabian Hueske <[hidden email]> wrote:

> Hi Andrew,
>
> I am not sure that I fully understand your requirements. Please correct me
> if I some of my assumptions are not correct.
>
> Requirements:
> - Rows must be partitioned in partitions of consecutive row ids, i.e., rows
> 0 to 10 in partition 0, rows 11 to 20 in partition 1, etc..
> - Rows of both inputs must be equally partitioned.
>
> This can be done using range partitioning with a provided DataDistribution
> (to avoid auto-generated distributions based on sampling) or a custom
> partitioner. The custom partitioner would basically reimplement the logic
> of the range partitioner. So I would go with the range partitioner. The
> DataSetUtils class provides method to pass a DataDistribution to a range
> partitioner.
>
> The data distribution should be such that the partitioner generates equally
> sized partitions. If the row ids are dense (or gaps are uniformly
> distributed), you only need to know the minimum and maximum row id. In case
> of skewed row id distribution, it is good to know the distribution to
> compute equally sized partitions.
>
> The DataDistribution interface has a method to return the bucket boundaries
> among which the partitions are divided.
>
> Please let me know if I didn't get the requirements right or if you have
> further questions.
>
> Best, Fabian
>
>
>
>
> 2016-05-14 21:28 GMT+02:00 Andrew Palumbo <[hidden email]>:
>
> > Also I shoud say that i doo need the data in the partitions to remain in
> > the same order.  Ie.  partition 3 can not go into partiton 0 if the
> > partition 0 is empty and there are 2 of 4 partitions being used.
> >
> > ________________________________________
> > From: Andrew Palumbo <[hidden email]>
> > Sent: Saturday, May 14, 2016 3:25:38 PM
> > To: [hidden email]
> > Subject: Re: Partition problem
> >
> > Hi Till and Fabian,
> > I had to come back to this problem because we're putting out a
> maintenance
> > release soon. I think I overcomplicated the issue here.  I don't need
> equal
> > partitions.  All that I need is to ensure that we have continuous
> > partitions based in the 0th partion.  Ie.  if there if there are 4
> > partitions and 2 are empty, partition 0 will have data and partition 1
> will
> > have data.
> >
> >
> > @till, I see what you did in ALS, with a Custom partitioner,  Is there a
> > way that I can write a custom partitioner to make sure that we have data
> in
> > the 0th and 1st partition?  I don't see much documentation for custom
> > partitioners.
> >
> > Thanks.
> >
> > Andy
> > ________________________________________
> > From: Till Rohrmann <[hidden email]>
> > Sent: Tuesday, April 26, 2016 9:39:41 AM
> > To: [hidden email]
> > Subject: Re: Partition problem
> >
> > If you don’t know the size of your matrix, then you cannot partition it
> > into continuous chunks of rows. The problem is that partitionByRange
> > samples the data set to generate a distribution and, thus, two matrices
> > will rarely be partitioned identically. Also if you want to provide a
> data
> > distribution, then you have to know how many rows you have. Thus, you
> would
> > first have to count the number of rows and broadcast this information to
> > the subsequent partitioning step (don’t use collect for that).
> >
> > Alternatively, if it’s ok that the grouping just has to be consistent and
> > not continuous with respect to the row index, then you could use the row
> > index modulo the number of blocks, you want to create, to calculate a
> > partition id. Using a custom partitioner which partitions the data
> > according to this partition ID and then applying you flat map operation
> > should do the trick. You could take a look at the ALS (line 458)
> > implementation, where I did something similar.
> >
> > Cheers,
> > Till
> > ​
> >
> > On Mon, Apr 25, 2016 at 8:18 PM, Andrew Palumbo <[hidden email]>
> > wrote:
> >
> > > sorry - just noticed below should read:
> > >
> > >      val rowsA = (0 until inCoreA.nrow).map(i => (i, inCoreA(i, ::)))
> > >      drmA = env.fromCollection(rowsA).partitionByRange(0)
> > >
> > >      val rowsB = (0 until inCoreB.nrow).map(i => (i, inCoreB(i, ::)))
> > >      drmB = env.fromCollection(rowsB).partitionByRange(0)
> > >
> > >
> > > also:
> > >
> > > The Blockified representation is a `DataSet[(Array(K), Matrix)]`.
> > >
> > > Thanks
> > >
> > > ________________________________________
> > > From: Andrew Palumbo <[hidden email]>
> > > Sent: Monday, April 25, 2016 1:58 PM
> > > To: [hidden email]
> > > Subject: Re: Partition problem
> > >
> > > Thank you Fabian and Till for answering,
> > >
> > >  I think that my explanation of the problem was a bit over simplified
> (I
> > > am trying to implement an operator that will pass our tests, and didn't
> > > want to throw too much code at you).  I realize that this is an odd
> > case, a
> > > 2x2 matrix in a distributed context, but it is for a specific Unit test
> > > that we enforce.
> > >
> > > So we have two different Distributed Matrix Representations: Row-Wise
> and
> > > Blockified.  The Row-Wise representation is a `DataSet[(K, Vector)]`
> > where
> > > K is e.g., an Int Key and Vector is a row of the Matrix.  The
> Blockified
> > > representation is a `DataSet[Array(K), Matrix]`.  In the Gist that I
> > > posted, I was working with a Blockified Distributed dataset.  Since it
> > was
> > > a 2x2 matrix that was Blockified into 4 partitions, the non-empty
> > > partitions actually contain a 1x2 Matrix (rather than a (Vector) "row"
> > as i
> > > think It reads I will update that to be more clear.
> > >
> > > @Fabian In this case, I am using ExecutionEnvironment.FromCollection to
> > > create the original Row-Wise Matrix DataSet.  (There are other cases in
> > > which we read from HDFS directly).  But for this problem I am doing
> > > something like:
> > >
> > >      val inCoreA = dense((1, 2), (3, 4))
> > >      val inCoreB = dense((3, 5), (4, 6))
> > >
> > >      val rowsA = (0 until m.nrow).map(i => (i, inCoreA(i, ::)))
> > >      drmA = env.fromCollection(rows).partitionByRange(0)
> > >
> > >      val rowsB = (0 until m.nrow).map(i => (i, inCoreB(i, ::)))
> > >      drmA = env.fromCollection(rows).partitionByRange(0)
> > >
> > > >If you need the split ID in your program, you can implement an
> > > InputFormat,
> > > >which wraps another IF and assigns the ID of the current InputSplit to
> > the
> > > >read data, i.e., converts the DataType from T to Tuple2[Int, T].
> > >
> > > I'm not sure if the partitioning at this point matters (of the row-wise
> > > Matrices)?  (In next map these into Blockified Matrices)
> > >
> > > @Till I think that you're right in that my assumption of Identical
> > > partitioning is a problem.
> > >
> > > The above Matrices are then mapped into Blockified Matrices currently
> > > using the method something as follows:
> > >
> > >     val blocksA = drmA.mapPartition {
> > >       values =>
> > >         val (keys, vectors) = values.toIterable.unzip
> > >
> > >         if (vectors.nonEmpty) {
> > >           val vector = vectors.head
> > >           val matrix: Matrix = if (vector.isDense) {
> > >             val matrix = new DenseMatrix(vectors.size, ncolLocal)
> > >             vectors.zipWithIndex.foreach { case (vec, idx) =>
> matrix(idx,
> > > ::) := vec }
> > >             matrix
> > >           } else {
> > >             new SparseRowMatrix(vectors.size, ncolLocal,
> vectors.toArray)
> > >           }
> > >           Seq((keys.toArray(classTag), matrix))
> > >         } else {
> > >           Seq()
> > >         }
> > >     }
> > >
> > > And the same for Matrix B.
> > >
> > > Which is where the partition index assignment begins in the gist:
> > > https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3
> > >
> > >
> > > > I think the problem is that you assume that both matrices have the
> same
> > > > partitioning. If you guarantee that this is the case, then you can
> use
> > > the
> > > > subtask index as the block index.
> > >
> > > Yes I think this is the problem- I'd assumed that when mapping into
> > > partitions, the 0 partiton would be used first and then the 1 partition
> > and
> > > so on...  I understand what your saying now though re: lazy assignment
> > via
> > > task Id.  So essentially the partition that the data ends up in is
> > > arbitrary based on the task ID that happens to be assigning it.
> > >
> > > But in the general case this is not true,
> > > > and then you have to calculate the blocks by first assigning a block
> > > index
> > > > (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19
> > > assigned
> > > > to block 1, etc.) and then create the blocks by reducing on this
> block
> > > > index. That's because the distribution of the individual rows in the
> > > > cluster is not necessarily the same between two matrices.
> > >
> > > I suppose this would have to be done when Blockifying in the method
> > > above.  The row-wise matrix may be 2x2 or 20000000 x 2 and directly
> read
> > > from HDFS.  I'm not sure how to assign divide the data and partition it
> > > myself when mapping a row-wise matrix into blocks.  Eg. can I know the
> > size
> > > of the DataSet before the computation is triggered by env.execute()?
> If I
> > > guess what you are saying is to hand- partition the data in the above
> > > `.asBlockified()` method.
> > >
> > >
> > > As well is it not still possible that i may end up with the same
> problem
> > > when the # of matrix blocks is < the degree of parallelism?
> > >
> > >
> > >
> > > In the end what I really need to do is be able to join the two
> Bockified
> > > DataSets (of any size) in the correct order.. so maybe there is an
> other
> > > way to do this?
> > >
> > >
> > > Thanks again for your time.
> > >
> > > Andy
> > > ________________________________________
> > > From: Fabian Hueske <[hidden email]>
> > > Sent: Monday, April 25, 2016 6:09 AM
> > > To: [hidden email]
> > > Subject: Re: Partition problem
> > >
> > > Hi Andrew,
> > >
> > > I might be wrong, but I think this problem is caused by an assumption
> of
> > > how Flink reads input data.
> > > In Flink, each InputSplit is not read by a new task and a split does
> not
> > > correspond to a partition. This is different from how Hadoop MR and
> Spark
> > > handle InputSplits.
> > >
> > > Instead, Flink creates as many DataSource tasks as specified by the
> task
> > > parallelism and lazily assigns InputSplits to its subtasks. Idle
> > DataSource
> > > subtasks request InputSplits from the JobManager and the assignment
> > happens
> > > first-come-first-serve.
> > > Hence, the subtask ID (or partition ID) of an InputSplit is not
> > > deterministic and a DataSource might read more than one or also no
> split
> > at
> > > all (such as in your case).
> > >
> > > If you need the split ID in your program, you can implement an
> > InputFormat,
> > > which wraps another IF and assigns the ID of the current InputSplit to
> > the
> > > read data, i.e., converts the DataType from T to Tuple2[Int, T].
> > >
> > > Hope this helps,
> > > Fabian
> > >
> > >
> > > 2016-04-25 11:27 GMT+02:00 Till Rohrmann <[hidden email]>:
> > >
> > > > Hi Andrew,
> > > >
> > > > I think the problem is that you assume that both matrices have the
> same
> > > > partitioning. If you guarantee that this is the case, then you can
> use
> > > the
> > > > subtask index as the block index. But in the general case this is not
> > > true,
> > > > and then you have to calculate the blocks by first assigning a block
> > > index
> > > > (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19
> > > assigned
> > > > to block 1, etc.) and then create the blocks by reducing on this
> block
> > > > index. That's because the distribution of the individual rows in the
> > > > cluster is not necessarily the same between two matrices.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Mon, Apr 25, 2016 at 1:40 AM, Andrew Palumbo <[hidden email]>
> > > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > >
> > > > > I've run into a problem with empty partitions when the number of
> > > elements
> > > > > in a DataSet is less than the Degree of Parallelism.  I've created
> a
> > > gist
> > > > > here to describe it:
> > > > >
> > > > >
> > > > >
> > https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3
> > > > >
> > > > >
> > > > > I have two 2x2 matrices, Matrix A and Matrix B and an execution
> > > > > environment where the degree of parallelism is 4. Both matrices
>  are
> > > > > blockified in  2 different DataSet s . In this case (the case of a
> > 2x2
> > > > > matrices with 4 partitions) this means that each row goes into a
> > > > partition
> > > > > leaving 2 empty partitions. In Matrix A, the rows go into
> partitions
> > 0,
> > > > 1.
> > > > > However the rows of Matrix B end up in partitions 1, 2. I assign
> the
> > > > > ordinal index of the blockified matrix's partition to its block,
> and
> > > then
> > > > > join on that index.
> > > > >
> > > > >
> > > > > However in this case, with differently partitioned matrices of the
> > same
> > > > > geometry, the intersection of the blockified matrices' indices is
> 1,
> > > and
> > > > > partitions 0 and 2 are dropped.
> > > > >
> > > > >
> > > > > I've tried explicitly defining the dop for Matrix B using the count
> > of
> > > > > non-empty partitions in Matrix A, however this changes the order of
> > the
> > > > > DataSet, placing partition 2 into partition 0.
> > > > >
> > > > >
> > > > > Is there a way to make sure that these datasets are partitioned in
> > the
> > > > > same way?
> > > > >
> > > > >
> > > > > Thank you,
> > > > >
> > > > >
> > > > > Andy
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>