Below is my code:
val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.readTextFile("file:///home/punit/test").flatMap(line => JSON.parseFull(line))
val j = data.flatMap { _ match { case map: Map[String, Any] =>
  List(Map("ga_date" -> map.get("ga_dateHour").get.toString.substring(0,
    map.get("ga_dateHour").get.toString.length - 2))) } }
val k = j.groupBy(_.get("ga_date"))

But when I execute this, it throws an exception saying:

org.apache.flink.api.common.InvalidProgramException: Return type Option[String]
of KeySelector class org.apache.flink.api.scala.DataSet$$anon$12 is not a valid key type

Where am I going wrong?

--
Thank You

Regards

Punit Naik
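The trimming step in the code above can be sketched in plain Scala (no Flink needed); the sample value here is an assumption:

```scala
// Drop the trailing two hour digits from an assumed "ga_dateHour" value,
// e.g. "2016042910" (date + hour) becomes the date "20160429".
val dateHour = "2016042910"
val gaDate = dateHour.substring(0, dateHour.length - 2)
// gaDate == "20160429"
```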
The `get` method on the Scala map returns an Option, which is not
(currently) a valid key type for Flink (though there is ongoing work on
this [1]). Flink must know how to use a particular type as a key if you
want to group by a value of that type. See the advanced DataSet concepts
in the official Flink training for more on this [2].

If you're just experimenting, the easy way to make it work is to apply the
key directly to the map (i.e. use the apply method), but beware that this
approach is prone to exceptions if a key is missing. A cleaner solution
would be to write your own KeySelector for the Option type.

val k = j.groupBy(_("ga_date")) or
val k = j.groupBy(ga => ga("ga_date")) or
val k = j.groupBy(_.apply("ga_date"))

As a side note, I believe the user mailing list may be more appropriate
for this kind of issue.

[1]: https://issues.apache.org/jira/browse/FLINK-2673
[2]: http://dataartisans.github.io/flink-training/dataSetAdvanced/slides.html

--
BR,
Stefano Baghino

Software Engineer @ Radicalbit
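To make the difference concrete, here is a plain-Scala sketch (no Flink required; the sample map is an assumption) of the two accessors:

```scala
val m = Map("ga_date" -> "20160429")

// `get` wraps the value in an Option, which Flink cannot use as a key:
val asOption: Option[String] = m.get("ga_date")   // Some("20160429")

// `apply` returns the bare value, which is a valid String key,
// but throws NoSuchElementException if the key is missing:
val asValue: String = m("ga_date")                // "20160429"
```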
Okay, I'll take all your points into consideration.
--
Thank You

Regards

Punit Naik
What if after grouping I wanted to count the occurrences of the key
"ga_date"?

--
Thank You

Regards

Punit Naik
Anyway, I fixed it. To your groupBy you should attach this:

.reduceGroup {
  (in, out: org.apache.flink.util.Collector[(Map[key,value], Int)]) =>
    var v: Int = 0
    var k: Map[key,value] = Map()
    for (t <- in) {
      v += 1
      k = t
    }
    out.collect((k, v))
}

--
Thank You

Regards

Punit Naik
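The logic of the reduceGroup above can be sketched for a single group in plain Scala (the sample group is an assumption; in the real job, `in` is the iterator Flink passes per group):

```scala
// Count the elements of one group while remembering its key map,
// mirroring the v/k variables in the reduceGroup body.
val group = Seq(Map("ga_date" -> "20160429"), Map("ga_date" -> "20160429"))

var v: Int = 0
var k: Map[String, String] = Map()
for (t <- group) { v += 1; k = t }

val result = (k, v)   // (Map("ga_date" -> "20160429"), 2)
```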
I forgot to mention that in this code my out.collect method is outputting a
tuple of Map[key,value] and the count as Int.

--
Thank You

Regards

Punit Naik