Re: [2/2] flink git commit: [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Re: [2/2] flink git commit: [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML

Sachin Goel
Hi Till
This works only when there is only one variable to be broadcasted, doesn't
it? What about the case when we need to broadcast two? Is it advisable to
create a BroadcastDoubleElementMapper class or perhaps we could just send a
tuple of all the variables? Perhaps that is a better idea.

Regards
Sachin Goel

On Tue, Jun 2, 2015 at 8:15 PM, <[hidden email]> wrote:

> [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML
>
>
> Project: http://git-wip-us.apache.org/repos/asf/flink/repo
> Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/950b79c5
> Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/950b79c5
> Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/950b79c5
>
> Branch: refs/heads/master
> Commit: 950b79c59327e96e3b1504616d26460cbff7fd4c
> Parents: 44dae0c
> Author: Till Rohrmann <[hidden email]>
> Authored: Tue Jun 2 14:45:12 2015 +0200
> Committer: Till Rohrmann <[hidden email]>
> Committed: Tue Jun 2 15:34:54 2015 +0200
>
> ----------------------------------------------------------------------
>  .../apache/flink/ml/classification/SVM.scala    | 73 ++++++--------------
>  .../flink/ml/preprocessing/StandardScaler.scala | 39 +++--------
>  2 files changed, 30 insertions(+), 82 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> ----------------------------------------------------------------------
> diff --git
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> index e01735f..c69b56a 100644
> ---
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> +++
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> @@ -26,6 +26,7 @@ import scala.util.Random
>  import org.apache.flink.api.common.functions.RichMapFunction
>  import org.apache.flink.api.scala._
>  import org.apache.flink.configuration.Configuration
> +import org.apache.flink.ml._
>  import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
>  import org.apache.flink.ml.common._
>  import org.apache.flink.ml.math.Vector
> @@ -190,6 +191,7 @@ class SVM extends Predictor[SVM] {
>    * of the algorithm.
>    */
>  object SVM{
> +
>    val WEIGHT_VECTOR ="weightVector"
>
>    // ========================================== Parameters
> =========================================
> @@ -242,7 +244,13 @@ object SVM{
>
>          instance.weightsOption match {
>            case Some(weights) => {
> -            input.map(new PredictionMapper[T]).withBroadcastSet(weights,
> WEIGHT_VECTOR)
> +            input.mapWithBcVariable(weights){
> +              (vector, weights) => {
> +                val dotProduct = weights dot vector.asBreeze
> +
> +                LabeledVector(dotProduct, vector)
> +              }
> +            }
>            }
>
>            case None => {
> @@ -254,28 +262,6 @@ object SVM{
>      }
>    }
>
> -  /** Mapper to calculate the value of the prediction function. This is a
> RichMapFunction, because
> -    * we broadcast the weight vector to all mappers.
> -    */
> -  class PredictionMapper[T <: Vector] extends RichMapFunction[T,
> LabeledVector] {
> -
> -    var weights: BreezeDenseVector[Double] = _
> -
> -    @throws(classOf[Exception])
> -    override def open(configuration: Configuration): Unit = {
> -      // get current weights
> -      weights = getRuntimeContext.
> -
> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
> -    }
> -
> -    override def map(vector: T): LabeledVector = {
> -      // calculate the prediction value (scaled distance from the
> separating hyperplane)
> -      val dotProduct = weights dot vector.asBreeze
> -
> -      LabeledVector(dotProduct, vector)
> -    }
> -  }
> -
>    /** [[org.apache.flink.ml.pipeline.PredictOperation]] for
> [[LabeledVector ]]types. The result type
>      * is a [[(Double, Double)]] tuple, corresponding to (truth,
> prediction)
>      *
> @@ -291,7 +277,14 @@ object SVM{
>
>          instance.weightsOption match {
>            case Some(weights) => {
> -            input.map(new
> LabeledPredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR)
> +            input.mapWithBcVariable(weights){
> +              (labeledVector, weights) => {
> +                val prediction = weights dot labeledVector.vector.asBreeze
> +                val truth = labeledVector.label
> +
> +                (truth, prediction)
> +              }
> +            }
>            }
>
>            case None => {
> @@ -303,30 +296,6 @@ object SVM{
>      }
>    }
>
> -  /** Mapper to calculate the value of the prediction function. This is a
> RichMapFunction, because
> -    * we broadcast the weight vector to all mappers.
> -    */
> -  class LabeledPredictionMapper extends RichMapFunction[LabeledVector,
> (Double, Double)] {
> -
> -    var weights: BreezeDenseVector[Double] = _
> -
> -    @throws(classOf[Exception])
> -    override def open(configuration: Configuration): Unit = {
> -      // get current weights
> -      weights = getRuntimeContext.
> -
> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
> -    }
> -
> -    override def map(labeledVector: LabeledVector): (Double, Double) = {
> -      // calculate the prediction value (scaled distance from the
> separating hyperplane)
> -      val prediction = weights dot labeledVector.vector.asBreeze
> -      val truth = labeledVector.label
> -
> -      (truth, prediction)
> -    }
> -  }
> -
> -
>    /** [[FitOperation]] which trains a SVM with soft-margin based on the
> given training data set.
>      *
>      */
> @@ -540,17 +509,17 @@ object SVM{
>
>      // compute projected gradient
>      var proj_grad = if(alpha  <= 0.0){
> -      math.min(grad, 0)
> +      scala.math.min(grad, 0)
>      } else if(alpha >= 1.0) {
> -      math.max(grad, 0)
> +      scala.math.max(grad, 0)
>      } else {
>        grad
>      }
>
> -    if(math.abs(grad) != 0.0){
> +    if(scala.math.abs(grad) != 0.0){
>        val qii = x dot x
>        val newAlpha = if(qii != 0.0){
> -        math.min(math.max((alpha - (grad / qii)), 0.0), 1.0)
> +        scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0), 1.0)
>        } else {
>          1.0
>        }
>
>
> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> ----------------------------------------------------------------------
> diff --git
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> index 2e3ed95..7992b02 100644
> ---
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> +++
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions._
>  import org.apache.flink.api.common.typeinfo.TypeInformation
>  import org.apache.flink.api.scala._
>  import org.apache.flink.configuration.Configuration
> +import org.apache.flink.ml._
>  import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
>  import org.apache.flink.ml.math.Breeze._
>  import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
> @@ -209,20 +210,9 @@ object StandardScaler {
>
>          instance.metricsOption match {
>            case Some(metrics) => {
> -            input.map(new RichMapFunction[T, T]() {
> -
> -              var broadcastMean: linalg.Vector[Double] = null
> -              var broadcastStd: linalg.Vector[Double] = null
> -
> -              override def open(parameters: Configuration): Unit = {
> -                val broadcastedMetrics =
> getRuntimeContext().getBroadcastVariable[
> -                    (linalg.Vector[Double], linalg.Vector[Double])
> -                  ]("broadcastedMetrics").get(0)
> -                broadcastMean = broadcastedMetrics._1
> -                broadcastStd = broadcastedMetrics._2
> -              }
> -
> -              override def map(vector: T): T = {
> +            input.mapWithBcVariable(metrics){
> +              (vector, metrics) => {
> +                val (broadcastMean, broadcastStd) = metrics
>                  var myVector = vector.asBreeze
>
>                  myVector -= broadcastMean
> @@ -230,7 +220,7 @@ object StandardScaler {
>                  myVector = (myVector :* std) + mean
>                  myVector.fromBreeze
>                }
> -            }).withBroadcastSet(metrics, "broadcastedMetrics")
> +            }
>            }
>
>            case None =>
> @@ -251,20 +241,9 @@ object StandardScaler {
>
>          instance.metricsOption match {
>            case Some(metrics) => {
> -            input.map(new RichMapFunction[LabeledVector, LabeledVector]()
> {
> -
> -              var broadcastMean: linalg.Vector[Double] = null
> -              var broadcastStd: linalg.Vector[Double] = null
> -
> -              override def open(parameters: Configuration): Unit = {
> -                val broadcastedMetrics =
> getRuntimeContext().getBroadcastVariable[
> -                  (linalg.Vector[Double], linalg.Vector[Double])
> -                  ]("broadcastedMetrics").get(0)
> -                broadcastMean = broadcastedMetrics._1
> -                broadcastStd = broadcastedMetrics._2
> -              }
> -
> -              override def map(labeledVector: LabeledVector):
> LabeledVector = {
> +            input.mapWithBcVariable(metrics){
> +              (labeledVector, metrics) => {
> +                val (broadcastMean, broadcastStd) = metrics
>                  val LabeledVector(label, vector) = labeledVector
>                  var breezeVector = vector.asBreeze
>
> @@ -273,7 +252,7 @@ object StandardScaler {
>                  breezeVector = (breezeVector :* std) + mean
>                  LabeledVector(label, breezeVector.fromBreeze[Vector])
>                }
> -            }).withBroadcastSet(metrics, "broadcastedMetrics")
> +            }
>            }
>
>            case None =>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [2/2] flink git commit: [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML

Sachin Goel
Further, I think we should return just
broadcastVariable = getRuntimeContext.
getBroadcastVariable[B]("broadcastVariable")
in BroadcastSingleElementMapper
User may wish to have a list broadcasted, and not just want to access the
first element. For example, this would make sense in the kmeans algorithm.

Regards
Sachin Goel

On Tue, Jun 2, 2015 at 9:03 PM, Sachin Goel <[hidden email]>
wrote:

> Hi Till
> This works only when there is only one variable to be broadcasted, doesn't
> it? What about the case when we need to broadcast two? Is it advisable to
> create a BroadcastDoubleElementMapper class or perhaps we could just send a
> tuple of all the variables? Perhaps that is a better idea.
>
> Regards
> Sachin Goel
>
> On Tue, Jun 2, 2015 at 8:15 PM, <[hidden email]> wrote:
>
>> [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML
>>
>>
>> Project: http://git-wip-us.apache.org/repos/asf/flink/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/950b79c5
>> Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/950b79c5
>> Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/950b79c5
>>
>> Branch: refs/heads/master
>> Commit: 950b79c59327e96e3b1504616d26460cbff7fd4c
>> Parents: 44dae0c
>> Author: Till Rohrmann <[hidden email]>
>> Authored: Tue Jun 2 14:45:12 2015 +0200
>> Committer: Till Rohrmann <[hidden email]>
>> Committed: Tue Jun 2 15:34:54 2015 +0200
>>
>> ----------------------------------------------------------------------
>>  .../apache/flink/ml/classification/SVM.scala    | 73 ++++++--------------
>>  .../flink/ml/preprocessing/StandardScaler.scala | 39 +++--------
>>  2 files changed, 30 insertions(+), 82 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
>> ----------------------------------------------------------------------
>> diff --git
>> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
>> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
>> index e01735f..c69b56a 100644
>> ---
>> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
>> +++
>> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
>> @@ -26,6 +26,7 @@ import scala.util.Random
>>  import org.apache.flink.api.common.functions.RichMapFunction
>>  import org.apache.flink.api.scala._
>>  import org.apache.flink.configuration.Configuration
>> +import org.apache.flink.ml._
>>  import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
>>  import org.apache.flink.ml.common._
>>  import org.apache.flink.ml.math.Vector
>> @@ -190,6 +191,7 @@ class SVM extends Predictor[SVM] {
>>    * of the algorithm.
>>    */
>>  object SVM{
>> +
>>    val WEIGHT_VECTOR ="weightVector"
>>
>>    // ========================================== Parameters
>> =========================================
>> @@ -242,7 +244,13 @@ object SVM{
>>
>>          instance.weightsOption match {
>>            case Some(weights) => {
>> -            input.map(new PredictionMapper[T]).withBroadcastSet(weights,
>> WEIGHT_VECTOR)
>> +            input.mapWithBcVariable(weights){
>> +              (vector, weights) => {
>> +                val dotProduct = weights dot vector.asBreeze
>> +
>> +                LabeledVector(dotProduct, vector)
>> +              }
>> +            }
>>            }
>>
>>            case None => {
>> @@ -254,28 +262,6 @@ object SVM{
>>      }
>>    }
>>
>> -  /** Mapper to calculate the value of the prediction function. This is
>> a RichMapFunction, because
>> -    * we broadcast the weight vector to all mappers.
>> -    */
>> -  class PredictionMapper[T <: Vector] extends RichMapFunction[T,
>> LabeledVector] {
>> -
>> -    var weights: BreezeDenseVector[Double] = _
>> -
>> -    @throws(classOf[Exception])
>> -    override def open(configuration: Configuration): Unit = {
>> -      // get current weights
>> -      weights = getRuntimeContext.
>> -
>> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
>> -    }
>> -
>> -    override def map(vector: T): LabeledVector = {
>> -      // calculate the prediction value (scaled distance from the
>> separating hyperplane)
>> -      val dotProduct = weights dot vector.asBreeze
>> -
>> -      LabeledVector(dotProduct, vector)
>> -    }
>> -  }
>> -
>>    /** [[org.apache.flink.ml.pipeline.PredictOperation]] for
>> [[LabeledVector ]]types. The result type
>>      * is a [[(Double, Double)]] tuple, corresponding to (truth,
>> prediction)
>>      *
>> @@ -291,7 +277,14 @@ object SVM{
>>
>>          instance.weightsOption match {
>>            case Some(weights) => {
>> -            input.map(new
>> LabeledPredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR)
>> +            input.mapWithBcVariable(weights){
>> +              (labeledVector, weights) => {
>> +                val prediction = weights dot
>> labeledVector.vector.asBreeze
>> +                val truth = labeledVector.label
>> +
>> +                (truth, prediction)
>> +              }
>> +            }
>>            }
>>
>>            case None => {
>> @@ -303,30 +296,6 @@ object SVM{
>>      }
>>    }
>>
>> -  /** Mapper to calculate the value of the prediction function. This is
>> a RichMapFunction, because
>> -    * we broadcast the weight vector to all mappers.
>> -    */
>> -  class LabeledPredictionMapper extends RichMapFunction[LabeledVector,
>> (Double, Double)] {
>> -
>> -    var weights: BreezeDenseVector[Double] = _
>> -
>> -    @throws(classOf[Exception])
>> -    override def open(configuration: Configuration): Unit = {
>> -      // get current weights
>> -      weights = getRuntimeContext.
>> -
>> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
>> -    }
>> -
>> -    override def map(labeledVector: LabeledVector): (Double, Double) = {
>> -      // calculate the prediction value (scaled distance from the
>> separating hyperplane)
>> -      val prediction = weights dot labeledVector.vector.asBreeze
>> -      val truth = labeledVector.label
>> -
>> -      (truth, prediction)
>> -    }
>> -  }
>> -
>> -
>>    /** [[FitOperation]] which trains a SVM with soft-margin based on the
>> given training data set.
>>      *
>>      */
>> @@ -540,17 +509,17 @@ object SVM{
>>
>>      // compute projected gradient
>>      var proj_grad = if(alpha  <= 0.0){
>> -      math.min(grad, 0)
>> +      scala.math.min(grad, 0)
>>      } else if(alpha >= 1.0) {
>> -      math.max(grad, 0)
>> +      scala.math.max(grad, 0)
>>      } else {
>>        grad
>>      }
>>
>> -    if(math.abs(grad) != 0.0){
>> +    if(scala.math.abs(grad) != 0.0){
>>        val qii = x dot x
>>        val newAlpha = if(qii != 0.0){
>> -        math.min(math.max((alpha - (grad / qii)), 0.0), 1.0)
>> +        scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0), 1.0)
>>        } else {
>>          1.0
>>        }
>>
>>
>> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
>> ----------------------------------------------------------------------
>> diff --git
>> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
>> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
>> index 2e3ed95..7992b02 100644
>> ---
>> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
>> +++
>> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
>> @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions._
>>  import org.apache.flink.api.common.typeinfo.TypeInformation
>>  import org.apache.flink.api.scala._
>>  import org.apache.flink.configuration.Configuration
>> +import org.apache.flink.ml._
>>  import org.apache.flink.ml.common.{LabeledVector, Parameter,
>> ParameterMap}
>>  import org.apache.flink.ml.math.Breeze._
>>  import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
>> @@ -209,20 +210,9 @@ object StandardScaler {
>>
>>          instance.metricsOption match {
>>            case Some(metrics) => {
>> -            input.map(new RichMapFunction[T, T]() {
>> -
>> -              var broadcastMean: linalg.Vector[Double] = null
>> -              var broadcastStd: linalg.Vector[Double] = null
>> -
>> -              override def open(parameters: Configuration): Unit = {
>> -                val broadcastedMetrics =
>> getRuntimeContext().getBroadcastVariable[
>> -                    (linalg.Vector[Double], linalg.Vector[Double])
>> -                  ]("broadcastedMetrics").get(0)
>> -                broadcastMean = broadcastedMetrics._1
>> -                broadcastStd = broadcastedMetrics._2
>> -              }
>> -
>> -              override def map(vector: T): T = {
>> +            input.mapWithBcVariable(metrics){
>> +              (vector, metrics) => {
>> +                val (broadcastMean, broadcastStd) = metrics
>>                  var myVector = vector.asBreeze
>>
>>                  myVector -= broadcastMean
>> @@ -230,7 +220,7 @@ object StandardScaler {
>>                  myVector = (myVector :* std) + mean
>>                  myVector.fromBreeze
>>                }
>> -            }).withBroadcastSet(metrics, "broadcastedMetrics")
>> +            }
>>            }
>>
>>            case None =>
>> @@ -251,20 +241,9 @@ object StandardScaler {
>>
>>          instance.metricsOption match {
>>            case Some(metrics) => {
>> -            input.map(new RichMapFunction[LabeledVector,
>> LabeledVector]() {
>> -
>> -              var broadcastMean: linalg.Vector[Double] = null
>> -              var broadcastStd: linalg.Vector[Double] = null
>> -
>> -              override def open(parameters: Configuration): Unit = {
>> -                val broadcastedMetrics =
>> getRuntimeContext().getBroadcastVariable[
>> -                  (linalg.Vector[Double], linalg.Vector[Double])
>> -                  ]("broadcastedMetrics").get(0)
>> -                broadcastMean = broadcastedMetrics._1
>> -                broadcastStd = broadcastedMetrics._2
>> -              }
>> -
>> -              override def map(labeledVector: LabeledVector):
>> LabeledVector = {
>> +            input.mapWithBcVariable(metrics){
>> +              (labeledVector, metrics) => {
>> +                val (broadcastMean, broadcastStd) = metrics
>>                  val LabeledVector(label, vector) = labeledVector
>>                  var breezeVector = vector.asBreeze
>>
>> @@ -273,7 +252,7 @@ object StandardScaler {
>>                  breezeVector = (breezeVector :* std) + mean
>>                  LabeledVector(label, breezeVector.fromBreeze[Vector])
>>                }
>> -            }).withBroadcastSet(metrics, "broadcastedMetrics")
>> +            }
>>            }
>>
>>            case None =>
>>
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: [2/2] flink git commit: [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML

till.rohrmann
Yes you’re right Sachin. The mapWithBcVariable is only syntactic sugar if
you have a broadcast DataSet which contains only one element. If you have
multiple elements in your DataSet then you can’t use this method. But we
can define another method mapWithBcSet which takes a function f: (element:
T, broadcastValues: List[B]) => O, for example.

If you have multiple DataSet which fulfil this condition, then you can wrap
them in a tuple as you’ve said.

Cheers,
Till


On Tue, Jun 2, 2015 at 7:10 PM, Sachin Goel <[hidden email]>
wrote:

> Further, I think we should return just
> broadcastVariable = getRuntimeContext.
> getBroadcastVariable[B]("broadcastVariable")
> in BroadcastSingleElementMapper
> User may wish to have a list broadcasted, and not just want to access the
> first element. For example, this would make sense in the kmeans algorithm.
>
> Regards
> Sachin Goel
>
> On Tue, Jun 2, 2015 at 9:03 PM, Sachin Goel <[hidden email]>
> wrote:
>
> > Hi Till
> > This works only when there is only one variable to be broadcasted,
> doesn't
> > it? What about the case when we need to broadcast two? Is it advisable to
> > create a BroadcastDoubleElementMapper class or perhaps we could just
> send a
> > tuple of all the variables? Perhaps that is a better idea.
> >
> > Regards
> > Sachin Goel
> >
> > On Tue, Jun 2, 2015 at 8:15 PM, <[hidden email]> wrote:
> >
> >> [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML
> >>
> >>
> >> Project: http://git-wip-us.apache.org/repos/asf/flink/repo
> >> Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/950b79c5
> >> Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/950b79c5
> >> Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/950b79c5
> >>
> >> Branch: refs/heads/master
> >> Commit: 950b79c59327e96e3b1504616d26460cbff7fd4c
> >> Parents: 44dae0c
> >> Author: Till Rohrmann <[hidden email]>
> >> Authored: Tue Jun 2 14:45:12 2015 +0200
> >> Committer: Till Rohrmann <[hidden email]>
> >> Committed: Tue Jun 2 15:34:54 2015 +0200
> >>
> >> ----------------------------------------------------------------------
> >>  .../apache/flink/ml/classification/SVM.scala    | 73
> ++++++--------------
> >>  .../flink/ml/preprocessing/StandardScaler.scala | 39 +++--------
> >>  2 files changed, 30 insertions(+), 82 deletions(-)
> >> ----------------------------------------------------------------------
> >>
> >>
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> >> ----------------------------------------------------------------------
> >> diff --git
> >>
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> >>
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> >> index e01735f..c69b56a 100644
> >> ---
> >>
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> >> +++
> >>
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> >> @@ -26,6 +26,7 @@ import scala.util.Random
> >>  import org.apache.flink.api.common.functions.RichMapFunction
> >>  import org.apache.flink.api.scala._
> >>  import org.apache.flink.configuration.Configuration
> >> +import org.apache.flink.ml._
> >>  import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
> >>  import org.apache.flink.ml.common._
> >>  import org.apache.flink.ml.math.Vector
> >> @@ -190,6 +191,7 @@ class SVM extends Predictor[SVM] {
> >>    * of the algorithm.
> >>    */
> >>  object SVM{
> >> +
> >>    val WEIGHT_VECTOR ="weightVector"
> >>
> >>    // ========================================== Parameters
> >> =========================================
> >> @@ -242,7 +244,13 @@ object SVM{
> >>
> >>          instance.weightsOption match {
> >>            case Some(weights) => {
> >> -            input.map(new
> PredictionMapper[T]).withBroadcastSet(weights,
> >> WEIGHT_VECTOR)
> >> +            input.mapWithBcVariable(weights){
> >> +              (vector, weights) => {
> >> +                val dotProduct = weights dot vector.asBreeze
> >> +
> >> +                LabeledVector(dotProduct, vector)
> >> +              }
> >> +            }
> >>            }
> >>
> >>            case None => {
> >> @@ -254,28 +262,6 @@ object SVM{
> >>      }
> >>    }
> >>
> >> -  /** Mapper to calculate the value of the prediction function. This is
> >> a RichMapFunction, because
> >> -    * we broadcast the weight vector to all mappers.
> >> -    */
> >> -  class PredictionMapper[T <: Vector] extends RichMapFunction[T,
> >> LabeledVector] {
> >> -
> >> -    var weights: BreezeDenseVector[Double] = _
> >> -
> >> -    @throws(classOf[Exception])
> >> -    override def open(configuration: Configuration): Unit = {
> >> -      // get current weights
> >> -      weights = getRuntimeContext.
> >> -
> >> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
> >> -    }
> >> -
> >> -    override def map(vector: T): LabeledVector = {
> >> -      // calculate the prediction value (scaled distance from the
> >> separating hyperplane)
> >> -      val dotProduct = weights dot vector.asBreeze
> >> -
> >> -      LabeledVector(dotProduct, vector)
> >> -    }
> >> -  }
> >> -
> >>    /** [[org.apache.flink.ml.pipeline.PredictOperation]] for
> >> [[LabeledVector ]]types. The result type
> >>      * is a [[(Double, Double)]] tuple, corresponding to (truth,
> >> prediction)
> >>      *
> >> @@ -291,7 +277,14 @@ object SVM{
> >>
> >>          instance.weightsOption match {
> >>            case Some(weights) => {
> >> -            input.map(new
> >> LabeledPredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR)
> >> +            input.mapWithBcVariable(weights){
> >> +              (labeledVector, weights) => {
> >> +                val prediction = weights dot
> >> labeledVector.vector.asBreeze
> >> +                val truth = labeledVector.label
> >> +
> >> +                (truth, prediction)
> >> +              }
> >> +            }
> >>            }
> >>
> >>            case None => {
> >> @@ -303,30 +296,6 @@ object SVM{
> >>      }
> >>    }
> >>
> >> -  /** Mapper to calculate the value of the prediction function. This is
> >> a RichMapFunction, because
> >> -    * we broadcast the weight vector to all mappers.
> >> -    */
> >> -  class LabeledPredictionMapper extends RichMapFunction[LabeledVector,
> >> (Double, Double)] {
> >> -
> >> -    var weights: BreezeDenseVector[Double] = _
> >> -
> >> -    @throws(classOf[Exception])
> >> -    override def open(configuration: Configuration): Unit = {
> >> -      // get current weights
> >> -      weights = getRuntimeContext.
> >> -
> >> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
> >> -    }
> >> -
> >> -    override def map(labeledVector: LabeledVector): (Double, Double) =
> {
> >> -      // calculate the prediction value (scaled distance from the
> >> separating hyperplane)
> >> -      val prediction = weights dot labeledVector.vector.asBreeze
> >> -      val truth = labeledVector.label
> >> -
> >> -      (truth, prediction)
> >> -    }
> >> -  }
> >> -
> >> -
> >>    /** [[FitOperation]] which trains a SVM with soft-margin based on the
> >> given training data set.
> >>      *
> >>      */
> >> @@ -540,17 +509,17 @@ object SVM{
> >>
> >>      // compute projected gradient
> >>      var proj_grad = if(alpha  <= 0.0){
> >> -      math.min(grad, 0)
> >> +      scala.math.min(grad, 0)
> >>      } else if(alpha >= 1.0) {
> >> -      math.max(grad, 0)
> >> +      scala.math.max(grad, 0)
> >>      } else {
> >>        grad
> >>      }
> >>
> >> -    if(math.abs(grad) != 0.0){
> >> +    if(scala.math.abs(grad) != 0.0){
> >>        val qii = x dot x
> >>        val newAlpha = if(qii != 0.0){
> >> -        math.min(math.max((alpha - (grad / qii)), 0.0), 1.0)
> >> +        scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0),
> 1.0)
> >>        } else {
> >>          1.0
> >>        }
> >>
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> >> ----------------------------------------------------------------------
> >> diff --git
> >>
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> >>
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> >> index 2e3ed95..7992b02 100644
> >> ---
> >>
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> >> +++
> >>
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> >> @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions._
> >>  import org.apache.flink.api.common.typeinfo.TypeInformation
> >>  import org.apache.flink.api.scala._
> >>  import org.apache.flink.configuration.Configuration
> >> +import org.apache.flink.ml._
> >>  import org.apache.flink.ml.common.{LabeledVector, Parameter,
> >> ParameterMap}
> >>  import org.apache.flink.ml.math.Breeze._
> >>  import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
> >> @@ -209,20 +210,9 @@ object StandardScaler {
> >>
> >>          instance.metricsOption match {
> >>            case Some(metrics) => {
> >> -            input.map(new RichMapFunction[T, T]() {
> >> -
> >> -              var broadcastMean: linalg.Vector[Double] = null
> >> -              var broadcastStd: linalg.Vector[Double] = null
> >> -
> >> -              override def open(parameters: Configuration): Unit = {
> >> -                val broadcastedMetrics =
> >> getRuntimeContext().getBroadcastVariable[
> >> -                    (linalg.Vector[Double], linalg.Vector[Double])
> >> -                  ]("broadcastedMetrics").get(0)
> >> -                broadcastMean = broadcastedMetrics._1
> >> -                broadcastStd = broadcastedMetrics._2
> >> -              }
> >> -
> >> -              override def map(vector: T): T = {
> >> +            input.mapWithBcVariable(metrics){
> >> +              (vector, metrics) => {
> >> +                val (broadcastMean, broadcastStd) = metrics
> >>                  var myVector = vector.asBreeze
> >>
> >>                  myVector -= broadcastMean
> >> @@ -230,7 +220,7 @@ object StandardScaler {
> >>                  myVector = (myVector :* std) + mean
> >>                  myVector.fromBreeze
> >>                }
> >> -            }).withBroadcastSet(metrics, "broadcastedMetrics")
> >> +            }
> >>            }
> >>
> >>            case None =>
> >> @@ -251,20 +241,9 @@ object StandardScaler {
> >>
> >>          instance.metricsOption match {
> >>            case Some(metrics) => {
> >> -            input.map(new RichMapFunction[LabeledVector,
> >> LabeledVector]() {
> >> -
> >> -              var broadcastMean: linalg.Vector[Double] = null
> >> -              var broadcastStd: linalg.Vector[Double] = null
> >> -
> >> -              override def open(parameters: Configuration): Unit = {
> >> -                val broadcastedMetrics =
> >> getRuntimeContext().getBroadcastVariable[
> >> -                  (linalg.Vector[Double], linalg.Vector[Double])
> >> -                  ]("broadcastedMetrics").get(0)
> >> -                broadcastMean = broadcastedMetrics._1
> >> -                broadcastStd = broadcastedMetrics._2
> >> -              }
> >> -
> >> -              override def map(labeledVector: LabeledVector):
> >> LabeledVector = {
> >> +            input.mapWithBcVariable(metrics){
> >> +              (labeledVector, metrics) => {
> >> +                val (broadcastMean, broadcastStd) = metrics
> >>                  val LabeledVector(label, vector) = labeledVector
> >>                  var breezeVector = vector.asBreeze
> >>
> >> @@ -273,7 +252,7 @@ object StandardScaler {
> >>                  breezeVector = (breezeVector :* std) + mean
> >>                  LabeledVector(label, breezeVector.fromBreeze[Vector])
> >>                }
> >> -            }).withBroadcastSet(metrics, "broadcastedMetrics")
> >> +            }
> >>            }
> >>
> >>            case None =>
> >>
> >>
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [2/2] flink git commit: [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML

Sachin Goel
Should I go ahead and add this method then? The mapWithBcSet I mean.

Regards
Sachin Goel

On Tue, Jun 2, 2015 at 10:43 PM, Till Rohrmann <[hidden email]>
wrote:

> Yes you’re right Sachin. The mapWithBcVariable is only syntactic sugar if
> you have a broadcast DataSet which contains only one element. If you have
> multiple elements in your DataSet then you can’t use this method. But we
> can define another method mapWithBcSet which takes a function f: (element:
> T, broadcastValues: List[B]) => O, for example.
>
> If you have multiple DataSet which fulfil this condition, then you can wrap
> them in a tuple as you’ve said.
>
> Cheers,
> Till
> ​
>
> On Tue, Jun 2, 2015 at 7:10 PM, Sachin Goel <[hidden email]>
> wrote:
>
> > Further, I think we should return just
> > broadcastVariable = getRuntimeContext.
> > getBroadcastVariable[B]("broadcastVariable")
> > in BroadcastSingleElementMapper
> > User may wish to have a list broadcasted, and not just want to access the
> > first element. For example, this would make sense in the kmeans
> algorithm.
> >
> > Regards
> > Sachin Goel
> >
> > On Tue, Jun 2, 2015 at 9:03 PM, Sachin Goel <[hidden email]>
> > wrote:
> >
> > > Hi Till
> > > This works only when there is only one variable to be broadcasted,
> > doesn't
> > > it? What about the case when we need to broadcast two? Is it advisable
> to
> > > create a BroadcastDoubleElementMapper class or perhaps we could just
> > send a
> > > tuple of all the variables? Perhaps that is a better idea.
> > >
> > > Regards
> > > Sachin Goel
> > >
> > > On Tue, Jun 2, 2015 at 8:15 PM, <[hidden email]> wrote:
> > >
> > >> [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML
> > >>
> > >>
> > >> Project: http://git-wip-us.apache.org/repos/asf/flink/repo
> > >> Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/950b79c5
> > >> Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/950b79c5
> > >> Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/950b79c5
> > >>
> > >> Branch: refs/heads/master
> > >> Commit: 950b79c59327e96e3b1504616d26460cbff7fd4c
> > >> Parents: 44dae0c
> > >> Author: Till Rohrmann <[hidden email]>
> > >> Authored: Tue Jun 2 14:45:12 2015 +0200
> > >> Committer: Till Rohrmann <[hidden email]>
> > >> Committed: Tue Jun 2 15:34:54 2015 +0200
> > >>
> > >> ----------------------------------------------------------------------
> > >>  .../apache/flink/ml/classification/SVM.scala    | 73
> > ++++++--------------
> > >>  .../flink/ml/preprocessing/StandardScaler.scala | 39 +++--------
> > >>  2 files changed, 30 insertions(+), 82 deletions(-)
> > >> ----------------------------------------------------------------------
> > >>
> > >>
> > >>
> > >>
> >
> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > >> ----------------------------------------------------------------------
> > >> diff --git
> > >>
> >
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > >>
> >
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > >> index e01735f..c69b56a 100644
> > >> ---
> > >>
> >
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > >> +++
> > >>
> >
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > >> @@ -26,6 +26,7 @@ import scala.util.Random
> > >>  import org.apache.flink.api.common.functions.RichMapFunction
> > >>  import org.apache.flink.api.scala._
> > >>  import org.apache.flink.configuration.Configuration
> > >> +import org.apache.flink.ml._
> > >>  import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
> > >>  import org.apache.flink.ml.common._
> > >>  import org.apache.flink.ml.math.Vector
> > >> @@ -190,6 +191,7 @@ class SVM extends Predictor[SVM] {
> > >>    * of the algorithm.
> > >>    */
> > >>  object SVM{
> > >> +
> > >>    val WEIGHT_VECTOR ="weightVector"
> > >>
> > >>    // ========================================== Parameters
> > >> =========================================
> > >> @@ -242,7 +244,13 @@ object SVM{
> > >>
> > >>          instance.weightsOption match {
> > >>            case Some(weights) => {
> > >> -            input.map(new
> > PredictionMapper[T]).withBroadcastSet(weights,
> > >> WEIGHT_VECTOR)
> > >> +            input.mapWithBcVariable(weights){
> > >> +              (vector, weights) => {
> > >> +                val dotProduct = weights dot vector.asBreeze
> > >> +
> > >> +                LabeledVector(dotProduct, vector)
> > >> +              }
> > >> +            }
> > >>            }
> > >>
> > >>            case None => {
> > >> @@ -254,28 +262,6 @@ object SVM{
> > >>      }
> > >>    }
> > >>
> > >> -  /** Mapper to calculate the value of the prediction function. This
> is
> > >> a RichMapFunction, because
> > >> -    * we broadcast the weight vector to all mappers.
> > >> -    */
> > >> -  class PredictionMapper[T <: Vector] extends RichMapFunction[T,
> > >> LabeledVector] {
> > >> -
> > >> -    var weights: BreezeDenseVector[Double] = _
> > >> -
> > >> -    @throws(classOf[Exception])
> > >> -    override def open(configuration: Configuration): Unit = {
> > >> -      // get current weights
> > >> -      weights = getRuntimeContext.
> > >> -
> > >> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
> > >> -    }
> > >> -
> > >> -    override def map(vector: T): LabeledVector = {
> > >> -      // calculate the prediction value (scaled distance from the
> > >> separating hyperplane)
> > >> -      val dotProduct = weights dot vector.asBreeze
> > >> -
> > >> -      LabeledVector(dotProduct, vector)
> > >> -    }
> > >> -  }
> > >> -
> > >>    /** [[org.apache.flink.ml.pipeline.PredictOperation]] for
> > >> [[LabeledVector ]]types. The result type
> > >>      * is a [[(Double, Double)]] tuple, corresponding to (truth,
> > >> prediction)
> > >>      *
> > >> @@ -291,7 +277,14 @@ object SVM{
> > >>
> > >>          instance.weightsOption match {
> > >>            case Some(weights) => {
> > >> -            input.map(new
> > >> LabeledPredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR)
> > >> +            input.mapWithBcVariable(weights){
> > >> +              (labeledVector, weights) => {
> > >> +                val prediction = weights dot
> > >> labeledVector.vector.asBreeze
> > >> +                val truth = labeledVector.label
> > >> +
> > >> +                (truth, prediction)
> > >> +              }
> > >> +            }
> > >>            }
> > >>
> > >>            case None => {
> > >> @@ -303,30 +296,6 @@ object SVM{
> > >>      }
> > >>    }
> > >>
> > >> -  /** Mapper to calculate the value of the prediction function. This
> is
> > >> a RichMapFunction, because
> > >> -    * we broadcast the weight vector to all mappers.
> > >> -    */
> > >> -  class LabeledPredictionMapper extends
> RichMapFunction[LabeledVector,
> > >> (Double, Double)] {
> > >> -
> > >> -    var weights: BreezeDenseVector[Double] = _
> > >> -
> > >> -    @throws(classOf[Exception])
> > >> -    override def open(configuration: Configuration): Unit = {
> > >> -      // get current weights
> > >> -      weights = getRuntimeContext.
> > >> -
> > >> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
> > >> -    }
> > >> -
> > >> -    override def map(labeledVector: LabeledVector): (Double, Double)
> =
> > {
> > >> -      // calculate the prediction value (scaled distance from the
> > >> separating hyperplane)
> > >> -      val prediction = weights dot labeledVector.vector.asBreeze
> > >> -      val truth = labeledVector.label
> > >> -
> > >> -      (truth, prediction)
> > >> -    }
> > >> -  }
> > >> -
> > >> -
> > >>    /** [[FitOperation]] which trains a SVM with soft-margin based on
> the
> > >> given training data set.
> > >>      *
> > >>      */
> > >> @@ -540,17 +509,17 @@ object SVM{
> > >>
> > >>      // compute projected gradient
> > >>      var proj_grad = if(alpha  <= 0.0){
> > >> -      math.min(grad, 0)
> > >> +      scala.math.min(grad, 0)
> > >>      } else if(alpha >= 1.0) {
> > >> -      math.max(grad, 0)
> > >> +      scala.math.max(grad, 0)
> > >>      } else {
> > >>        grad
> > >>      }
> > >>
> > >> -    if(math.abs(grad) != 0.0){
> > >> +    if(scala.math.abs(grad) != 0.0){
> > >>        val qii = x dot x
> > >>        val newAlpha = if(qii != 0.0){
> > >> -        math.min(math.max((alpha - (grad / qii)), 0.0), 1.0)
> > >> +        scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0),
> > 1.0)
> > >>        } else {
> > >>          1.0
> > >>        }
> > >>
> > >>
> > >>
> >
> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > >> ----------------------------------------------------------------------
> > >> diff --git
> > >>
> >
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > >>
> >
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > >> index 2e3ed95..7992b02 100644
> > >> ---
> > >>
> >
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > >> +++
> > >>
> >
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > >> @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions._
> > >>  import org.apache.flink.api.common.typeinfo.TypeInformation
> > >>  import org.apache.flink.api.scala._
> > >>  import org.apache.flink.configuration.Configuration
> > >> +import org.apache.flink.ml._
> > >>  import org.apache.flink.ml.common.{LabeledVector, Parameter,
> > >> ParameterMap}
> > >>  import org.apache.flink.ml.math.Breeze._
> > >>  import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
> > >> @@ -209,20 +210,9 @@ object StandardScaler {
> > >>
> > >>          instance.metricsOption match {
> > >>            case Some(metrics) => {
> > >> -            input.map(new RichMapFunction[T, T]() {
> > >> -
> > >> -              var broadcastMean: linalg.Vector[Double] = null
> > >> -              var broadcastStd: linalg.Vector[Double] = null
> > >> -
> > >> -              override def open(parameters: Configuration): Unit = {
> > >> -                val broadcastedMetrics =
> > >> getRuntimeContext().getBroadcastVariable[
> > >> -                    (linalg.Vector[Double], linalg.Vector[Double])
> > >> -                  ]("broadcastedMetrics").get(0)
> > >> -                broadcastMean = broadcastedMetrics._1
> > >> -                broadcastStd = broadcastedMetrics._2
> > >> -              }
> > >> -
> > >> -              override def map(vector: T): T = {
> > >> +            input.mapWithBcVariable(metrics){
> > >> +              (vector, metrics) => {
> > >> +                val (broadcastMean, broadcastStd) = metrics
> > >>                  var myVector = vector.asBreeze
> > >>
> > >>                  myVector -= broadcastMean
> > >> @@ -230,7 +220,7 @@ object StandardScaler {
> > >>                  myVector = (myVector :* std) + mean
> > >>                  myVector.fromBreeze
> > >>                }
> > >> -            }).withBroadcastSet(metrics, "broadcastedMetrics")
> > >> +            }
> > >>            }
> > >>
> > >>            case None =>
> > >> @@ -251,20 +241,9 @@ object StandardScaler {
> > >>
> > >>          instance.metricsOption match {
> > >>            case Some(metrics) => {
> > >> -            input.map(new RichMapFunction[LabeledVector,
> > >> LabeledVector]() {
> > >> -
> > >> -              var broadcastMean: linalg.Vector[Double] = null
> > >> -              var broadcastStd: linalg.Vector[Double] = null
> > >> -
> > >> -              override def open(parameters: Configuration): Unit = {
> > >> -                val broadcastedMetrics =
> > >> getRuntimeContext().getBroadcastVariable[
> > >> -                  (linalg.Vector[Double], linalg.Vector[Double])
> > >> -                  ]("broadcastedMetrics").get(0)
> > >> -                broadcastMean = broadcastedMetrics._1
> > >> -                broadcastStd = broadcastedMetrics._2
> > >> -              }
> > >> -
> > >> -              override def map(labeledVector: LabeledVector):
> > >> LabeledVector = {
> > >> +            input.mapWithBcVariable(metrics){
> > >> +              (labeledVector, metrics) => {
> > >> +                val (broadcastMean, broadcastStd) = metrics
> > >>                  val LabeledVector(label, vector) = labeledVector
> > >>                  var breezeVector = vector.asBreeze
> > >>
> > >> @@ -273,7 +252,7 @@ object StandardScaler {
> > >>                  breezeVector = (breezeVector :* std) + mean
> > >>                  LabeledVector(label, breezeVector.fromBreeze[Vector])
> > >>                }
> > >> -            }).withBroadcastSet(metrics, "broadcastedMetrics")
> > >> +            }
> > >>            }
> > >>
> > >>            case None =>
> > >>
> > >>
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [2/2] flink git commit: [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML

till.rohrmann
If it helps you with your task, then you can add it. The best thing is
probably to implement it similarly to the mapWithBcVariable.

Cheers,
Till


On Tue, Jun 2, 2015 at 7:25 PM, Sachin Goel <[hidden email]>
wrote:

> Should I go ahead and add this method then? The mapWithBcSet I mean.
>
> Regards
> Sachin Goel
>
> On Tue, Jun 2, 2015 at 10:43 PM, Till Rohrmann <[hidden email]>
> wrote:
>
> > Yes you’re right Sachin. The mapWithBcVariable is only syntactic sugar if
> > you have a broadcast DataSet which contains only one element. If you have
> > multiple elements in your DataSet then you can’t use this method. But we
> > can define another method mapWithBcSet which takes a function f:
> (element:
> > T, broadcastValues: List[B]) => O, for example.
> >
> > If you have multiple DataSet which fulfil this condition, then you can
> wrap
> > them in a tuple as you’ve said.
> >
> > Cheers,
> > Till
> > ​
> >
> > On Tue, Jun 2, 2015 at 7:10 PM, Sachin Goel <[hidden email]>
> > wrote:
> >
> > > Further, I think we should return just
> > > broadcastVariable = getRuntimeContext.
> > > getBroadcastVariable[B]("broadcastVariable")
> > > in BroadcastSingleElementMapper
> > > User may wish to have a list broadcasted, and not just want to access
> the
> > > first element. For example, this would make sense in the kmeans
> > algorithm.
> > >
> > > Regards
> > > Sachin Goel
> > >
> > > On Tue, Jun 2, 2015 at 9:03 PM, Sachin Goel <[hidden email]>
> > > wrote:
> > >
> > > > Hi Till
> > > > This works only when there is only one variable to be broadcasted,
> > > doesn't
> > > > it? What about the case when we need to broadcast two? Is it
> advisable
> > to
> > > > create a BroadcastDoubleElementMapper class or perhaps we could just
> > > send a
> > > > tuple of all the variables? Perhaps that is a better idea.
> > > >
> > > > Regards
> > > > Sachin Goel
> > > >
> > > > On Tue, Jun 2, 2015 at 8:15 PM, <[hidden email]> wrote:
> > > >
> > > >> [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML
> > > >>
> > > >>
> > > >> Project: http://git-wip-us.apache.org/repos/asf/flink/repo
> > > >> Commit:
> http://git-wip-us.apache.org/repos/asf/flink/commit/950b79c5
> > > >> Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/950b79c5
> > > >> Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/950b79c5
> > > >>
> > > >> Branch: refs/heads/master
> > > >> Commit: 950b79c59327e96e3b1504616d26460cbff7fd4c
> > > >> Parents: 44dae0c
> > > >> Author: Till Rohrmann <[hidden email]>
> > > >> Authored: Tue Jun 2 14:45:12 2015 +0200
> > > >> Committer: Till Rohrmann <[hidden email]>
> > > >> Committed: Tue Jun 2 15:34:54 2015 +0200
> > > >>
> > > >>
> ----------------------------------------------------------------------
> > > >>  .../apache/flink/ml/classification/SVM.scala    | 73
> > > ++++++--------------
> > > >>  .../flink/ml/preprocessing/StandardScaler.scala | 39 +++--------
> > > >>  2 files changed, 30 insertions(+), 82 deletions(-)
> > > >>
> ----------------------------------------------------------------------
> > > >>
> > > >>
> > > >>
> > > >>
> > >
> >
> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > > >>
> ----------------------------------------------------------------------
> > > >> diff --git
> > > >>
> > >
> >
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > > >>
> > >
> >
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > > >> index e01735f..c69b56a 100644
> > > >> ---
> > > >>
> > >
> >
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > > >> +++
> > > >>
> > >
> >
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> > > >> @@ -26,6 +26,7 @@ import scala.util.Random
> > > >>  import org.apache.flink.api.common.functions.RichMapFunction
> > > >>  import org.apache.flink.api.scala._
> > > >>  import org.apache.flink.configuration.Configuration
> > > >> +import org.apache.flink.ml._
> > > >>  import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
> > > >>  import org.apache.flink.ml.common._
> > > >>  import org.apache.flink.ml.math.Vector
> > > >> @@ -190,6 +191,7 @@ class SVM extends Predictor[SVM] {
> > > >>    * of the algorithm.
> > > >>    */
> > > >>  object SVM{
> > > >> +
> > > >>    val WEIGHT_VECTOR ="weightVector"
> > > >>
> > > >>    // ========================================== Parameters
> > > >> =========================================
> > > >> @@ -242,7 +244,13 @@ object SVM{
> > > >>
> > > >>          instance.weightsOption match {
> > > >>            case Some(weights) => {
> > > >> -            input.map(new
> > > PredictionMapper[T]).withBroadcastSet(weights,
> > > >> WEIGHT_VECTOR)
> > > >> +            input.mapWithBcVariable(weights){
> > > >> +              (vector, weights) => {
> > > >> +                val dotProduct = weights dot vector.asBreeze
> > > >> +
> > > >> +                LabeledVector(dotProduct, vector)
> > > >> +              }
> > > >> +            }
> > > >>            }
> > > >>
> > > >>            case None => {
> > > >> @@ -254,28 +262,6 @@ object SVM{
> > > >>      }
> > > >>    }
> > > >>
> > > >> -  /** Mapper to calculate the value of the prediction function.
> This
> > is
> > > >> a RichMapFunction, because
> > > >> -    * we broadcast the weight vector to all mappers.
> > > >> -    */
> > > >> -  class PredictionMapper[T <: Vector] extends RichMapFunction[T,
> > > >> LabeledVector] {
> > > >> -
> > > >> -    var weights: BreezeDenseVector[Double] = _
> > > >> -
> > > >> -    @throws(classOf[Exception])
> > > >> -    override def open(configuration: Configuration): Unit = {
> > > >> -      // get current weights
> > > >> -      weights = getRuntimeContext.
> > > >> -
> > > >>
> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
> > > >> -    }
> > > >> -
> > > >> -    override def map(vector: T): LabeledVector = {
> > > >> -      // calculate the prediction value (scaled distance from the
> > > >> separating hyperplane)
> > > >> -      val dotProduct = weights dot vector.asBreeze
> > > >> -
> > > >> -      LabeledVector(dotProduct, vector)
> > > >> -    }
> > > >> -  }
> > > >> -
> > > >>    /** [[org.apache.flink.ml.pipeline.PredictOperation]] for
> > > >> [[LabeledVector ]]types. The result type
> > > >>      * is a [[(Double, Double)]] tuple, corresponding to (truth,
> > > >> prediction)
> > > >>      *
> > > >> @@ -291,7 +277,14 @@ object SVM{
> > > >>
> > > >>          instance.weightsOption match {
> > > >>            case Some(weights) => {
> > > >> -            input.map(new
> > > >> LabeledPredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR)
> > > >> +            input.mapWithBcVariable(weights){
> > > >> +              (labeledVector, weights) => {
> > > >> +                val prediction = weights dot
> > > >> labeledVector.vector.asBreeze
> > > >> +                val truth = labeledVector.label
> > > >> +
> > > >> +                (truth, prediction)
> > > >> +              }
> > > >> +            }
> > > >>            }
> > > >>
> > > >>            case None => {
> > > >> @@ -303,30 +296,6 @@ object SVM{
> > > >>      }
> > > >>    }
> > > >>
> > > >> -  /** Mapper to calculate the value of the prediction function.
> This
> > is
> > > >> a RichMapFunction, because
> > > >> -    * we broadcast the weight vector to all mappers.
> > > >> -    */
> > > >> -  class LabeledPredictionMapper extends
> > RichMapFunction[LabeledVector,
> > > >> (Double, Double)] {
> > > >> -
> > > >> -    var weights: BreezeDenseVector[Double] = _
> > > >> -
> > > >> -    @throws(classOf[Exception])
> > > >> -    override def open(configuration: Configuration): Unit = {
> > > >> -      // get current weights
> > > >> -      weights = getRuntimeContext.
> > > >> -
> > > >>
> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
> > > >> -    }
> > > >> -
> > > >> -    override def map(labeledVector: LabeledVector): (Double,
> Double)
> > =
> > > {
> > > >> -      // calculate the prediction value (scaled distance from the
> > > >> separating hyperplane)
> > > >> -      val prediction = weights dot labeledVector.vector.asBreeze
> > > >> -      val truth = labeledVector.label
> > > >> -
> > > >> -      (truth, prediction)
> > > >> -    }
> > > >> -  }
> > > >> -
> > > >> -
> > > >>    /** [[FitOperation]] which trains a SVM with soft-margin based on
> > the
> > > >> given training data set.
> > > >>      *
> > > >>      */
> > > >> @@ -540,17 +509,17 @@ object SVM{
> > > >>
> > > >>      // compute projected gradient
> > > >>      var proj_grad = if(alpha  <= 0.0){
> > > >> -      math.min(grad, 0)
> > > >> +      scala.math.min(grad, 0)
> > > >>      } else if(alpha >= 1.0) {
> > > >> -      math.max(grad, 0)
> > > >> +      scala.math.max(grad, 0)
> > > >>      } else {
> > > >>        grad
> > > >>      }
> > > >>
> > > >> -    if(math.abs(grad) != 0.0){
> > > >> +    if(scala.math.abs(grad) != 0.0){
> > > >>        val qii = x dot x
> > > >>        val newAlpha = if(qii != 0.0){
> > > >> -        math.min(math.max((alpha - (grad / qii)), 0.0), 1.0)
> > > >> +        scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0),
> > > 1.0)
> > > >>        } else {
> > > >>          1.0
> > > >>        }
> > > >>
> > > >>
> > > >>
> > >
> >
> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > > >>
> ----------------------------------------------------------------------
> > > >> diff --git
> > > >>
> > >
> >
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > > >>
> > >
> >
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > > >> index 2e3ed95..7992b02 100644
> > > >> ---
> > > >>
> > >
> >
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > > >> +++
> > > >>
> > >
> >
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> > > >> @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions._
> > > >>  import org.apache.flink.api.common.typeinfo.TypeInformation
> > > >>  import org.apache.flink.api.scala._
> > > >>  import org.apache.flink.configuration.Configuration
> > > >> +import org.apache.flink.ml._
> > > >>  import org.apache.flink.ml.common.{LabeledVector, Parameter,
> > > >> ParameterMap}
> > > >>  import org.apache.flink.ml.math.Breeze._
> > > >>  import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
> > > >> @@ -209,20 +210,9 @@ object StandardScaler {
> > > >>
> > > >>          instance.metricsOption match {
> > > >>            case Some(metrics) => {
> > > >> -            input.map(new RichMapFunction[T, T]() {
> > > >> -
> > > >> -              var broadcastMean: linalg.Vector[Double] = null
> > > >> -              var broadcastStd: linalg.Vector[Double] = null
> > > >> -
> > > >> -              override def open(parameters: Configuration): Unit =
> {
> > > >> -                val broadcastedMetrics =
> > > >> getRuntimeContext().getBroadcastVariable[
> > > >> -                    (linalg.Vector[Double], linalg.Vector[Double])
> > > >> -                  ]("broadcastedMetrics").get(0)
> > > >> -                broadcastMean = broadcastedMetrics._1
> > > >> -                broadcastStd = broadcastedMetrics._2
> > > >> -              }
> > > >> -
> > > >> -              override def map(vector: T): T = {
> > > >> +            input.mapWithBcVariable(metrics){
> > > >> +              (vector, metrics) => {
> > > >> +                val (broadcastMean, broadcastStd) = metrics
> > > >>                  var myVector = vector.asBreeze
> > > >>
> > > >>                  myVector -= broadcastMean
> > > >> @@ -230,7 +220,7 @@ object StandardScaler {
> > > >>                  myVector = (myVector :* std) + mean
> > > >>                  myVector.fromBreeze
> > > >>                }
> > > >> -            }).withBroadcastSet(metrics, "broadcastedMetrics")
> > > >> +            }
> > > >>            }
> > > >>
> > > >>            case None =>
> > > >> @@ -251,20 +241,9 @@ object StandardScaler {
> > > >>
> > > >>          instance.metricsOption match {
> > > >>            case Some(metrics) => {
> > > >> -            input.map(new RichMapFunction[LabeledVector,
> > > >> LabeledVector]() {
> > > >> -
> > > >> -              var broadcastMean: linalg.Vector[Double] = null
> > > >> -              var broadcastStd: linalg.Vector[Double] = null
> > > >> -
> > > >> -              override def open(parameters: Configuration): Unit =
> {
> > > >> -                val broadcastedMetrics =
> > > >> getRuntimeContext().getBroadcastVariable[
> > > >> -                  (linalg.Vector[Double], linalg.Vector[Double])
> > > >> -                  ]("broadcastedMetrics").get(0)
> > > >> -                broadcastMean = broadcastedMetrics._1
> > > >> -                broadcastStd = broadcastedMetrics._2
> > > >> -              }
> > > >> -
> > > >> -              override def map(labeledVector: LabeledVector):
> > > >> LabeledVector = {
> > > >> +            input.mapWithBcVariable(metrics){
> > > >> +              (labeledVector, metrics) => {
> > > >> +                val (broadcastMean, broadcastStd) = metrics
> > > >>                  val LabeledVector(label, vector) = labeledVector
> > > >>                  var breezeVector = vector.asBreeze
> > > >>
> > > >> @@ -273,7 +252,7 @@ object StandardScaler {
> > > >>                  breezeVector = (breezeVector :* std) + mean
> > > >>                  LabeledVector(label,
> breezeVector.fromBreeze[Vector])
> > > >>                }
> > > >> -            }).withBroadcastSet(metrics, "broadcastedMetrics")
> > > >> +            }
> > > >>            }
> > > >>
> > > >>            case None =>
> > > >>
> > > >>
> > > >
> > >
> >
>