Hi,

I worked on rewriting flink-test according to
https://issues.apache.org/jira/browse/FLINK-2275

In "org.apache.flink.test.javaApiOperators.SortPartitionITCase" I hit
something strange. Two versions of the code that look equivalent to me
behave differently: one makes the test pass, the other makes it fail,
and I have no idea why.

The following code works (result is of type java.util.List):

> result = ds
>     .map(new IdMapper()).setParallelism(4) // parallelize input
>     .sortPartition(1, Order.DESCENDING)
>     .mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
>     .distinct().collect();

Rewriting the above as follows results in a failing test:

> ds.map(new IdMapper()).setParallelism(4) // parallelize input
>     .sortPartition(1, Order.DESCENDING)
>     .mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
>     .distinct();
> result = ds.collect();

I have no clue what the problem might be. The code looks semantically
identical to me. Can anyone explain the difference? Am I missing
anything? Or is this a bug?

You can find the working version of the code in my GitHub repo:
https://github.com/mjsax/flink/tree/flink-2275-migrateFlinkTests

-Matthias

Although you call `ds.map(blahblah).sortPartition(blahblah).mapPartition(blahblah).distinct()`, the DataSet `ds` itself is not changed. Each transformation returns a new DataSet, so you have to keep the returned result and call collect() on that.

So if you modify the code to `intermediateResult = blahblah; result = intermediateResult.collect();`, the test works.

Regards,
Chiwan Park

> On Jun 25, 2015, at 10:03 AM, Matthias J. Sax <[hidden email]> wrote:
> [...]

Stupid me. Thanks! Of course it cannot work. I forgot to assign the
result back to ds:

ds = ds.x().distinct();
result = ds.collect();

I guess it was too late at night ;)

-Matthias

On 06/25/2015 07:58 AM, Chiwan Park wrote:
> [...]
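
For readers who want to reproduce the effect without a Flink setup, here is a minimal sketch in plain Java. `MiniDataSet` and `ReassignDemo` are hypothetical names made up for this illustration and are not part of Flink's API; the sketch only assumes, as explained above, that each transformation returns a new object and leaves the receiver untouched.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for an immutable dataset. This is NOT Flink's DataSet.
final class MiniDataSet<T> {
    private final List<T> elements;

    MiniDataSet(List<T> elements) {
        // Defensive copy so the dataset cannot be changed from outside.
        this.elements = Collections.unmodifiableList(new ArrayList<>(elements));
    }

    // Returns a NEW dataset; the receiver is left untouched.
    <R> MiniDataSet<R> map(Function<? super T, ? extends R> fn) {
        List<R> out = new ArrayList<>();
        for (T e : elements) {
            out.add(fn.apply(e));
        }
        return new MiniDataSet<>(out);
    }

    // Returns a NEW dataset with duplicates removed (first occurrence kept).
    MiniDataSet<T> distinct() {
        return new MiniDataSet<>(new ArrayList<>(new LinkedHashSet<>(elements)));
    }

    List<T> collect() {
        return elements;
    }
}

class ReassignDemo {
    // Mirrors the failing variant: the transformed dataset is discarded.
    static List<Integer> discarded() {
        MiniDataSet<Integer> ds = new MiniDataSet<>(Arrays.asList(1, 1, 2, 3, 3));
        ds.map(x -> x * 10).distinct(); // return value is thrown away
        return ds.collect();            // still the original input
    }

    // Mirrors the fix: reassign the variable to the transformed dataset.
    static List<Integer> reassigned() {
        MiniDataSet<Integer> ds = new MiniDataSet<>(Arrays.asList(1, 1, 2, 3, 3));
        ds = ds.map(x -> x * 10).distinct();
        return ds.collect();
    }

    public static void main(String[] args) {
        System.out.println(discarded());  // prints [1, 1, 2, 3, 3]
        System.out.println(reassigned()); // prints [10, 20, 30]
    }
}
```

The first method collects the untransformed input, which is presumably why the order check in the test never ran on the data that was collected; the second method behaves like the chained version that passes.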
Happens to everyone once in a while ;-)

On Thu, Jun 25, 2015 at 10:33 AM, Matthias J. Sax <[hidden email]> wrote:
> [...]