KeyBy fields do not support java.lang.Array?

Xu Pingyong
Hi Aljoscha,

The hashCode of a Java array depends on the object reference, not on the array's content. If a keyBy field contains an array, two records can be hash-partitioned to different streams even though their keys are equal by content.

    int[] a1 = new int[]{1, 2};  // hashCode is 5592464
    int[] a2 = new int[]{1, 2};  // hashCode is 1830712962
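
To make the difference concrete, here is a minimal sketch in plain Java (no Flink involved). The class name `ArrayHashDemo` and its helper methods are made up for illustration. It contrasts the reference-based `equals` that arrays inherit from `Object` with the content-based helpers in `java.util.Arrays`.

```java
import java.util.Arrays;

// Hypothetical demo class, not part of Flink or of the original job.
class ArrayHashDemo {

    // Content-based hash: equal contents always give equal hashes.
    static int contentHash(int[] values) {
        return Arrays.hashCode(values);
    }

    // Content-based equality.
    static boolean contentEqual(int[] left, int[] right) {
        return Arrays.equals(left, right);
    }

    public static void main(String[] args) {
        int[] a1 = {1, 2};
        int[] a2 = {1, 2};

        // Arrays inherit equals() from Object, so this compares references.
        System.out.println(a1.equals(a2));          // false
        // The java.util.Arrays helpers look at the elements instead.
        System.out.println(contentEqual(a1, a2));   // true
        System.out.println(contentHash(a1) == contentHash(a2)); // true
    }
}
```

The identity hash codes printed by `a1.hashCode()` and `a2.hashCode()` vary from run to run, which is why the concrete numbers above should be read as one observed example only.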


Streaming job example:

    // Two records whose byte[] keys have the same content ("a").
    Tuple2<byte[], Integer>[] sources = new Tuple2[]{
            new Tuple2<>("a".getBytes(), 2),
            new Tuple2<>("a".getBytes(), 5)};

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(sources)
            .keyBy(0)   // key on the byte[] field
            .sum(1)     // sum the Integer field per key
            .map(new MapFunction<Tuple2<byte[], Integer>, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(Tuple2<byte[], Integer> value) throws Exception {
                    // Convert the key back to a String for printing.
                    return new Tuple2<>(new String(value.f0), value.f1);
                }
            }).print();

    env.execute();


The expected result is (a, 7), but that is not what the job actually produces. What do you think about this case?


Best Regards!
Xu Pingyong



Re: KeyBy fields do not support java.lang.Array?

Aljoscha Krettek
Hi,

Which version of Flink are you using? This issue should have been resolved by 1.3.0 at the latest: https://issues.apache.org/jira/browse/FLINK-5874. Currently, such keys should be rejected. There is also an issue that aims to re-introduce proper support for arrays as keys: https://issues.apache.org/jira/browse/FLINK-5299
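
As long as array keys are rejected, one possible workaround is to key on a representation of the array that has content-based `equals` and `hashCode`. This is an assumption, not something proposed in the thread or in the linked issues. The sketch below uses plain Java collections instead of Flink, and the class `ContentKeyDemo` and method `sumByContent` are hypothetical names. It wraps each `byte[]` in a `java.nio.ByteBuffer`, whose equality and hash are computed from the remaining bytes, and sums the values per key.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical demo class: plain-Java stand-in for "keyBy + sum", not Flink code.
class ContentKeyDemo {

    // Sums values per key, where keys are compared by array content.
    static Map<ByteBuffer, Integer> sumByContent(byte[][] keys, int[] values) {
        Map<ByteBuffer, Integer> sums = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            // ByteBuffer.equals/hashCode depend on the wrapped bytes, not the reference.
            sums.merge(ByteBuffer.wrap(keys[i]), values[i], Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        byte[][] keys = {
                "a".getBytes(StandardCharsets.UTF_8),
                "a".getBytes(StandardCharsets.UTF_8)};
        int[] values = {2, 5};

        sumByContent(keys, values).forEach((key, sum) ->
                System.out.println("(" + new String(key.array(), StandardCharsets.UTF_8) + "," + sum + ")"));
        // prints (a,7)
    }
}
```

In a Flink job the same conversion would presumably happen in a map step or a KeySelector before keyBy. Whether a given key type is accepted depends on the Flink version, so this needs checking against the release in use.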

Best,
Aljoscha

> On 31. Jul 2017, at 15:16, Xu Pingyong <[hidden email]> wrote:

Re: Re: KeyBy fields do not support java.lang.Array?

Xu Pingyong
Hi Aljoscha,

Thanks for your reply. I look forward to the final solution.


Best Regards
Xu Pingyong



At 2017-07-31 22:39:38, "Aljoscha Krettek" <[hidden email]> wrote:
