Implementing a list accumulator

Max Michels
Hi everyone,

I'm running into some problems implementing an Accumulator that
returns the contents of a DataSet as a list.

https://github.com/mxm/flink/tree/count/collect

Basically, it works fine in this test case:


    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

    DataSet<Integer> data = env.fromElements(input);

    // count
    long numEntries = data.count();
    assertEquals(10, numEntries);

    // collect
    ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
    assertArrayEquals(input, list.toArray());


But with non-primitive objects strange results occur:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableObjectReuse();

    DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

    DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);

    // count
    long numEntries = data3.count();
    assertEquals(100, numEntries);

    // collect
    ArrayList<Tuple2<Integer, Integer>> list =
        (ArrayList<Tuple2<Integer, Integer>>) data3.collect();

    System.out.println(list);

....

Output:

[(10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
(10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
(10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,6), (10,6),
(10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
(10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
(10,6), (10,6), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
(10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
(10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,9), (10,9),
(10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
(10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
(10,9), (10,9), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
(10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
(10,8), (10,8), (10,8), (10,8), (10,8), (10,8)]

I assume the problem is the clone() method of the ListAccumulator,
where we just create a shallow copy. This is fine for accumulators
that hold primitive values, like IntCounter, but here we have a real
object.
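
For illustration only, here is a minimal plain-Java sketch of the shallow-copy concern. It does not use the actual ListAccumulator code; the Pair class and both copy helpers are hypothetical stand-ins. A shallow copy creates a new list but shares the element objects, so a later mutation of an element shows up in the copy as well.

```java
import java.util.ArrayList;
import java.util.List;

class ShallowCopyDemo {
    /** Hypothetical mutable element type standing in for a Tuple2. */
    static final class Pair {
        int left;
        int right;

        Pair(int left, int right) {
            this.left = left;
            this.right = right;
        }

        @Override
        public String toString() {
            return "(" + left + "," + right + ")";
        }
    }

    /** Shallow copy: a new list that points at the same Pair objects. */
    static List<Pair> shallowCopy(List<Pair> source) {
        return new ArrayList<>(source);
    }

    /** Deep copy: a new list with a fresh Pair for every element. */
    static List<Pair> deepCopy(List<Pair> source) {
        List<Pair> copy = new ArrayList<>(source.size());
        for (Pair p : source) {
            copy.add(new Pair(p.left, p.right));
        }
        return copy;
    }

    public static void main(String[] args) {
        List<Pair> original = new ArrayList<>();
        original.add(new Pair(1, 2));

        List<Pair> shallow = shallowCopy(original);
        List<Pair> deep = deepCopy(original);

        // Mutate the element after both copies were taken.
        original.get(0).right = 99;

        System.out.println(shallow); // [(1,99)]
        System.out.println(deep);    // [(1,2)]
    }
}
```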

How do we work around this problem?

Best regards,
Max

Re: Implementing a list accumulator

Ufuk Celebi-2
I just checked out your branch and ran it with a breakpoint set in the CollectHelper. If you look into the (list) accumulator, you see that the same object is always added to it. Strangely enough, object reuse is disabled in the config. I don't have time to look into it further right now, but it seems to be a problem with the object reuse mode.

– Ufuk

(In reply to Max Michels, 20 Jan 2015, 20:53)

Re: Implementing a list accumulator

Ufuk Celebi-2
The thing is that the DefaultCrossFunction always uses the same holder Tuple2 object, which is then handed over to the chained collect helper flatMap(). I can see why it is OK for the default functions to reuse "holder" objects, but it becomes problematic when they are chained to another operator.
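
The effect can be reproduced without Flink. The following is a minimal plain-Java sketch, not the DefaultCrossFunction or CollectHelper code: the Pair class and both collect methods are hypothetical. It shows that storing a reference to a reused holder leaves every list entry showing the last value written, while copying on add keeps each value.

```java
import java.util.ArrayList;
import java.util.List;

class HolderReuseDemo {
    /** Hypothetical mutable element type standing in for a Tuple2. */
    static final class Pair {
        int left;
        int right;

        Pair(int left, int right) {
            this.left = left;
            this.right = right;
        }

        @Override
        public String toString() {
            return "(" + left + "," + right + ")";
        }
    }

    /** Crosses two arrays through ONE reused holder and stores the reference. */
    static List<Pair> collectReferences(int[] left, int[] right) {
        Pair holder = new Pair(0, 0);
        List<Pair> collected = new ArrayList<>();
        for (int l : left) {
            for (int r : right) {
                holder.left = l;
                holder.right = r;
                collected.add(holder); // same object every time
            }
        }
        return collected;
    }

    /** Same loop, but a snapshot of the holder is stored instead. */
    static List<Pair> collectCopies(int[] left, int[] right) {
        Pair holder = new Pair(0, 0);
        List<Pair> collected = new ArrayList<>();
        for (int l : left) {
            for (int r : right) {
                holder.left = l;
                holder.right = r;
                collected.add(new Pair(holder.left, holder.right));
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        int[] values = {1, 2};
        System.out.println(collectReferences(values, values)); // [(2,2), (2,2), (2,2), (2,2)]
        System.out.println(collectCopies(values, values));     // [(1,1), (1,2), (2,1), (2,2)]
    }
}
```

In this single-threaded sketch every entry ends up as the last pair written. The output in the original post varies in its second field, so more is going on there than this sketch captures.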

(In reply to Ufuk Celebi, 21 Jan 2015, 17:12)

Re: Implementing a list accumulator

Stephan Ewen
True, that is tricky. The user code does not necessarily respect the
non-reuse mode. That may be true for any user code. Can the list
accumulator immediately serialize the objects and send over a byte array?
That should fix it reliably without adding overhead (serialization will
happen anyway).
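
A rough sketch of that idea in plain Java follows. It uses java.io serialization as a stand-in for whatever serializer the accumulator would really use, and the SerializedList and Pair classes are hypothetical, not Flink APIs. Each element is turned into bytes at add() time, so later mutation of a reused holder cannot change what was stored.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class SerializingListDemo {
    /** Hypothetical mutable element type standing in for a Tuple2. */
    static final class Pair implements Serializable {
        private static final long serialVersionUID = 1L;
        int left;
        int right;

        Pair(int left, int right) {
            this.left = left;
            this.right = right;
        }

        @Override
        public String toString() {
            return "(" + left + "," + right + ")";
        }
    }

    /** Hypothetical list accumulator that keeps only serialized bytes. */
    static final class SerializedList<T extends Serializable> {
        private final List<byte[]> chunks = new ArrayList<>();

        /** Serializes the value immediately; the caller may reuse the object. */
        void add(T value) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(value);
            } catch (IOException e) {
                throw new IllegalStateException("could not serialize element", e);
            }
            chunks.add(buffer.toByteArray());
        }

        /** Deserializes every stored chunk into a fresh object. */
        @SuppressWarnings("unchecked")
        List<T> toList() {
            List<T> result = new ArrayList<>(chunks.size());
            for (byte[] chunk : chunks) {
                try (ObjectInputStream in =
                        new ObjectInputStream(new ByteArrayInputStream(chunk))) {
                    result.add((T) in.readObject());
                } catch (IOException | ClassNotFoundException e) {
                    throw new IllegalStateException("could not deserialize element", e);
                }
            }
            return result;
        }
    }

    /** Crosses two arrays through one reused holder, as a reusing function might. */
    static List<Pair> collect(int[] left, int[] right) {
        Pair holder = new Pair(0, 0);
        SerializedList<Pair> accumulator = new SerializedList<>();
        for (int l : left) {
            for (int r : right) {
                holder.left = l;
                holder.right = r;
                accumulator.add(holder); // bytes are captured here
            }
        }
        return accumulator.toList();
    }

    public static void main(String[] args) {
        int[] values = {1, 2};
        System.out.println(collect(values, values)); // [(1,1), (1,2), (2,1), (2,2)]
    }
}
```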

(In reply to Ufuk Celebi, 21 Jan 2015, 11:04)

Re: Implementing a list accumulator

Max Michels
Thank you for your help, Ufuk and Stephan. I made some changes to
immediately serialize the stored objects.

(In reply to Stephan Ewen, Thu, Jan 22, 2015 at 2:58 AM)