Null Pointer Exception in tests but only in COLLECTION mode

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Null Pointer Exception in tests but only in COLLECTION mode

André Petermann
Hi all,

during a workflow, a data set may run empty, e.g., because of a join
without matches.

We're using FlinkTestBase and found out, that aggregate functions on
empty data sets work fine in CLUSTER execution mode but cause a Null
Pointer Exception at AggregateOperator$AggregatingUdf in COLLECTION mode.

Here is the minimal example on 1.0-SNAPSHOT:
https://gist.github.com/p3et/59a65bab11098dd11054

Are we doing something wrong, or is this a bug?

Cheers,
Andre

--
-------------------------------------------
PhD Student
University of Leipzig
Department of Computer Science
Database Research Group

email: [hidden email]
web:   dbs.uni-leipzig.de
-------------------------------------------
Reply | Threaded
Open this post in threaded view
|

Re: Null Pointer Exception in tests but only in COLLECTION mode

Martin Junghanns
Hi,

What he meant was MultipleProgramsTestBase, not FlinkTestBase.

I debugged this a bit.

The NPE is thrown in

https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java#L296

since current can be null if the input iterator is empty.

In Cluster Execution, it is checked that the output of the previous
function (e.g. Filter) is not empty in:

https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java#L144

which avoids going into AggregateOperator and getting a NPE.

However, in Collection Mode, the execution is not grouped (don't know
why, yet). In

https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java#L207

the copied input data is handed over to the aggregate function which
leads to the NPE.

Checking inputDataCopy.size() > 0 before calling the aggregate solves
the problem.

If someone can confirm that this is not a more generic problem, I would
open an issue and a PR.

Best,
Martin

On 20.11.2015 18:41, André Petermann wrote:

> Hi all,
>
> during a workflow, a data set may run empty, e.g., because of a join
> without matches.
>
> We're using FlinkTestBase and found out, that aggregate functions on
> empty data sets work fine in CLUSTER execution mode but cause a Null
> Pointer Exception at AggregateOperator$AggregatingUdf in COLLECTION mode.
>
> Here is the minimal example on 1.0-SNAPSHOT:
> https://gist.github.com/p3et/59a65bab11098dd11054
>
> Are we doing something wrong, or is this a bug?
>
> Cheers,
> Andre
>
mxm
Reply | Threaded
Open this post in threaded view
|

Re: Null Pointer Exception in tests but only in COLLECTION mode

mxm
Hi André, hi Martin,

This looks very much like a bug. Martin, I would be happy if you
opened a JIRA issue.

Thanks,
Max

On Sun, Nov 22, 2015 at 12:27 PM, Martin Junghanns
<[hidden email]> wrote:

> Hi,
>
> What he meant was MultipleProgramsTestBase, not FlinkTestBase.
>
> I debugged this a bit.
>
> The NPE is thrown in
>
> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java#L296
>
> since current can be null if the input iterator is empty.
>
> In Cluster Execution, it is checked that the output of the previous function
> (e.g. Filter) is not empty in:
>
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java#L144
>
> which avoids going into AggregateOperator and getting a NPE.
>
> However, in Collection Mode, the execution is not grouped (don't know why,
> yet). In
>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java#L207
>
> the copied input data is handed over to the aggregate function which leads
> to the NPE.
>
> Checking inputDataCopy.size() > 0 before calling the aggregate solves the
> problem.
>
> If someone can confirm that this is not a more generic problem, I would open
> an issue and a PR.
>
> Best,
> Martin
>
>
> On 20.11.2015 18:41, André Petermann wrote:
>>
>> Hi all,
>>
>> during a workflow, a data set may run empty, e.g., because of a join
>> without matches.
>>
>> We're using FlinkTestBase and found out, that aggregate functions on
>> empty data sets work fine in CLUSTER execution mode but cause a Null
>> Pointer Exception at AggregateOperator$AggregatingUdf in COLLECTION mode.
>>
>> Here is the minimal example on 1.0-SNAPSHOT:
>> https://gist.github.com/p3et/59a65bab11098dd11054
>>
>> Are we doing something wrong, or is this a bug?
>>
>> Cheers,
>> Andre
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: Null Pointer Exception in tests but only in COLLECTION mode

Martin Junghanns
Hi Max,

fixed in https://github.com/apache/flink/pull/1396

Best,
Martin

On 24.11.2015 13:46, Maximilian Michels wrote:

