Possible bug?

Possible bug?

Matthias J. Sax
Hi,

I worked on rewriting flink-test according to
https://issues.apache.org/jira/browse/FLINK-2275

In "org.apache.flink.test.javaApiOperators.SortPartitionITCase" I hit
something strange. Depending on how I write the code, the test either
passes or fails, and I have no idea why.

The following code works (result is of type java.util.List):

> result = ds
> .map(new IdMapper()).setParallelism(4) // parallelize input
> .sortPartition(1, Order.DESCENDING)
> .mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
> .distinct().collect();

Rewriting the above as follows results in a failing test:

> ds.map(new IdMapper()).setParallelism(4) // parallelize input
> .sortPartition(1, Order.DESCENDING)
> .mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
> .distinct();
> result = ds.collect();

I have no clue what the problem might be. The code looks semantically
identical to me. Can anyone explain the difference? Am I missing
something? Or is this a bug?

You can find the working version of the code in my github repo:
https://github.com/mjsax/flink/tree/flink-2275-migrateFlinkTests


-Matthias



Re: Possible bug?

Chiwan Park-2
Although you call `ds.map(blahblah).sortPartition(blahblah).mapPartition(blahblah).distinct()`, the DataSet `ds` itself is not changed: each transformation returns a new DataSet.
You have to assign the result of the transformation to a variable.

So if you change the code to `intermediateResult = blahblah; result = intermediateResult.collect();`, the test works.
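
A minimal sketch of the same effect, runnable without Flink. `MiniDataSet` and `ReassignDemo` are hypothetical names made up for this illustration and are not part of the Flink API; the class only mimics the one property that matters here, namely that a transformation hands back a new object and leaves the receiver untouched.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a DataSet (NOT a Flink class):
// every transformation returns a NEW object and never modifies `this`.
final class MiniDataSet<T> {
    private final List<T> elements;

    MiniDataSet(List<T> elements) {
        this.elements = new ArrayList<>(elements); // defensive copy
    }

    <R> MiniDataSet<R> map(Function<T, R> f) {
        List<R> out = new ArrayList<>();
        for (T e : elements) {
            out.add(f.apply(e));
        }
        return new MiniDataSet<>(out);
    }

    MiniDataSet<T> distinct() {
        // LinkedHashSet drops duplicates and keeps first-seen order
        return new MiniDataSet<>(new ArrayList<>(new LinkedHashSet<>(elements)));
    }

    List<T> collect() {
        return new ArrayList<>(elements);
    }
}

public class ReassignDemo {
    public static void main(String[] args) {
        MiniDataSet<Integer> ds = new MiniDataSet<>(Arrays.asList(1, 1, 2, 2, 3));

        // Failing variant: the transformed data set is built and then thrown away.
        ds.map(x -> x * 10).distinct();
        System.out.println(ds.collect()); // prints [1, 1, 2, 2, 3]

        // Working variant: keep the returned object and collect from it.
        MiniDataSet<Integer> transformed = ds.map(x -> x * 10).distinct();
        System.out.println(transformed.collect()); // prints [10, 20, 30]
    }
}
```

In the failing test the situation is the same: `ds.collect()` returns the untransformed input, so the order check and `distinct()` never contribute to `result`.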

Regards,
Chiwan Park

> On Jun 25, 2015, at 10:03 AM, Matthias J. Sax <[hidden email]> wrote:
>
> Hi,
>
> I worked on rewriting flink-test according to
> https://issues.apache.org/jira/browse/FLINK-2275
>
> In "org.apache.flink.test.javaApiOperators.SortPartitionITCase" I hit
> something strange. When I rewrite the code slightly differently, the
> test passes or fails and I have no idea why.
>
> The following code works (result is of type java.util.List)
>
>> result = ds
>> .map(new IdMapper()).setParallelism(4) // parallelize input
>> .sortPartition(1, Order.DESCENDING)
>> .mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
>> .distinct().collect();
>
> Rewriting the above as follows results in a failing test:
>
>> ds.map(new IdMapper()).setParallelism(4) // parallelize input
>> .sortPartition(1, Order.DESCENDING)
>> .mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
>> .distinct();
>> result = ds.collect();
>
> I have no clue what the problem might be. The code looks semantically
> identical to me. Can anyone explain the difference? Am I missing
> something? Or is this a bug?
>
> You can find the working version of the code in my github repo:
> https://github.com/mjsax/flink/tree/flink-2275-migrateFlinkTests
>
>
> -Matthias

Re: Possible bug?

Matthias J. Sax
Stupid me. Thanks! Of course it cannot work. I forgot to assign the
result back to ds:

ds = ds.x().distinct();
result = ds.collect();

I guess it was too late in the night ;)

-Matthias

On 06/25/2015 07:58 AM, Chiwan Park wrote:

> Although you run `ds.map(blahblah).sortPartition(blahblah).mapPartition(blahblah).distinct()`, DataSet ds is not changed.
> You should receive the result of transformation.
>
> So if you modify the code to `intermediateResult = blahblah; result = intermediateResult.collect();`, the test works.
>
> Regards,
> Chiwan Park

Re: Possible bug?

Stephan Ewen
Happens to everyone once in a while ;-)

On Thu, Jun 25, 2015 at 10:33 AM, Matthias J. Sax <[hidden email]> wrote:

> Stupid me. Thanks! Of course, it cannot work. I forgot to assign ds to
> itself:
>
> ds = ds.x().distinct();
> result = ds.collect();
>
> I guess it was too late in the night ;)
>
> -Matthias