[FLINK-4832] Count/Sum 0 elements



Anton Mushin
Hi everybody,
Could you please explain issue https://issues.apache.org/jira/browse/FLINK-4832?

To put it simply, I chose a different approach to resolving this issue than the one described in the issue description.
In `org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections` I added the following code:
        if (inputData.isEmpty()) {
            // No input elements: map one freshly created instance so that a result record is still emitted
            IN inCopy = inSerializer.createInstance();
            OUT out = function.map(inCopy);
            result.add(outSerializer.copy(out));
        }
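The same idea can be sketched outside Flink as a collection-based map that, when its input list is empty, maps one default instance so downstream code still sees a record. This is a minimal sketch under assumptions: the class `EmptyInputMap` and method `mapWithDefault` are hypothetical, and the `Supplier` stands in for `inSerializer.createInstance()`; it is not Flink code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical stand-in for the patched collection-mode map; not Flink code. */
public class EmptyInputMap {

    /**
     * Applies fn to every element; if the input is empty, applies it once to a
     * default instance instead (the Supplier plays the role of inSerializer.createInstance()).
     */
    static <IN, OUT> List<OUT> mapWithDefault(List<IN> input, Function<IN, OUT> fn, Supplier<IN> defaultInstance) {
        List<OUT> result = new ArrayList<>();
        if (input.isEmpty()) {
            result.add(fn.apply(defaultInstance.get()));
        } else {
            for (IN in : input) {
                result.add(fn.apply(in));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Map each string to its length; an empty input yields the length of "" (0).
        System.out.println(mapWithDefault(List.of("a", "bcd"), String::length, () -> "")); // [1, 3]
        System.out.println(mapWithDefault(List.<String>of(), String::length, () -> ""));   // [0]
    }
}
```

Because the fallback lives in the map itself, every map run on an empty input in collection mode emits one extra record, not only maps that feed an aggregate.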

I also changed `org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate` as follows:

        override def initiate(partial: Row): Unit = {
          // cast 0 to the concrete type in each [Type]SumAggregate class that extends SumAggregate[T]
          partial.setField(sumIndex, 0.asInstanceOf[T])
        }
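The comment in `initiate` implies that the zero has to be produced where the concrete numeric type is known. A minimal Java model of that, assuming a hypothetical `SumAgg<T>` base class with an abstract `zero()` and an `Object[]` in place of Flink's `Row` (none of these are Flink classes): in Java, an unchecked `(T) (Object) 0` in the generic base class would store an `Integer` even for a `Long` sum, because `T` is erased at runtime, so each subclass supplies its own typed zero.

```java
/** Hypothetical, simplified model of typed sum aggregates; not the Flink Table API classes. */
public class TypedZeroSketch {

    /** Base class: the zero is left to subclasses, which know the concrete boxed type. */
    abstract static class SumAgg<T> {
        abstract T zero();

        /** Initialise the partial-result slot with a typed zero instead of leaving it empty. */
        void initiate(Object[] partial, int sumIndex) {
            partial[sumIndex] = zero();
        }
    }

    static final class LongSumAgg extends SumAgg<Long> {
        @Override
        Long zero() { return 0L; }
    }

    static final class DoubleSumAgg extends SumAgg<Double> {
        @Override
        Double zero() { return 0.0; }
    }

    public static void main(String[] args) {
        Object[] partial = new Object[2]; // stands in for a Row with two sum fields
        new LongSumAgg().initiate(partial, 0);
        new DoubleSumAgg().initiate(partial, 1);
        // Prints the runtime classes of the two zeros.
        System.out.println(partial[0].getClass().getSimpleName() + " " + partial[1].getClass().getSimpleName()); // Long Double
    }
}
```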

With these changes, the following code now executes correctly:

        val ds = env.fromElements(
          (1: Byte, 2L, 1D, 1f, 1, 1: Short),
          (2: Byte, 2L, 1D, 1f, 1, 1: Short))
        // register the DataSet under the table name used in the queries
        tEnv.registerDataSet("MyTable", ds)

        val sqlQuery =
          "SELECT sum(_1), sum(_2), sum(_3), sum(_4), sum(_5), sum(_6) " +
            "FROM (SELECT * FROM MyTable WHERE _1 = 4)"
        val result = tEnv.sql(sqlQuery) // result == "0,0,0.0,0.0,0,0"

        val sqlQuery2 =
          "SELECT count(_1), count(_2), count(_3), count(_4), count(_5), count(_6) " +
            "FROM (SELECT * FROM MyTable WHERE _1 = 4)"
        val result2 = tEnv.sql(sqlQuery2) // result2 == "0,0,0,0,0,0"
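Whether an aggregate over zero rows yields anything depends on it having a starting value. Plain `java.util.stream` shows the distinction in miniature (an analogy, not Flink code; the class `EmptyAggregateSketch` is hypothetical): `reduce(BinaryOperator)` has nothing to return for an empty input and gives an empty `Optional`, while `reduce(identity, BinaryOperator)` and `count()` return 0. The filter mirrors `WHERE _1 = 4`, which matches neither sample row.

```java
import java.util.List;
import java.util.Optional;

public class EmptyAggregateSketch {

    /** Sum without a starting value: there is nothing to return when no element matches. */
    static Optional<Integer> sumWithoutIdentity(List<Integer> values, int key) {
        return values.stream().filter(v -> v == key).reduce(Integer::sum);
    }

    /** Sum with identity 0: zero matching elements yield 0. */
    static int sumWithIdentity(List<Integer> values, int key) {
        return values.stream().filter(v -> v == key).reduce(0, Integer::sum);
    }

    /** Count of matching elements: zero matches yield 0. */
    static long count(List<Integer> values, int key) {
        return values.stream().filter(v -> v == key).count();
    }

    public static void main(String[] args) {
        List<Integer> firstFields = List.of(1, 2); // the _1 values of the two sample rows
        System.out.println(sumWithoutIdentity(firstFields, 4)); // Optional.empty
        System.out.println(sumWithIdentity(firstFields, 4));    // 0
        System.out.println(count(firstFields, 4));              // 0
    }
}
```

For comparison, standard SQL returns NULL (not 0) for SUM over zero rows and 0 for COUNT, so the empty `Optional` is arguably the closer analogy for SUM.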

Is this a correct solution for this ticket?

Best regards,
Anton Mushin

Re: [FLINK-4832] Count/Sum 0 elements

Aljoscha Krettek-2
Hi Anton,
executeOnCollections() is only meant for executing Flink jobs on the local
machine without bringing up a local (or actual) Flink cluster. So fixing the
problem there does not really solve it.

The underlying problem is this: in a Map-Reduce world, the way to count
elements of type T is to map each T to (T, 1), group by T and sum up the
ones. If you have no elements, then you have no ones to sum up, i.e. you
never find out that you have zero elements.
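A small plain-Java sketch of the counting scheme described above (the class `MapReduceCount` and method `countPerKey` are hypothetical; this is not Flink's implementation): each element is mapped to (T, 1), the pairs are grouped by T, and the ones are summed. With an empty input there are no pairs and hence no group, so the result is an empty map rather than a record saying 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MapReduceCount {

    static <T extends Comparable<T>> Map<T, Integer> countPerKey(List<T> elements) {
        // Map phase: emit a (T, 1) pair for every element.
        List<Map.Entry<T, Integer>> pairs = new ArrayList<>();
        for (T e : elements) {
            pairs.add(Map.entry(e, 1));
        }

        // Group-by-T + sum phase: add up the ones per key.
        Map<T, Integer> counts = new TreeMap<>();
        for (Map.Entry<T, Integer> p : pairs) {
            counts.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countPerKey(List.of("a", "b", "a"))); // {a=2, b=1}
        System.out.println(countPerKey(List.<String>of()));      // {} : no group, so no count of 0
    }
}
```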

Cheers,
Aljoscha

On Fri, 21 Oct 2016 at 13:55 Anton Mushin <[hidden email]> wrote:

> Hi everybody,
> Could you explain issue https://issues.apache.org/jira/browse/FLINK-4832,
> please?