Hi everyone,

I'm running into some problems implementing an Accumulator for returning the elements of a DataSet as a list.

https://github.com/mxm/flink/tree/count/collect

Basically, it works fine in this test case:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

    DataSet<Integer> data = env.fromElements(input);

    // count
    long numEntries = data.count();
    assertEquals(10, numEntries);

    // collect
    ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
    assertArrayEquals(input, list.toArray());

But with non-primitive objects strange results occur:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableObjectReuse();

    DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

    DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);

    // count
    long numEntries = data3.count();
    assertEquals(100, numEntries);

    // collect
    ArrayList<Tuple2<Integer, Integer>> list =
        (ArrayList<Tuple2<Integer, Integer>>) data3.collect();

    System.out.println(list);

    ....

Output:

    [(10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
    (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
    (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
    (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
    (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
    (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
    (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
    (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
    (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
    (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8)]

I assume the problem is the clone() method of the ListAccumulator, where we just create a shallow copy. This is fine for accumulators that hold primitive values, like IntCounter, but here we have a real object.

How do we work around this problem?

Best regards,
Max

I just checked out your branch and ran it with a breakpoint set at the CollectHelper. If you look into the (list) accumulator, you see that the same object is always added to it. Strangely enough, object re-use is disabled in the config. I don't have time to look further into it now, but it seems to be a problem with the object re-use mode.

– Ufuk

On 20 Jan 2015, at 20:53, Max Michels <[hidden email]> wrote:
> How do we work around this problem?

The thing is that the DefaultCrossFunction always uses the same holder Tuple2 object, which is then handed over to the chained collect helper flatMap(). I can see why it is OK for the default functions to reuse "holder" objects, but when they are chained to an operator it becomes problematic.

On 21 Jan 2015, at 17:12, Ufuk Celebi <[hidden email]> wrote:
> Strangely enough, object re-use is disabled in the config.

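
The holder-object reuse described above can be reproduced without Flink. The following is only a minimal sketch: `Pair` is a hypothetical stand-in for Tuple2, and `collect` is a hypothetical helper that imitates a cross function handing one reused holder to a list. It is not the actual DefaultCrossFunction or CollectHelper code.

```java
import java.util.ArrayList;
import java.util.List;

public class HolderReuseDemo {

    /** Hypothetical mutable pair, standing in for Flink's Tuple2. */
    public static final class Pair {
        int f0;
        int f1;

        /** Returns an independent copy of this pair. */
        Pair copy() {
            Pair p = new Pair();
            p.f0 = f0;
            p.f1 = f1;
            return p;
        }

        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + ")";
        }
    }

    /** Crosses 1..n with 1..n, writing every result into one reused holder. */
    public static List<Pair> collect(int n, boolean copyBeforeAdd) {
        Pair holder = new Pair(); // single instance, mutated for each result
        List<Pair> collected = new ArrayList<>();
        for (int a = 1; a <= n; a++) {
            for (int b = 1; b <= n; b++) {
                holder.f0 = a;
                holder.f1 = b;
                // Without the copy, the list stores the same reference each time.
                collected.add(copyBeforeAdd ? holder.copy() : holder);
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(collect(2, false)); // [(2,2), (2,2), (2,2), (2,2)]
        System.out.println(collect(2, true));  // [(1,1), (1,2), (2,1), (2,2)]
    }
}
```

Without the copy, every list entry points at the same object, so all entries show whatever was written last. This matches the symptom of long runs of identical tuples in the reported output.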
True, that is tricky. The user code does not necessarily respect the
non-reuse mode. That may be true for any user code.

Can the list accumulator immediately serialize the objects and send over a byte array? That should solve it reliably without adding overhead (serialization will happen anyway).

On 21.01.2015 at 11:04, "Ufuk Celebi" <[hidden email]> wrote:
> The thing is that the DefaultCrossFunction always uses the same holder
> Tuple2 object, which is then handed over to the chained collect helper
> flatMap().

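
The idea of serializing each element as soon as it is added can be sketched in plain Java. This is an illustration under assumptions, not the change made on the branch: `SerializedList` and `run` are hypothetical names, and Java's built-in object serialization stands in for whatever serializer Flink would use for the element type.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class SerializedListDemo {

    /** Hypothetical accumulator that keeps a serialized snapshot of each element. */
    public static final class SerializedList<T extends Serializable> {
        private final List<byte[]> chunks = new ArrayList<>();

        /** Serializes the value right away, so later mutations cannot affect it. */
        public void add(T value) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            chunks.add(bytes.toByteArray());
        }

        /** Deserializes all stored snapshots into fresh objects. */
        @SuppressWarnings("unchecked")
        public List<T> toList() {
            List<T> result = new ArrayList<>();
            for (byte[] chunk : chunks) {
                try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(chunk))) {
                    result.add((T) in.readObject());
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                } catch (ClassNotFoundException e) {
                    throw new IllegalStateException(e);
                }
            }
            return result;
        }
    }

    /** Fills the accumulator from one reused holder array and reads it back. */
    public static List<int[]> run(int n) {
        SerializedList<int[]> acc = new SerializedList<>();
        int[] holder = new int[2]; // reused for every element, as in the thread
        for (int a = 1; a <= n; a++) {
            for (int b = 1; b <= n; b++) {
                holder[0] = a;
                holder[1] = b;
                acc.add(holder);
            }
        }
        return acc.toList();
    }

    public static void main(String[] args) {
        for (int[] pair : run(2)) {
            System.out.println("(" + pair[0] + "," + pair[1] + ")");
        }
    }
}
```

Because each element is turned into bytes at the moment it is added, the accumulator no longer depends on whether upstream code reuses its objects. The stored bytes are also the form that would need to be shipped anyway.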
Thank you for your help, Ufuk and Stephan. I made some changes to
immediately serialize the stored objects.

On Thu, Jan 22, 2015 at 2:58 AM, Stephan Ewen <[hidden email]> wrote:
> Can the list accumulator immediately serialize the objects and send over
> a byte array?