RichMapPartitionFunction - problems with collect


Sergio Ramírez
Hi all,

I've been having some problems with RichMapPartitionFunction. First, I
tried to convert the iterable into an array, without success. Then I
used buffers to store the values per column. I am trying to
transpose the local matrix of LabeledVectors that I have in each partition.

Neither of these approaches has worked. For example, for partition 7 and
feature 10 the vector is empty, whereas for the same partition and
feature 11 the vector contains 200 elements. The affected partitions and
features change on each execution.

I think there is a problem with calling the collect method outside the
loop over the iterable.

new RichMapPartitionFunction[LabeledVector, ((Int, Int), Array[Byte])]() {
  def mapPartition(it: java.lang.Iterable[LabeledVector],
                   out: Collector[((Int, Int), Array[Byte])]): Unit = {
    val index = getRuntimeContext().getIndexOfThisSubtask()
    val mat = for (i <- 0 until nFeatures) yield new scala.collection.mutable.ListBuffer[Byte]
    for (reg <- it.asScala) {
      for (i <- 0 until (nFeatures - 1)) mat(i) += reg.vector(i).toByte
      mat(nFeatures - 1) += classMap(reg.label)
    }
    for (i <- 0 until nFeatures) out.collect((i, index) -> mat(i).toArray) // numPartitions
  }
}

Regards

Re: RichMapPartitionFunction - problems with collect

Chesnay Schepler-3
I haven't looked too deeply into this, but it sounds like object reuse is
enabled, in which case buffering values effectively causes you to store
the same value multiple times.

Can you try disabling object reuse with
env.getConfig().disableObjectReuse()?
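
For reference, a minimal sketch of the effect described above, written in plain Scala without Flink. The Record class and the reusingIterator helper are hypothetical stand-ins for a runtime that hands out the same mutable instance for every element; they are not Flink APIs.

```scala
import scala.collection.mutable.ListBuffer

object ObjectReuseSketch {
  // Hypothetical mutable record standing in for a reused input object.
  final class Record(var value: Int)

  // Simulates a runtime that returns the SAME instance for every element,
  // overwriting its field before each hand-out.
  def reusingIterator(values: Seq[Int]): Iterator[Record] = {
    val shared = new Record(0)
    values.iterator.map { v => shared.value = v; shared }
  }

  // Buffers references: every entry points at the one shared instance,
  // so all entries show the last value written.
  def bufferReferences(values: Seq[Int]): List[Int] = {
    val buf = ListBuffer.empty[Record]
    for (r <- reusingIterator(values)) buf += r
    buf.map(_.value).toList
  }

  // Copies the primitive out before the shared instance is overwritten.
  def bufferCopies(values: Seq[Int]): List[Int] = {
    val buf = ListBuffer.empty[Int]
    for (r <- reusingIterator(values)) buf += r.value
    buf.toList
  }

  def main(args: Array[String]): Unit = {
    println(bufferReferences(Seq(1, 2, 3))) // List(3, 3, 3)
    println(bufferCopies(Seq(1, 2, 3)))     // List(1, 2, 3)
  }
}
```

The code in the first post appends reg.vector(i).toByte, a copied primitive, so it appears closer to the second pattern than to the first.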

In reply to Sergio Ramírez's message of 22.03.2016 16:53.


Re: RichMapPartitionFunction - problems with collect

Sergio Ramírez
Hi again,

I haven't been able to solve the problem with the setting you suggested.
I also tried static variables (matrices), without success. I then tried
this simpler code:


def mapPartition(it: java.lang.Iterable[LabeledVector],
                 out: Collector[((Int, Int), Int)]): Unit = {
  val index = getRuntimeContext().getIndexOfThisSubtask() // Partition index
  var ninst = 0
  for (reg <- it.asScala) {
    requireByteValues(reg.vector)
    ninst += 1
  }
  for (i <- 0 until nFeatures) out.collect((i, index) -> ninst)
}

The result is as follows:

Attribute 10, partitions 0 to 7:
((10,0),201),((10,1),200),((10,2),201),((10,3),200),((10,4),200),((10,5),201),((10,6),201),((10,7),201)
Attribute 12, partitions 0 to 7:
((12,0),201),((12,1),201),((12,2),201),((12,3),200),((12,4),201),((12,5),200),((12,6),200),((12,7),201)

As you can see, partition 6, for example, reports 201 instances for
attribute 10 but 200 for attribute 12, which should be impossible.

In reply to Chesnay Schepler's message of 24/03/16 22:39.


Re: RichMapPartitionFunction - problems with collect

Till Rohrmann
Hi Sergio,

Could you please provide a complete example (including input data) that
reproduces your problem? It is hard to tell what is going wrong when one only
sees a fraction of the program.

Cheers,
Till

In reply to Sergio Ramírez's message of Tue, Mar 29, 2016 at 5:58 PM.

Re: RichMapPartitionFunction - problems with collect

Sergio Ramírez
Hello,

Ok, please find enclosed the test code and the input data.

Cheers

In reply to Till Rohrmann's message of 31/03/16 10:07.


Attachments: ErrorTest.scala (3K), a1a.txt (153K)

Re: RichMapPartitionFunction - problems with collect

Sergio Ramírez
Hello again:

Is there any news about this problem with RichMapPartitionFunction?

Thank you

In reply to Sergio Ramírez's message of 06/04/16 17:01.


Re: RichMapPartitionFunction - problems with collect

Till Rohrmann
Hi Sergio,

Sorry for the late reply. I figured out your problem. The reason you see
apparently inconsistent results is that you execute your job multiple
times. Each collect call triggers an eager execution of your Flink job.
Since the intermediate results are not stored, the whole topology has to be
re-executed for every collect call. And since the input splits of your
libSVM file are assigned lazily, the subtasks can be assigned different
splits of the input file in each execution. That is why you see different
lengths for different features of the same partition.

If you replace the last 6 lines of your program with:

transposed.filter(_._1._1 == (nFeatures - 1)).map(t => t._1 -> t._2.size)
  .reduceGroup(_.mkString(",")).output(new PrintingOutputFormat())
val b = transposed.filter(_._1._1 == 10).map(t => t._1 -> t._2.size)
  .reduceGroup(_.mkString(",")).output(new PrintingOutputFormat())
val c = transposed.filter(_._1._1 == 12).map(t => t._1 -> t._2.size)
  .reduceGroup(_.mkString(",")).output(new PrintingOutputFormat())

env.execute()

you should see the correct results. If you need a deterministic assignment
of input splits to the different sources, you would have to implement your
own InputFormat that returns a special InputSplitAssigner performing the
deterministic assignment. Or simply try to avoid collect, count and print,
which trigger the eager execution of a Flink job.
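
The effect of re-executing the job can be illustrated with a small plain-Scala simulation. This is only a sketch under stated assumptions: it does not use Flink, and splitSizes, parallelism and runJob are hypothetical names. The split-to-subtask assignment is rotated by the execution number to mimic an assignment that may differ from run to run.

```scala
object EagerExecutionSketch {
  // Hypothetical input: four splits with slightly different record counts.
  val splitSizes: Vector[Int] = Vector(201, 200, 201, 200)
  val parallelism = 2

  // Simulates one execution of the whole job and returns the record count
  // seen by each subtask. Which subtask reads which split depends on the
  // execution number, standing in for lazy split assignment.
  def runJob(execution: Int): Map[Int, Int] =
    splitSizes.zipWithIndex
      .groupBy { case (_, split) => (split + execution) % parallelism }
      .map { case (subtask, assigned) => subtask -> assigned.map(_._1).sum }

  def main(args: Array[String]): Unit = {
    // Two eager calls: each one re-runs the job, so subtask 0 may see
    // different data each time.
    println(runJob(0)(0)) // 402
    println(runJob(1)(0)) // 400

    // One execution feeding both queries: the counts agree.
    val once = runJob(0)
    println(once(0) == once(0)) // true
  }
}
```

Attaching all outputs to one plan and calling env.execute() once corresponds to the last case: every sink observes the same execution.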

Cheers,
Till


In reply to Sergio Ramírez's message of Wed, Apr 13, 2016 at 5:47 PM.

Re: RichMapPartitionFunction - problems with collect

Sergio Ramírez
Hello,

OK, now I understand everything. So if I want to reuse my DataSet in
several different operations, what should I do? Is there any way to
keep the data so that it does not have to be recomputed? I am not only
talking about iteration; I mean reuse of data in general.

For example, imagine I want to filter some results and collect them, and
then apply another filter.

Regards

In reply to Till Rohrmann's message of 26/04/16 14:25.


Re: RichMapPartitionFunction - problems with collect

Till Rohrmann
You have to persist the DataSet and read it back for the subsequent
operations. Take a look at the FlinkMLTools.persist methods.
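
The general idea of persisting once and reading back can be sketched in plain Scala with a temporary file. This is an illustration only and does not call FlinkMLTools.persist; runJob, persist and read are hypothetical helpers, and the job is made to return different results on consecutive executions on purpose.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.io.Source

object PersistSketch {
  private var executions = 0

  // Hypothetical job whose per-subtask counts flip between executions.
  def runJob(): Seq[(Int, Int)] = {
    executions += 1
    if (executions % 2 == 1) Seq(0 -> 402, 1 -> 400) else Seq(0 -> 400, 1 -> 402)
  }

  // Writes the records to the file once, one "subtask,count" pair per line.
  def persist(records: Seq[(Int, Int)], path: Path): Unit = {
    val text = records.map { case (subtask, count) => s"$subtask,$count" }.mkString("\n")
    Files.write(path, text.getBytes(StandardCharsets.UTF_8))
  }

  // Reads the stored records back without re-running the job.
  def read(path: Path): List[(Int, Int)] = {
    val source = Source.fromFile(path.toFile, "UTF-8")
    try source.getLines().map { line =>
      val Array(subtask, count) = line.split(",")
      (subtask.toInt, count.toInt)
    }.toList
    finally source.close()
  }

  def main(args: Array[String]): Unit = {
    val path = Files.createTempFile("persist-sketch", ".csv")
    persist(runJob(), path) // the job runs exactly once
    val first = read(path).filter(_._1 == 0)
    val second = read(path).filter(_._1 == 1)
    println(first)  // List((0,402))
    println(second) // List((1,400))
    Files.delete(path)
  }
}
```

Both filters read the same stored result, so they stay consistent with each other even though a second execution of the job would have produced different counts.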

Cheers,
Till


In reply to Sergio Ramírez's message of Thu, Apr 28, 2016 at 6:14 PM.