> Hi André, hi Martin,
>
> This looks very much like a bug. Martin, I would be happy if you
> opened a JIRA issue.
>
> Thanks,
> Max
>
> On Sun, Nov 22, 2015 at 12:27 PM, Martin Junghanns
> <[hidden email]> wrote:
>> Hi,
>>
>> What he meant was MultipleProgramsTestBase, not FlinkTestBase.
>>
>> I debugged this a bit.
>>
>> The NPE is thrown in
>>
>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java#L296
>>
>> since current can be null if the input iterator is empty.
>>
>> In Cluster Execution, it is checked that the output of the previous function
>> (e.g. Filter) is not empty in:
>>
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java#L144
>>
>> which avoids going into AggregateOperator and getting a NPE.
>>
>> However, in Collection Mode, the execution is not grouped (don't know why,
>> yet). In
>>
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java#L207
>>
>> the copied input data is handed over to the aggregate function which leads
>> to the NPE.
>>
>> Checking inputDataCopy.size() > 0 before calling the aggregate solves the
>> problem.
>>
>> If someone can confirm that this is not a more generic problem, I would open
>> an issue and a PR.
>>
>> Best,
>> Martin
>>
>>
>> On 20.11.2015 18:41, André Petermann wrote:
>>>
>>> Hi all,
>>>
>>> during a workflow, a data set may run empty, e.g., because of a join
>>> without matches.
>>>
>>> We're using FlinkTestBase and found out, that aggregate functions on
>>> empty data sets work fine in CLUSTER execution mode but cause a Null
>>> Pointer Exception at AggregateOperator$AggregatingUdf in COLLECTION mode.
>>>
>>> Here is the minimal example on 1.0-SNAPSHOT:
>>> https://gist.github.com/p3et/59a65bab11098dd11054
>>>
>>> Are we doing something wrong, or is this a bug?
>>>
>>> Cheers,
>>> Andre
>>>
>>
mxm
Reply | Threaded
Open this post in threaded view
|

Re: Null Pointer Exception in tests but only in COLLECTION mode

mxm
Hi Martin,

Great. Thanks for the fix!

Cheers,
Max

On Tue, Nov 24, 2015 at 7:40 PM, Martin Junghanns
<[hidden email]> wrote:

> Hi Max,
>
> fixed in https://github.com/apache/flink/pull/1396
>
> Best,
> Martin
>
>
> On 24.11.2015 13:46, Maximilian Michels wrote:
>>
>> Hi André, hi Martin,
>>
>> This looks very much like a bug. Martin, I would be happy if you
>> opened a JIRA issue.
>>
>> Thanks,
>> Max
>>
>> On Sun, Nov 22, 2015 at 12:27 PM, Martin Junghanns
>> <[hidden email]> wrote:
>>>
>>> Hi,
>>>
>>> What he meant was MultipleProgramsTestBase, not FlinkTestBase.
>>>
>>> I debugged this a bit.
>>>
>>> The NPE is thrown in
>>>
>>>
>>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java#L296
>>>
>>> since current can be null if the input iterator is empty.
>>>
>>> In Cluster Execution, it is checked that the output of the previous
>>> function
>>> (e.g. Filter) is not empty in:
>>>
>>>
>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java#L144
>>>
>>> which avoids going into AggregateOperator and getting a NPE.
>>>
>>> However, in Collection Mode, the execution is not grouped (don't know
>>> why,
>>> yet). In
>>>
>>>
>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java#L207
>>>
>>> the copied input data is handed over to the aggregate function which
>>> leads
>>> to the NPE.
>>>
>>> Checking inputDataCopy.size() > 0 before calling the aggregate solves the
>>> problem.
>>>
>>> If someone can confirm that this is not a more generic problem, I would
>>> open
>>> an issue and a PR.
>>>
>>> Best,
>>> Martin
>>>
>>>
>>> On 20.11.2015 18:41, André Petermann wrote:
>>>>
>>>>
>>>> Hi all,
>>>>
>>>> during a workflow, a data set may run empty, e.g., because of a join
>>>> without matches.
>>>>
>>>> We're using FlinkTestBase and found out, that aggregate functions on
>>>> empty data sets work fine in CLUSTER execution mode but cause a Null
>>>> Pointer Exception at AggregateOperator$AggregatingUdf in COLLECTION
>>>> mode.
>>>>
>>>> Here is the minimal example on 1.0-SNAPSHOT:
>>>> https://gist.github.com/p3et/59a65bab11098dd11054
>>>>
>>>> Are we doing something wrong, or is this a bug?
>>>>
>>>> Cheers,
>>>> Andre
>>>>
>>>
>