groupBy on a Dataset of Maps


groupBy on a Dataset of Maps

Punit Naik
Below is my code:

val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.readTextFile("file:///home/punit/test").flatMap(line =>
  JSON.parseFull(line))
val j = data.flatMap {
  case map: Map[String, Any] =>
    List(Map("ga_date" ->
      map.get("ga_dateHour").get.toString.substring(0,
        map.get("ga_dateHour").get.toString.length - 2)))
}

val k=j.groupBy(_.get("ga_date"))

But when I execute this, it throws an exception saying:

org.apache.flink.api.common.InvalidProgramException: Return type
Option[String] of KeySelector class
org.apache.flink.api.scala.DataSet$$anon$12 is not a valid key type

Where am I going wrong?
--
Thank You

Regards

Punit Naik

Re: groupBy on a Dataset of Maps

stefanobaghino
The `get` method on the Scala map returns an Option, which is not
(currently) a valid key type for Flink (but there's ongoing work on this
[1]). Flink must be aware of how to use a particular type as a key if you
want to group by a value of said type. See the advanced DataSet concepts in
the official Flink training for more info on this [2].

If you're just playing around, the easy way to make it work is to apply the
key directly to the map (i.e. use the `apply` method). Beware that this way
you're prone to exceptions if the key is missing. A cleaner solution would be
to write your own KeySelector for the Option type.

val k = j.groupBy(_("ga_date"))          // or
val k = j.groupBy(ga => ga("ga_date"))   // or
val k = j.groupBy(_.apply("ga_date"))
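
A minimal sketch of a safer variant (an editor's sketch, not from the
original reply, assuming each element is a Map[String, String]): `getOrElse`
yields a plain String key, so the selector is a valid Flink key type and
never throws the NoSuchElementException that `apply` throws on a missing
key. In the DataSet API that would be `j.groupBy(_.getOrElse("ga_date", ""))`;
the same behavior on a plain Scala collection:

```scala
val rows = List(
  Map("ga_date" -> "20160429"),
  Map("ga_date" -> "20160429"),
  Map("other"   -> "x")   // no "ga_date": falls into the "" group
)

// getOrElse returns the value or the default, never an Option
val grouped = rows.groupBy(_.getOrElse("ga_date", ""))
```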

As a side note, I believe the user mailing list may be more appropriate for
this kind of issue.

[1]: https://issues.apache.org/jira/browse/FLINK-2673
[2]:
http://dataartisans.github.io/flink-training/dataSetAdvanced/slides.html




--
BR,
Stefano Baghino

Software Engineer @ Radicalbit

Re: groupBy on a Dataset of Maps

Punit Naik
Okay, I'll take all your points into consideration.




--
Thank You

Regards

Punit Naik

Re: groupBy on a Dataset of Maps

Punit Naik
What if after grouping I wanted to count the occurrences of the key
"ga_date"?




--
Thank You

Regards

Punit Naik

Re: groupBy on a Dataset of Maps

Punit Naik
Anyway, I fixed it. To your groupBy you should attach this:

.reduceGroup {
  (in, out: org.apache.flink.util.Collector[(Map[String, String], Int)]) =>
    var count = 0
    var key: Map[String, String] = Map()
    for (t <- in) {   // iterate over the group, counting its elements
      count += 1
      key = t         // every element of the group carries the same key map
    }
    out.collect((key, count))
}
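
An alternative sketch (an editor's note, not from the thread): instead of
carrying the whole Map through reduceGroup, you can map each record to a
(key, 1) pair and sum per key, the classic word-count pattern. In the
DataSet API that would be `j.map(m => (m("ga_date"), 1)).groupBy(0).sum(1)`;
the same counting logic on a plain Scala collection:

```scala
val dates = List("20160429", "20160429", "20160430")

// (key, 1) pairs, then sum the 1s per key
val counts = dates
  .map(d => (d, 1))
  .groupBy(_._1)
  .map { case (k, pairs) => (k, pairs.map(_._2).sum) }
```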




--
Thank You

Regards

Punit Naik

Re: groupBy on a Dataset of Maps

Punit Naik
I forgot to mention that in this code the out.collect method outputs a tuple
of the Map[key,value] and the count as an Int.




--
Thank You

Regards

Punit Naik