[jira] [Created] (FLINK-2883) Combinable reduce produces wrong result

Shang Yuanchun (Jira)
Till Rohrmann created FLINK-2883:
------------------------------------

             Summary: Combinable reduce produces wrong result
                 Key: FLINK-2883
                 URL: https://issues.apache.org/jira/browse/FLINK-2883
             Project: Flink
          Issue Type: Bug
    Affects Versions: 0.10
            Reporter: Till Rohrmann


If one uses a combinable reduce operation whose function also changes the key value of the underlying data element, the results of the reduce operation can be wrong. The reason is that after the combine phase, another reduce operator is executed, and it groups the combiner's output elements by their new key values before reducing them again. This might not be surprising if one explicitly declared one's {{GroupReduceOperation}} as combinable. However, the {{ReduceFunction}} conceals the fact that a combiner is used implicitly. Furthermore, the API does not prevent the user from changing the key fields, although preventing this would solve the problem.
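To make the data flow concrete, here is a minimal, Flink-free Scala model of the two phases. It assumes parallelism 1 and a single combiner pass over the whole input, as in the example program below. The names {{CombineThenReduce}}, {{groupReduce}} and {{combineThenReduce}} are hypothetical, and the model is not Flink's runtime code; it only mimics "group and reduce, then group the partial results again by their current keys and reduce".

{code}
// Hypothetical, Flink-free model of a grouped reduce with an implicit combiner.
// Assumes one combiner pass over all input (parallelism 1); not Flink's runtime.
object CombineThenReduce {
  type Rec = (Int, Int)

  // The reduce function from the issue: it also rewrites field 0 (the key).
  val sumBoth: (Rec, Rec) => Rec =
    (l, r) => (l._1 + r._1, l._2 + r._2)

  // Group by field 0 (keeping first-seen key order) and fold each group with f.
  def groupReduce(in: Seq[Rec], f: (Rec, Rec) => Rec): Seq[Rec] = {
    val keys = in.map(_._1).distinct
    keys.map(k => in.filter(_._1 == k).reduce(f))
  }

  // Combine phase, then the final reduce, which groups the combiner's output
  // again by whatever keys the combiner produced.
  def combineThenReduce(in: Seq[Rec], f: (Rec, Rec) => Rec): Seq[Rec] =
    groupReduce(groupReduce(in, f), f)

  def main(args: Array[String]): Unit = {
    val input = Seq((1, 2), (1, 3), (2, 3), (3, 3), (3, 4))
    println(groupReduce(input, sumBoth))        // List((2,5), (2,3), (6,7))
    println(combineThenReduce(input, sumBoth))  // List((4,8), (6,7))
  }
}
{code}

The single-phase result matches the expected output listed in this issue, while the two-phase result matches the actual output, because the combiner's outputs (2, 5) and (2, 3) land in the same group in the second phase.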

The following example program illustrates the problem:

{code}
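import org.apache.flink.api.scala._
import org.apache.flink.api.java.io.PrintingOutputFormat

val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val input = env.fromElements((1, 2), (1, 3), (2, 3), (3, 3), (3, 4))

// Group on field 0, but the reduce function also rewrites field 0 (the key).
val result = input.groupBy(0).reduce {
  (left, right) =>
    (left._1 + right._1, left._2 + right._2)
}

// The sink's type parameter must match the element type (Int, Int).
result.output(new PrintingOutputFormat[(Int, Int)]())

env.execute()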
{code}

The expected output is:
{code}
(2, 5)
(2, 3)
(6, 7)
{code}

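However, the actual output is the following, because the combiner's results (2, 5) and (2, 3) now share the key 2 and are reduced a second time: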
{code}
(4, 8)
(6, 7)
{code}
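As noted above, the API does not stop a {{ReduceFunction}} from changing the key fields. One way such a check might look at runtime is sketched below; {{KeyGuard}} and {{keyPreserving}} are hypothetical names, not part of Flink's API, and the sketch assumes tuples keyed on field 0. The wrapper fails fast when the reduce result's key differs from the input key, turning the silent wrong result into an error.

{code}
// Hypothetical runtime guard, not Flink API: assumes (key, value) tuples keyed on field 0.
object KeyGuard {
  type Rec = (Int, Int)

  // The reduce function from the example: it also rewrites field 0 (the key).
  val sumBoth: (Rec, Rec) => Rec = (l, r) => (l._1 + r._1, l._2 + r._2)

  // A key-preserving variant that only aggregates field 1.
  val sumValues: (Rec, Rec) => Rec = (l, r) => (l._1, l._2 + r._2)

  // Wrap f so that it throws IllegalArgumentException if it changes the key.
  def keyPreserving(f: (Rec, Rec) => Rec): (Rec, Rec) => Rec =
    (l, r) => {
      val out = f(l, r)
      // Both inputs belong to the same group, so l._1 is the group's key.
      require(out._1 == l._1, s"reduce function changed key ${l._1} to ${out._1}")
      out
    }

  def main(args: Array[String]): Unit = {
    println(keyPreserving(sumValues)((1, 2), (1, 3)))             // (1,5)
    println(scala.util.Try(keyPreserving(sumBoth)((1, 2), (1, 3)))) // a Failure wrapping an IllegalArgumentException
  }
}
{code}

Note that {{sumValues}} computes something different from the example (keys are not summed), so the guard does not reproduce the expected output; it only rejects key-changing functions, which is the kind of restriction the issue suggests.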



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)