"Cannot resolve map"

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

"Cannot resolve map"

Thomas FOURNIER
Hello,

In the following code,  map { case (id,(label, count)) => (label,id) } is
not resolved.
Is it related to zipWithIndex (org.apache.flink.api.scala) operation ?

My input is a DataSet[String] and I'd like to output a
DataSet[(String,Long)]


val mapping = input

  .map( (s => (s, 1)) )
  .groupBy( 0 )
  .reduce( (a, b) => (a._1, a._2 + b._2) )
  .partitionByRange( 1 )
  .zipWithIndex
  .map { case (id,(label, count)) => (label,id) }



Thanks
Regards

Thomas
Reply | Threaded
Open this post in threaded view
|

Re: "Cannot resolve map"

Till Rohrmann
Hi Thomas,

Flink does not support partial functions due to the map method being
overloaded. Instead you can write map{ x match { case ... => } } or you
import org.apache.flink.scala.extensions.acceptPartialFunctions and then
write .zipWithIndex.mapWith { case ... => }.

Cheers,
Till


On Fri, Nov 4, 2016 at 1:58 PM, Thomas FOURNIER <[hidden email]
> wrote:

> Hello,
>
> In the following code,  map { case (id,(label, count)) => (label,id) } is
> not resolved.
> Is it related to zipWithIndex (org.apache.flink.api.scala) operation ?
>
> My input is a DataSet[String] and I'd like to output a
> DataSet[(String,Long)]
>
>
> val mapping = input
>
>   .map( (s => (s, 1)) )
>   .groupBy( 0 )
>   .reduce( (a, b) => (a._1, a._2 + b._2) )
>   .partitionByRange( 1 )
>   .zipWithIndex
>   .map { case (id,(label, count)) => (label,id) }
>
>
>
> Thanks
> Regards
>
> Thomas
>