|
Hi Till
This works only when there is only one variable to be broadcasted, doesn't it? What about the case when we need to broadcast two? Is it advisable to create a BroadcastDoubleElementMapper class or perhaps we could just send a tuple of all the variables? Perhaps that is a better idea. Regards Sachin Goel On Tue, Jun 2, 2015 at 8:15 PM, <[hidden email]> wrote: > [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML > > > Project: http://git-wip-us.apache.org/repos/asf/flink/repo > Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/950b79c5 > Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/950b79c5 > Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/950b79c5 > > Branch: refs/heads/master > Commit: 950b79c59327e96e3b1504616d26460cbff7fd4c > Parents: 44dae0c > Author: Till Rohrmann <[hidden email]> > Authored: Tue Jun 2 14:45:12 2015 +0200 > Committer: Till Rohrmann <[hidden email]> > Committed: Tue Jun 2 15:34:54 2015 +0200 > > ---------------------------------------------------------------------- > .../apache/flink/ml/classification/SVM.scala | 73 ++++++-------------- > .../flink/ml/preprocessing/StandardScaler.scala | 39 +++-------- > 2 files changed, 30 insertions(+), 82 deletions(-) > ---------------------------------------------------------------------- > > > > http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > ---------------------------------------------------------------------- > diff --git > a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > index e01735f..c69b56a 100644 > --- > a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > +++ > b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > @@ -26,6 +26,7 @@ import scala.util.Random > import 
org.apache.flink.api.common.functions.RichMapFunction > import org.apache.flink.api.scala._ > import org.apache.flink.configuration.Configuration > +import org.apache.flink.ml._ > import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner > import org.apache.flink.ml.common._ > import org.apache.flink.ml.math.Vector > @@ -190,6 +191,7 @@ class SVM extends Predictor[SVM] { > * of the algorithm. > */ > object SVM{ > + > val WEIGHT_VECTOR ="weightVector" > > // ========================================== Parameters > ========================================= > @@ -242,7 +244,13 @@ object SVM{ > > instance.weightsOption match { > case Some(weights) => { > - input.map(new PredictionMapper[T]).withBroadcastSet(weights, > WEIGHT_VECTOR) > + input.mapWithBcVariable(weights){ > + (vector, weights) => { > + val dotProduct = weights dot vector.asBreeze > + > + LabeledVector(dotProduct, vector) > + } > + } > } > > case None => { > @@ -254,28 +262,6 @@ object SVM{ > } > } > > - /** Mapper to calculate the value of the prediction function. This is a > RichMapFunction, because > - * we broadcast the weight vector to all mappers. > - */ > - class PredictionMapper[T <: Vector] extends RichMapFunction[T, > LabeledVector] { > - > - var weights: BreezeDenseVector[Double] = _ > - > - @throws(classOf[Exception]) > - override def open(configuration: Configuration): Unit = { > - // get current weights > - weights = getRuntimeContext. > - > getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0) > - } > - > - override def map(vector: T): LabeledVector = { > - // calculate the prediction value (scaled distance from the > separating hyperplane) > - val dotProduct = weights dot vector.asBreeze > - > - LabeledVector(dotProduct, vector) > - } > - } > - > /** [[org.apache.flink.ml.pipeline.PredictOperation]] for > [[LabeledVector ]]types. 
The result type > * is a [[(Double, Double)]] tuple, corresponding to (truth, > prediction) > * > @@ -291,7 +277,14 @@ object SVM{ > > instance.weightsOption match { > case Some(weights) => { > - input.map(new > LabeledPredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR) > + input.mapWithBcVariable(weights){ > + (labeledVector, weights) => { > + val prediction = weights dot labeledVector.vector.asBreeze > + val truth = labeledVector.label > + > + (truth, prediction) > + } > + } > } > > case None => { > @@ -303,30 +296,6 @@ object SVM{ > } > } > > - /** Mapper to calculate the value of the prediction function. This is a > RichMapFunction, because > - * we broadcast the weight vector to all mappers. > - */ > - class LabeledPredictionMapper extends RichMapFunction[LabeledVector, > (Double, Double)] { > - > - var weights: BreezeDenseVector[Double] = _ > - > - @throws(classOf[Exception]) > - override def open(configuration: Configuration): Unit = { > - // get current weights > - weights = getRuntimeContext. > - > getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0) > - } > - > - override def map(labeledVector: LabeledVector): (Double, Double) = { > - // calculate the prediction value (scaled distance from the > separating hyperplane) > - val prediction = weights dot labeledVector.vector.asBreeze > - val truth = labeledVector.label > - > - (truth, prediction) > - } > - } > - > - > /** [[FitOperation]] which trains a SVM with soft-margin based on the > given training data set. 
> * > */ > @@ -540,17 +509,17 @@ object SVM{ > > // compute projected gradient > var proj_grad = if(alpha <= 0.0){ > - math.min(grad, 0) > + scala.math.min(grad, 0) > } else if(alpha >= 1.0) { > - math.max(grad, 0) > + scala.math.max(grad, 0) > } else { > grad > } > > - if(math.abs(grad) != 0.0){ > + if(scala.math.abs(grad) != 0.0){ > val qii = x dot x > val newAlpha = if(qii != 0.0){ > - math.min(math.max((alpha - (grad / qii)), 0.0), 1.0) > + scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0), 1.0) > } else { > 1.0 > } > > > http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > ---------------------------------------------------------------------- > diff --git > a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > index 2e3ed95..7992b02 100644 > --- > a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > +++ > b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions._ > import org.apache.flink.api.common.typeinfo.TypeInformation > import org.apache.flink.api.scala._ > import org.apache.flink.configuration.Configuration > +import org.apache.flink.ml._ > import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap} > import org.apache.flink.ml.math.Breeze._ > import org.apache.flink.ml.math.{BreezeVectorConverter, Vector} > @@ -209,20 +210,9 @@ object StandardScaler { > > instance.metricsOption match { > case Some(metrics) => { > - input.map(new RichMapFunction[T, T]() { > - > - var broadcastMean: linalg.Vector[Double] = null > - var broadcastStd: linalg.Vector[Double] = null > - > - override def open(parameters: Configuration): Unit = { > - val broadcastedMetrics 
= > getRuntimeContext().getBroadcastVariable[ > - (linalg.Vector[Double], linalg.Vector[Double]) > - ]("broadcastedMetrics").get(0) > - broadcastMean = broadcastedMetrics._1 > - broadcastStd = broadcastedMetrics._2 > - } > - > - override def map(vector: T): T = { > + input.mapWithBcVariable(metrics){ > + (vector, metrics) => { > + val (broadcastMean, broadcastStd) = metrics > var myVector = vector.asBreeze > > myVector -= broadcastMean > @@ -230,7 +220,7 @@ object StandardScaler { > myVector = (myVector :* std) + mean > myVector.fromBreeze > } > - }).withBroadcastSet(metrics, "broadcastedMetrics") > + } > } > > case None => > @@ -251,20 +241,9 @@ object StandardScaler { > > instance.metricsOption match { > case Some(metrics) => { > - input.map(new RichMapFunction[LabeledVector, LabeledVector]() > { > - > - var broadcastMean: linalg.Vector[Double] = null > - var broadcastStd: linalg.Vector[Double] = null > - > - override def open(parameters: Configuration): Unit = { > - val broadcastedMetrics = > getRuntimeContext().getBroadcastVariable[ > - (linalg.Vector[Double], linalg.Vector[Double]) > - ]("broadcastedMetrics").get(0) > - broadcastMean = broadcastedMetrics._1 > - broadcastStd = broadcastedMetrics._2 > - } > - > - override def map(labeledVector: LabeledVector): > LabeledVector = { > + input.mapWithBcVariable(metrics){ > + (labeledVector, metrics) => { > + val (broadcastMean, broadcastStd) = metrics > val LabeledVector(label, vector) = labeledVector > var breezeVector = vector.asBreeze > > @@ -273,7 +252,7 @@ object StandardScaler { > breezeVector = (breezeVector :* std) + mean > LabeledVector(label, breezeVector.fromBreeze[Vector]) > } > - }).withBroadcastSet(metrics, "broadcastedMetrics") > + } > } > > case None => > > |
|
Further, I think we should return just
broadcastVariable = getRuntimeContext. getBroadcastVariable[B]("broadcastVariable") in BroadcastSingleElementMapper User may wish to have a list broadcasted, and not just want to access the first element. For example, this would make sense in the kmeans algorithm. Regards Sachin Goel On Tue, Jun 2, 2015 at 9:03 PM, Sachin Goel <[hidden email]> wrote: > Hi Till > This works only when there is only one variable to be broadcasted, doesn't > it? What about the case when we need to broadcast two? Is it advisable to > create a BroadcastDoubleElementMapper class or perhaps we could just send a > tuple of all the variables? Perhaps that is a better idea. > > Regards > Sachin Goel > > On Tue, Jun 2, 2015 at 8:15 PM, <[hidden email]> wrote: > >> [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML >> >> >> Project: http://git-wip-us.apache.org/repos/asf/flink/repo >> Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/950b79c5 >> Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/950b79c5 >> Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/950b79c5 >> >> Branch: refs/heads/master >> Commit: 950b79c59327e96e3b1504616d26460cbff7fd4c >> Parents: 44dae0c >> Author: Till Rohrmann <[hidden email]> >> Authored: Tue Jun 2 14:45:12 2015 +0200 >> Committer: Till Rohrmann <[hidden email]> >> Committed: Tue Jun 2 15:34:54 2015 +0200 >> >> ---------------------------------------------------------------------- >> .../apache/flink/ml/classification/SVM.scala | 73 ++++++-------------- >> .../flink/ml/preprocessing/StandardScaler.scala | 39 +++-------- >> 2 files changed, 30 insertions(+), 82 deletions(-) >> ---------------------------------------------------------------------- >> >> >> >> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala >> ---------------------------------------------------------------------- >> diff --git >> 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala >> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala >> index e01735f..c69b56a 100644 >> --- >> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala >> +++ >> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala >> @@ -26,6 +26,7 @@ import scala.util.Random >> import org.apache.flink.api.common.functions.RichMapFunction >> import org.apache.flink.api.scala._ >> import org.apache.flink.configuration.Configuration >> +import org.apache.flink.ml._ >> import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner >> import org.apache.flink.ml.common._ >> import org.apache.flink.ml.math.Vector >> @@ -190,6 +191,7 @@ class SVM extends Predictor[SVM] { >> * of the algorithm. >> */ >> object SVM{ >> + >> val WEIGHT_VECTOR ="weightVector" >> >> // ========================================== Parameters >> ========================================= >> @@ -242,7 +244,13 @@ object SVM{ >> >> instance.weightsOption match { >> case Some(weights) => { >> - input.map(new PredictionMapper[T]).withBroadcastSet(weights, >> WEIGHT_VECTOR) >> + input.mapWithBcVariable(weights){ >> + (vector, weights) => { >> + val dotProduct = weights dot vector.asBreeze >> + >> + LabeledVector(dotProduct, vector) >> + } >> + } >> } >> >> case None => { >> @@ -254,28 +262,6 @@ object SVM{ >> } >> } >> >> - /** Mapper to calculate the value of the prediction function. This is >> a RichMapFunction, because >> - * we broadcast the weight vector to all mappers. >> - */ >> - class PredictionMapper[T <: Vector] extends RichMapFunction[T, >> LabeledVector] { >> - >> - var weights: BreezeDenseVector[Double] = _ >> - >> - @throws(classOf[Exception]) >> - override def open(configuration: Configuration): Unit = { >> - // get current weights >> - weights = getRuntimeContext. 
>> - >> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0) >> - } >> - >> - override def map(vector: T): LabeledVector = { >> - // calculate the prediction value (scaled distance from the >> separating hyperplane) >> - val dotProduct = weights dot vector.asBreeze >> - >> - LabeledVector(dotProduct, vector) >> - } >> - } >> - >> /** [[org.apache.flink.ml.pipeline.PredictOperation]] for >> [[LabeledVector ]]types. The result type >> * is a [[(Double, Double)]] tuple, corresponding to (truth, >> prediction) >> * >> @@ -291,7 +277,14 @@ object SVM{ >> >> instance.weightsOption match { >> case Some(weights) => { >> - input.map(new >> LabeledPredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR) >> + input.mapWithBcVariable(weights){ >> + (labeledVector, weights) => { >> + val prediction = weights dot >> labeledVector.vector.asBreeze >> + val truth = labeledVector.label >> + >> + (truth, prediction) >> + } >> + } >> } >> >> case None => { >> @@ -303,30 +296,6 @@ object SVM{ >> } >> } >> >> - /** Mapper to calculate the value of the prediction function. This is >> a RichMapFunction, because >> - * we broadcast the weight vector to all mappers. >> - */ >> - class LabeledPredictionMapper extends RichMapFunction[LabeledVector, >> (Double, Double)] { >> - >> - var weights: BreezeDenseVector[Double] = _ >> - >> - @throws(classOf[Exception]) >> - override def open(configuration: Configuration): Unit = { >> - // get current weights >> - weights = getRuntimeContext. 
>> - >> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0) >> - } >> - >> - override def map(labeledVector: LabeledVector): (Double, Double) = { >> - // calculate the prediction value (scaled distance from the >> separating hyperplane) >> - val prediction = weights dot labeledVector.vector.asBreeze >> - val truth = labeledVector.label >> - >> - (truth, prediction) >> - } >> - } >> - >> - >> /** [[FitOperation]] which trains a SVM with soft-margin based on the >> given training data set. >> * >> */ >> @@ -540,17 +509,17 @@ object SVM{ >> >> // compute projected gradient >> var proj_grad = if(alpha <= 0.0){ >> - math.min(grad, 0) >> + scala.math.min(grad, 0) >> } else if(alpha >= 1.0) { >> - math.max(grad, 0) >> + scala.math.max(grad, 0) >> } else { >> grad >> } >> >> - if(math.abs(grad) != 0.0){ >> + if(scala.math.abs(grad) != 0.0){ >> val qii = x dot x >> val newAlpha = if(qii != 0.0){ >> - math.min(math.max((alpha - (grad / qii)), 0.0), 1.0) >> + scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0), 1.0) >> } else { >> 1.0 >> } >> >> >> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala >> ---------------------------------------------------------------------- >> diff --git >> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala >> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala >> index 2e3ed95..7992b02 100644 >> --- >> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala >> +++ >> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala >> @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions._ >> import org.apache.flink.api.common.typeinfo.TypeInformation >> import org.apache.flink.api.scala._ >> import org.apache.flink.configuration.Configuration >> +import 
org.apache.flink.ml._ >> import org.apache.flink.ml.common.{LabeledVector, Parameter, >> ParameterMap} >> import org.apache.flink.ml.math.Breeze._ >> import org.apache.flink.ml.math.{BreezeVectorConverter, Vector} >> @@ -209,20 +210,9 @@ object StandardScaler { >> >> instance.metricsOption match { >> case Some(metrics) => { >> - input.map(new RichMapFunction[T, T]() { >> - >> - var broadcastMean: linalg.Vector[Double] = null >> - var broadcastStd: linalg.Vector[Double] = null >> - >> - override def open(parameters: Configuration): Unit = { >> - val broadcastedMetrics = >> getRuntimeContext().getBroadcastVariable[ >> - (linalg.Vector[Double], linalg.Vector[Double]) >> - ]("broadcastedMetrics").get(0) >> - broadcastMean = broadcastedMetrics._1 >> - broadcastStd = broadcastedMetrics._2 >> - } >> - >> - override def map(vector: T): T = { >> + input.mapWithBcVariable(metrics){ >> + (vector, metrics) => { >> + val (broadcastMean, broadcastStd) = metrics >> var myVector = vector.asBreeze >> >> myVector -= broadcastMean >> @@ -230,7 +220,7 @@ object StandardScaler { >> myVector = (myVector :* std) + mean >> myVector.fromBreeze >> } >> - }).withBroadcastSet(metrics, "broadcastedMetrics") >> + } >> } >> >> case None => >> @@ -251,20 +241,9 @@ object StandardScaler { >> >> instance.metricsOption match { >> case Some(metrics) => { >> - input.map(new RichMapFunction[LabeledVector, >> LabeledVector]() { >> - >> - var broadcastMean: linalg.Vector[Double] = null >> - var broadcastStd: linalg.Vector[Double] = null >> - >> - override def open(parameters: Configuration): Unit = { >> - val broadcastedMetrics = >> getRuntimeContext().getBroadcastVariable[ >> - (linalg.Vector[Double], linalg.Vector[Double]) >> - ]("broadcastedMetrics").get(0) >> - broadcastMean = broadcastedMetrics._1 >> - broadcastStd = broadcastedMetrics._2 >> - } >> - >> - override def map(labeledVector: LabeledVector): >> LabeledVector = { >> + input.mapWithBcVariable(metrics){ >> + (labeledVector, metrics) => { >> 
+ val (broadcastMean, broadcastStd) = metrics >> val LabeledVector(label, vector) = labeledVector >> var breezeVector = vector.asBreeze >> >> @@ -273,7 +252,7 @@ object StandardScaler { >> breezeVector = (breezeVector :* std) + mean >> LabeledVector(label, breezeVector.fromBreeze[Vector]) >> } >> - }).withBroadcastSet(metrics, "broadcastedMetrics") >> + } >> } >> >> case None => >> >> > |
|
Yes, you’re right, Sachin. The mapWithBcVariable method is only syntactic sugar for the case where
you have a broadcast DataSet which contains only one element. If you have multiple elements in your DataSet then you can’t use this method. But we can define another method mapWithBcSet which takes a function f: (element: T, broadcastValues: List[B]) => O, for example. If you have multiple DataSet which fulfil this condition, then you can wrap them in a tuple as you’ve said. Cheers, Till On Tue, Jun 2, 2015 at 7:10 PM, Sachin Goel <[hidden email]> wrote: > Further, I think we should return just > broadcastVariable = getRuntimeContext. > getBroadcastVariable[B]("broadcastVariable") > in BroadcastSingleElementMapper > User may wish to have a list broadcasted, and not just want to access the > first element. For example, this would make sense in the kmeans algorithm. > > Regards > Sachin Goel > > On Tue, Jun 2, 2015 at 9:03 PM, Sachin Goel <[hidden email]> > wrote: > > > Hi Till > > This works only when there is only one variable to be broadcasted, > doesn't > > it? What about the case when we need to broadcast two? Is it advisable to > > create a BroadcastDoubleElementMapper class or perhaps we could just > send a > > tuple of all the variables? Perhaps that is a better idea. 
> > > > Regards > > Sachin Goel > > > > On Tue, Jun 2, 2015 at 8:15 PM, <[hidden email]> wrote: > > > >> [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML > >> > >> > >> Project: http://git-wip-us.apache.org/repos/asf/flink/repo > >> Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/950b79c5 > >> Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/950b79c5 > >> Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/950b79c5 > >> > >> Branch: refs/heads/master > >> Commit: 950b79c59327e96e3b1504616d26460cbff7fd4c > >> Parents: 44dae0c > >> Author: Till Rohrmann <[hidden email]> > >> Authored: Tue Jun 2 14:45:12 2015 +0200 > >> Committer: Till Rohrmann <[hidden email]> > >> Committed: Tue Jun 2 15:34:54 2015 +0200 > >> > >> ---------------------------------------------------------------------- > >> .../apache/flink/ml/classification/SVM.scala | 73 > ++++++-------------- > >> .../flink/ml/preprocessing/StandardScaler.scala | 39 +++-------- > >> 2 files changed, 30 insertions(+), 82 deletions(-) > >> ---------------------------------------------------------------------- > >> > >> > >> > >> > http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > >> ---------------------------------------------------------------------- > >> diff --git > >> > a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > >> > b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > >> index e01735f..c69b56a 100644 > >> --- > >> > a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > >> +++ > >> > b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > >> @@ -26,6 +26,7 @@ import scala.util.Random > >> import org.apache.flink.api.common.functions.RichMapFunction > >> import org.apache.flink.api.scala._ > >> import 
org.apache.flink.configuration.Configuration > >> +import org.apache.flink.ml._ > >> import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner > >> import org.apache.flink.ml.common._ > >> import org.apache.flink.ml.math.Vector > >> @@ -190,6 +191,7 @@ class SVM extends Predictor[SVM] { > >> * of the algorithm. > >> */ > >> object SVM{ > >> + > >> val WEIGHT_VECTOR ="weightVector" > >> > >> // ========================================== Parameters > >> ========================================= > >> @@ -242,7 +244,13 @@ object SVM{ > >> > >> instance.weightsOption match { > >> case Some(weights) => { > >> - input.map(new > PredictionMapper[T]).withBroadcastSet(weights, > >> WEIGHT_VECTOR) > >> + input.mapWithBcVariable(weights){ > >> + (vector, weights) => { > >> + val dotProduct = weights dot vector.asBreeze > >> + > >> + LabeledVector(dotProduct, vector) > >> + } > >> + } > >> } > >> > >> case None => { > >> @@ -254,28 +262,6 @@ object SVM{ > >> } > >> } > >> > >> - /** Mapper to calculate the value of the prediction function. This is > >> a RichMapFunction, because > >> - * we broadcast the weight vector to all mappers. > >> - */ > >> - class PredictionMapper[T <: Vector] extends RichMapFunction[T, > >> LabeledVector] { > >> - > >> - var weights: BreezeDenseVector[Double] = _ > >> - > >> - @throws(classOf[Exception]) > >> - override def open(configuration: Configuration): Unit = { > >> - // get current weights > >> - weights = getRuntimeContext. > >> - > >> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0) > >> - } > >> - > >> - override def map(vector: T): LabeledVector = { > >> - // calculate the prediction value (scaled distance from the > >> separating hyperplane) > >> - val dotProduct = weights dot vector.asBreeze > >> - > >> - LabeledVector(dotProduct, vector) > >> - } > >> - } > >> - > >> /** [[org.apache.flink.ml.pipeline.PredictOperation]] for > >> [[LabeledVector ]]types. 
The result type > >> * is a [[(Double, Double)]] tuple, corresponding to (truth, > >> prediction) > >> * > >> @@ -291,7 +277,14 @@ object SVM{ > >> > >> instance.weightsOption match { > >> case Some(weights) => { > >> - input.map(new > >> LabeledPredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR) > >> + input.mapWithBcVariable(weights){ > >> + (labeledVector, weights) => { > >> + val prediction = weights dot > >> labeledVector.vector.asBreeze > >> + val truth = labeledVector.label > >> + > >> + (truth, prediction) > >> + } > >> + } > >> } > >> > >> case None => { > >> @@ -303,30 +296,6 @@ object SVM{ > >> } > >> } > >> > >> - /** Mapper to calculate the value of the prediction function. This is > >> a RichMapFunction, because > >> - * we broadcast the weight vector to all mappers. > >> - */ > >> - class LabeledPredictionMapper extends RichMapFunction[LabeledVector, > >> (Double, Double)] { > >> - > >> - var weights: BreezeDenseVector[Double] = _ > >> - > >> - @throws(classOf[Exception]) > >> - override def open(configuration: Configuration): Unit = { > >> - // get current weights > >> - weights = getRuntimeContext. > >> - > >> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0) > >> - } > >> - > >> - override def map(labeledVector: LabeledVector): (Double, Double) = > { > >> - // calculate the prediction value (scaled distance from the > >> separating hyperplane) > >> - val prediction = weights dot labeledVector.vector.asBreeze > >> - val truth = labeledVector.label > >> - > >> - (truth, prediction) > >> - } > >> - } > >> - > >> - > >> /** [[FitOperation]] which trains a SVM with soft-margin based on the > >> given training data set. 
> >> * > >> */ > >> @@ -540,17 +509,17 @@ object SVM{ > >> > >> // compute projected gradient > >> var proj_grad = if(alpha <= 0.0){ > >> - math.min(grad, 0) > >> + scala.math.min(grad, 0) > >> } else if(alpha >= 1.0) { > >> - math.max(grad, 0) > >> + scala.math.max(grad, 0) > >> } else { > >> grad > >> } > >> > >> - if(math.abs(grad) != 0.0){ > >> + if(scala.math.abs(grad) != 0.0){ > >> val qii = x dot x > >> val newAlpha = if(qii != 0.0){ > >> - math.min(math.max((alpha - (grad / qii)), 0.0), 1.0) > >> + scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0), > 1.0) > >> } else { > >> 1.0 > >> } > >> > >> > >> > http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > >> ---------------------------------------------------------------------- > >> diff --git > >> > a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > >> > b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > >> index 2e3ed95..7992b02 100644 > >> --- > >> > a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > >> +++ > >> > b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > >> @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions._ > >> import org.apache.flink.api.common.typeinfo.TypeInformation > >> import org.apache.flink.api.scala._ > >> import org.apache.flink.configuration.Configuration > >> +import org.apache.flink.ml._ > >> import org.apache.flink.ml.common.{LabeledVector, Parameter, > >> ParameterMap} > >> import org.apache.flink.ml.math.Breeze._ > >> import org.apache.flink.ml.math.{BreezeVectorConverter, Vector} > >> @@ -209,20 +210,9 @@ object StandardScaler { > >> > >> instance.metricsOption match { > >> case Some(metrics) => { > >> - input.map(new RichMapFunction[T, T]() { > >> - > >> - var broadcastMean: 
linalg.Vector[Double] = null > >> - var broadcastStd: linalg.Vector[Double] = null > >> - > >> - override def open(parameters: Configuration): Unit = { > >> - val broadcastedMetrics = > >> getRuntimeContext().getBroadcastVariable[ > >> - (linalg.Vector[Double], linalg.Vector[Double]) > >> - ]("broadcastedMetrics").get(0) > >> - broadcastMean = broadcastedMetrics._1 > >> - broadcastStd = broadcastedMetrics._2 > >> - } > >> - > >> - override def map(vector: T): T = { > >> + input.mapWithBcVariable(metrics){ > >> + (vector, metrics) => { > >> + val (broadcastMean, broadcastStd) = metrics > >> var myVector = vector.asBreeze > >> > >> myVector -= broadcastMean > >> @@ -230,7 +220,7 @@ object StandardScaler { > >> myVector = (myVector :* std) + mean > >> myVector.fromBreeze > >> } > >> - }).withBroadcastSet(metrics, "broadcastedMetrics") > >> + } > >> } > >> > >> case None => > >> @@ -251,20 +241,9 @@ object StandardScaler { > >> > >> instance.metricsOption match { > >> case Some(metrics) => { > >> - input.map(new RichMapFunction[LabeledVector, > >> LabeledVector]() { > >> - > >> - var broadcastMean: linalg.Vector[Double] = null > >> - var broadcastStd: linalg.Vector[Double] = null > >> - > >> - override def open(parameters: Configuration): Unit = { > >> - val broadcastedMetrics = > >> getRuntimeContext().getBroadcastVariable[ > >> - (linalg.Vector[Double], linalg.Vector[Double]) > >> - ]("broadcastedMetrics").get(0) > >> - broadcastMean = broadcastedMetrics._1 > >> - broadcastStd = broadcastedMetrics._2 > >> - } > >> - > >> - override def map(labeledVector: LabeledVector): > >> LabeledVector = { > >> + input.mapWithBcVariable(metrics){ > >> + (labeledVector, metrics) => { > >> + val (broadcastMean, broadcastStd) = metrics > >> val LabeledVector(label, vector) = labeledVector > >> var breezeVector = vector.asBreeze > >> > >> @@ -273,7 +252,7 @@ object StandardScaler { > >> breezeVector = (breezeVector :* std) + mean > >> LabeledVector(label, 
breezeVector.fromBreeze[Vector]) > >> } > >> - }).withBroadcastSet(metrics, "broadcastedMetrics") > >> + } > >> } > >> > >> case None => > >> > >> > > > |
|
Should I go ahead and add this method, then? The mapWithBcSet method, I mean.
Regards Sachin Goel On Tue, Jun 2, 2015 at 10:43 PM, Till Rohrmann <[hidden email]> wrote: > Yes you’re right Sachin. The mapWithBcVariable is only syntactic sugar if > you have a broadcast DataSet which contains only one element. If you have > multiple elements in your DataSet then you can’t use this method. But we > can define another method mapWithBcSet which takes a function f: (element: > T, broadcastValues: List[B]) => O, for example. > > If you have multiple DataSet which fulfil this condition, then you can wrap > them in a tuple as you’ve said. > > Cheers, > Till > > > On Tue, Jun 2, 2015 at 7:10 PM, Sachin Goel <[hidden email]> > wrote: > > > Further, I think we should return just > > broadcastVariable = getRuntimeContext. > > getBroadcastVariable[B]("broadcastVariable") > > in BroadcastSingleElementMapper > > User may wish to have a list broadcasted, and not just want to access the > > first element. For example, this would make sense in the kmeans > algorithm. > > > > Regards > > Sachin Goel > > > > On Tue, Jun 2, 2015 at 9:03 PM, Sachin Goel <[hidden email]> > > wrote: > > > > > Hi Till > > > This works only when there is only one variable to be broadcasted, > > doesn't > > > it? What about the case when we need to broadcast two? Is it advisable > to > > > create a BroadcastDoubleElementMapper class or perhaps we could just > > send a > > > tuple of all the variables? Perhaps that is a better idea. 
> > > > > > Regards > > > Sachin Goel > > > > > > On Tue, Jun 2, 2015 at 8:15 PM, <[hidden email]> wrote: > > > > > >> [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML > > >> > > >> > > >> Project: http://git-wip-us.apache.org/repos/asf/flink/repo > > >> Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/950b79c5 > > >> Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/950b79c5 > > >> Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/950b79c5 > > >> > > >> Branch: refs/heads/master > > >> Commit: 950b79c59327e96e3b1504616d26460cbff7fd4c > > >> Parents: 44dae0c > > >> Author: Till Rohrmann <[hidden email]> > > >> Authored: Tue Jun 2 14:45:12 2015 +0200 > > >> Committer: Till Rohrmann <[hidden email]> > > >> Committed: Tue Jun 2 15:34:54 2015 +0200 > > >> > > >> ---------------------------------------------------------------------- > > >> .../apache/flink/ml/classification/SVM.scala | 73 > > ++++++-------------- > > >> .../flink/ml/preprocessing/StandardScaler.scala | 39 +++-------- > > >> 2 files changed, 30 insertions(+), 82 deletions(-) > > >> ---------------------------------------------------------------------- > > >> > > >> > > >> > > >> > > > http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > > >> ---------------------------------------------------------------------- > > >> diff --git > > >> > > > a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > > >> > > > b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > > >> index e01735f..c69b56a 100644 > > >> --- > > >> > > > a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > > >> +++ > > >> > > > b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > > >> @@ -26,6 +26,7 @@ import scala.util.Random > > >> import 
org.apache.flink.api.common.functions.RichMapFunction > > >> import org.apache.flink.api.scala._ > > >> import org.apache.flink.configuration.Configuration > > >> +import org.apache.flink.ml._ > > >> import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner > > >> import org.apache.flink.ml.common._ > > >> import org.apache.flink.ml.math.Vector > > >> @@ -190,6 +191,7 @@ class SVM extends Predictor[SVM] { > > >> * of the algorithm. > > >> */ > > >> object SVM{ > > >> + > > >> val WEIGHT_VECTOR ="weightVector" > > >> > > >> // ========================================== Parameters > > >> ========================================= > > >> @@ -242,7 +244,13 @@ object SVM{ > > >> > > >> instance.weightsOption match { > > >> case Some(weights) => { > > >> - input.map(new > > PredictionMapper[T]).withBroadcastSet(weights, > > >> WEIGHT_VECTOR) > > >> + input.mapWithBcVariable(weights){ > > >> + (vector, weights) => { > > >> + val dotProduct = weights dot vector.asBreeze > > >> + > > >> + LabeledVector(dotProduct, vector) > > >> + } > > >> + } > > >> } > > >> > > >> case None => { > > >> @@ -254,28 +262,6 @@ object SVM{ > > >> } > > >> } > > >> > > >> - /** Mapper to calculate the value of the prediction function. This > is > > >> a RichMapFunction, because > > >> - * we broadcast the weight vector to all mappers. > > >> - */ > > >> - class PredictionMapper[T <: Vector] extends RichMapFunction[T, > > >> LabeledVector] { > > >> - > > >> - var weights: BreezeDenseVector[Double] = _ > > >> - > > >> - @throws(classOf[Exception]) > > >> - override def open(configuration: Configuration): Unit = { > > >> - // get current weights > > >> - weights = getRuntimeContext. 
> > >> - > > >> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0) > > >> - } > > >> - > > >> - override def map(vector: T): LabeledVector = { > > >> - // calculate the prediction value (scaled distance from the > > >> separating hyperplane) > > >> - val dotProduct = weights dot vector.asBreeze > > >> - > > >> - LabeledVector(dotProduct, vector) > > >> - } > > >> - } > > >> - > > >> /** [[org.apache.flink.ml.pipeline.PredictOperation]] for > > >> [[LabeledVector ]]types. The result type > > >> * is a [[(Double, Double)]] tuple, corresponding to (truth, > > >> prediction) > > >> * > > >> @@ -291,7 +277,14 @@ object SVM{ > > >> > > >> instance.weightsOption match { > > >> case Some(weights) => { > > >> - input.map(new > > >> LabeledPredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR) > > >> + input.mapWithBcVariable(weights){ > > >> + (labeledVector, weights) => { > > >> + val prediction = weights dot > > >> labeledVector.vector.asBreeze > > >> + val truth = labeledVector.label > > >> + > > >> + (truth, prediction) > > >> + } > > >> + } > > >> } > > >> > > >> case None => { > > >> @@ -303,30 +296,6 @@ object SVM{ > > >> } > > >> } > > >> > > >> - /** Mapper to calculate the value of the prediction function. This > is > > >> a RichMapFunction, because > > >> - * we broadcast the weight vector to all mappers. > > >> - */ > > >> - class LabeledPredictionMapper extends > RichMapFunction[LabeledVector, > > >> (Double, Double)] { > > >> - > > >> - var weights: BreezeDenseVector[Double] = _ > > >> - > > >> - @throws(classOf[Exception]) > > >> - override def open(configuration: Configuration): Unit = { > > >> - // get current weights > > >> - weights = getRuntimeContext. 
> > >> - > > >> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0) > > >> - } > > >> - > > >> - override def map(labeledVector: LabeledVector): (Double, Double) > = > > { > > >> - // calculate the prediction value (scaled distance from the > > >> separating hyperplane) > > >> - val prediction = weights dot labeledVector.vector.asBreeze > > >> - val truth = labeledVector.label > > >> - > > >> - (truth, prediction) > > >> - } > > >> - } > > >> - > > >> - > > >> /** [[FitOperation]] which trains a SVM with soft-margin based on > the > > >> given training data set. > > >> * > > >> */ > > >> @@ -540,17 +509,17 @@ object SVM{ > > >> > > >> // compute projected gradient > > >> var proj_grad = if(alpha <= 0.0){ > > >> - math.min(grad, 0) > > >> + scala.math.min(grad, 0) > > >> } else if(alpha >= 1.0) { > > >> - math.max(grad, 0) > > >> + scala.math.max(grad, 0) > > >> } else { > > >> grad > > >> } > > >> > > >> - if(math.abs(grad) != 0.0){ > > >> + if(scala.math.abs(grad) != 0.0){ > > >> val qii = x dot x > > >> val newAlpha = if(qii != 0.0){ > > >> - math.min(math.max((alpha - (grad / qii)), 0.0), 1.0) > > >> + scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0), > > 1.0) > > >> } else { > > >> 1.0 > > >> } > > >> > > >> > > >> > > > http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > > >> ---------------------------------------------------------------------- > > >> diff --git > > >> > > > a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > > >> > > > b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > > >> index 2e3ed95..7992b02 100644 > > >> --- > > >> > > > a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > > >> +++ > > >> > > > 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > > >> @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions._ > > >> import org.apache.flink.api.common.typeinfo.TypeInformation > > >> import org.apache.flink.api.scala._ > > >> import org.apache.flink.configuration.Configuration > > >> +import org.apache.flink.ml._ > > >> import org.apache.flink.ml.common.{LabeledVector, Parameter, > > >> ParameterMap} > > >> import org.apache.flink.ml.math.Breeze._ > > >> import org.apache.flink.ml.math.{BreezeVectorConverter, Vector} > > >> @@ -209,20 +210,9 @@ object StandardScaler { > > >> > > >> instance.metricsOption match { > > >> case Some(metrics) => { > > >> - input.map(new RichMapFunction[T, T]() { > > >> - > > >> - var broadcastMean: linalg.Vector[Double] = null > > >> - var broadcastStd: linalg.Vector[Double] = null > > >> - > > >> - override def open(parameters: Configuration): Unit = { > > >> - val broadcastedMetrics = > > >> getRuntimeContext().getBroadcastVariable[ > > >> - (linalg.Vector[Double], linalg.Vector[Double]) > > >> - ]("broadcastedMetrics").get(0) > > >> - broadcastMean = broadcastedMetrics._1 > > >> - broadcastStd = broadcastedMetrics._2 > > >> - } > > >> - > > >> - override def map(vector: T): T = { > > >> + input.mapWithBcVariable(metrics){ > > >> + (vector, metrics) => { > > >> + val (broadcastMean, broadcastStd) = metrics > > >> var myVector = vector.asBreeze > > >> > > >> myVector -= broadcastMean > > >> @@ -230,7 +220,7 @@ object StandardScaler { > > >> myVector = (myVector :* std) + mean > > >> myVector.fromBreeze > > >> } > > >> - }).withBroadcastSet(metrics, "broadcastedMetrics") > > >> + } > > >> } > > >> > > >> case None => > > >> @@ -251,20 +241,9 @@ object StandardScaler { > > >> > > >> instance.metricsOption match { > > >> case Some(metrics) => { > > >> - input.map(new RichMapFunction[LabeledVector, > > >> LabeledVector]() { > > >> - > > >> - var broadcastMean: linalg.Vector[Double] 
= null > > >> - var broadcastStd: linalg.Vector[Double] = null > > >> - > > >> - override def open(parameters: Configuration): Unit = { > > >> - val broadcastedMetrics = > > >> getRuntimeContext().getBroadcastVariable[ > > >> - (linalg.Vector[Double], linalg.Vector[Double]) > > >> - ]("broadcastedMetrics").get(0) > > >> - broadcastMean = broadcastedMetrics._1 > > >> - broadcastStd = broadcastedMetrics._2 > > >> - } > > >> - > > >> - override def map(labeledVector: LabeledVector): > > >> LabeledVector = { > > >> + input.mapWithBcVariable(metrics){ > > >> + (labeledVector, metrics) => { > > >> + val (broadcastMean, broadcastStd) = metrics > > >> val LabeledVector(label, vector) = labeledVector > > >> var breezeVector = vector.asBreeze > > >> > > >> @@ -273,7 +252,7 @@ object StandardScaler { > > >> breezeVector = (breezeVector :* std) + mean > > >> LabeledVector(label, breezeVector.fromBreeze[Vector]) > > >> } > > >> - }).withBroadcastSet(metrics, "broadcastedMetrics") > > >> + } > > >> } > > >> > > >> case None => > > >> > > >> > > > > > > |
|
If it helps you with your task, then you can add it (the mapWithBcSet method). The best thing is
probably to implement it similarly to the mapWithBcVariable. Cheers, Till On Tue, Jun 2, 2015 at 7:25 PM, Sachin Goel <[hidden email]> wrote: > Should I go ahead and add this method then? The mapWithBcSet I mean. > > Regards > Sachin Goel > > On Tue, Jun 2, 2015 at 10:43 PM, Till Rohrmann <[hidden email]> > wrote: > > > Yes you’re right Sachin. The mapWithBcVariable is only syntactic sugar if > > you have a broadcast DataSet which contains only one element. If you have > > multiple elements in your DataSet then you can’t use this method. But we > > can define another method mapWithBcSet which takes a function f: > (element: > > T, broadcastValues: List[B]) => O, for example. > > > > If you have multiple DataSet which fulfil this condition, then you can > wrap > > them in a tuple as you’ve said. > > > > Cheers, > > Till > > > > > > On Tue, Jun 2, 2015 at 7:10 PM, Sachin Goel <[hidden email]> > > wrote: > > > > > Further, I think we should return just > > > broadcastVariable = getRuntimeContext. > > > getBroadcastVariable[B]("broadcastVariable") > > > in BroadcastSingleElementMapper > > > User may wish to have a list broadcasted, and not just want to access > the > > > first element. For example, this would make sense in the kmeans > > algorithm. > > > > > > Regards > > > Sachin Goel > > > > > > On Tue, Jun 2, 2015 at 9:03 PM, Sachin Goel <[hidden email]> > > > wrote: > > > > > > > Hi Till > > > > This works only when there is only one variable to be broadcasted, > > > doesn't > > > > it? What about the case when we need to broadcast two? Is it > advisable > > to > > > > create a BroadcastDoubleElementMapper class or perhaps we could just > > > send a > > > > tuple of all the variables? Perhaps that is a better idea. 
> > > > > > > > Regards > > > > Sachin Goel > > > > > > > > On Tue, Jun 2, 2015 at 8:15 PM, <[hidden email]> wrote: > > > > > > > >> [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML > > > >> > > > >> > > > >> Project: http://git-wip-us.apache.org/repos/asf/flink/repo > > > >> Commit: > http://git-wip-us.apache.org/repos/asf/flink/commit/950b79c5 > > > >> Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/950b79c5 > > > >> Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/950b79c5 > > > >> > > > >> Branch: refs/heads/master > > > >> Commit: 950b79c59327e96e3b1504616d26460cbff7fd4c > > > >> Parents: 44dae0c > > > >> Author: Till Rohrmann <[hidden email]> > > > >> Authored: Tue Jun 2 14:45:12 2015 +0200 > > > >> Committer: Till Rohrmann <[hidden email]> > > > >> Committed: Tue Jun 2 15:34:54 2015 +0200 > > > >> > > > >> > ---------------------------------------------------------------------- > > > >> .../apache/flink/ml/classification/SVM.scala | 73 > > > ++++++-------------- > > > >> .../flink/ml/preprocessing/StandardScaler.scala | 39 +++-------- > > > >> 2 files changed, 30 insertions(+), 82 deletions(-) > > > >> > ---------------------------------------------------------------------- > > > >> > > > >> > > > >> > > > >> > > > > > > http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > > > >> > ---------------------------------------------------------------------- > > > >> diff --git > > > >> > > > > > > a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > > > >> > > > > > > b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > > > >> index e01735f..c69b56a 100644 > > > >> --- > > > >> > > > > > > a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > > > >> +++ > > > >> > > > > > > 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala > > > >> @@ -26,6 +26,7 @@ import scala.util.Random > > > >> import org.apache.flink.api.common.functions.RichMapFunction > > > >> import org.apache.flink.api.scala._ > > > >> import org.apache.flink.configuration.Configuration > > > >> +import org.apache.flink.ml._ > > > >> import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner > > > >> import org.apache.flink.ml.common._ > > > >> import org.apache.flink.ml.math.Vector > > > >> @@ -190,6 +191,7 @@ class SVM extends Predictor[SVM] { > > > >> * of the algorithm. > > > >> */ > > > >> object SVM{ > > > >> + > > > >> val WEIGHT_VECTOR ="weightVector" > > > >> > > > >> // ========================================== Parameters > > > >> ========================================= > > > >> @@ -242,7 +244,13 @@ object SVM{ > > > >> > > > >> instance.weightsOption match { > > > >> case Some(weights) => { > > > >> - input.map(new > > > PredictionMapper[T]).withBroadcastSet(weights, > > > >> WEIGHT_VECTOR) > > > >> + input.mapWithBcVariable(weights){ > > > >> + (vector, weights) => { > > > >> + val dotProduct = weights dot vector.asBreeze > > > >> + > > > >> + LabeledVector(dotProduct, vector) > > > >> + } > > > >> + } > > > >> } > > > >> > > > >> case None => { > > > >> @@ -254,28 +262,6 @@ object SVM{ > > > >> } > > > >> } > > > >> > > > >> - /** Mapper to calculate the value of the prediction function. > This > > is > > > >> a RichMapFunction, because > > > >> - * we broadcast the weight vector to all mappers. > > > >> - */ > > > >> - class PredictionMapper[T <: Vector] extends RichMapFunction[T, > > > >> LabeledVector] { > > > >> - > > > >> - var weights: BreezeDenseVector[Double] = _ > > > >> - > > > >> - @throws(classOf[Exception]) > > > >> - override def open(configuration: Configuration): Unit = { > > > >> - // get current weights > > > >> - weights = getRuntimeContext. 
> > > >> - > > > >> > getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0) > > > >> - } > > > >> - > > > >> - override def map(vector: T): LabeledVector = { > > > >> - // calculate the prediction value (scaled distance from the > > > >> separating hyperplane) > > > >> - val dotProduct = weights dot vector.asBreeze > > > >> - > > > >> - LabeledVector(dotProduct, vector) > > > >> - } > > > >> - } > > > >> - > > > >> /** [[org.apache.flink.ml.pipeline.PredictOperation]] for > > > >> [[LabeledVector ]]types. The result type > > > >> * is a [[(Double, Double)]] tuple, corresponding to (truth, > > > >> prediction) > > > >> * > > > >> @@ -291,7 +277,14 @@ object SVM{ > > > >> > > > >> instance.weightsOption match { > > > >> case Some(weights) => { > > > >> - input.map(new > > > >> LabeledPredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR) > > > >> + input.mapWithBcVariable(weights){ > > > >> + (labeledVector, weights) => { > > > >> + val prediction = weights dot > > > >> labeledVector.vector.asBreeze > > > >> + val truth = labeledVector.label > > > >> + > > > >> + (truth, prediction) > > > >> + } > > > >> + } > > > >> } > > > >> > > > >> case None => { > > > >> @@ -303,30 +296,6 @@ object SVM{ > > > >> } > > > >> } > > > >> > > > >> - /** Mapper to calculate the value of the prediction function. > This > > is > > > >> a RichMapFunction, because > > > >> - * we broadcast the weight vector to all mappers. > > > >> - */ > > > >> - class LabeledPredictionMapper extends > > RichMapFunction[LabeledVector, > > > >> (Double, Double)] { > > > >> - > > > >> - var weights: BreezeDenseVector[Double] = _ > > > >> - > > > >> - @throws(classOf[Exception]) > > > >> - override def open(configuration: Configuration): Unit = { > > > >> - // get current weights > > > >> - weights = getRuntimeContext. 
> > > >> - > > > >> > getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0) > > > >> - } > > > >> - > > > >> - override def map(labeledVector: LabeledVector): (Double, > Double) > > = > > > { > > > >> - // calculate the prediction value (scaled distance from the > > > >> separating hyperplane) > > > >> - val prediction = weights dot labeledVector.vector.asBreeze > > > >> - val truth = labeledVector.label > > > >> - > > > >> - (truth, prediction) > > > >> - } > > > >> - } > > > >> - > > > >> - > > > >> /** [[FitOperation]] which trains a SVM with soft-margin based on > > the > > > >> given training data set. > > > >> * > > > >> */ > > > >> @@ -540,17 +509,17 @@ object SVM{ > > > >> > > > >> // compute projected gradient > > > >> var proj_grad = if(alpha <= 0.0){ > > > >> - math.min(grad, 0) > > > >> + scala.math.min(grad, 0) > > > >> } else if(alpha >= 1.0) { > > > >> - math.max(grad, 0) > > > >> + scala.math.max(grad, 0) > > > >> } else { > > > >> grad > > > >> } > > > >> > > > >> - if(math.abs(grad) != 0.0){ > > > >> + if(scala.math.abs(grad) != 0.0){ > > > >> val qii = x dot x > > > >> val newAlpha = if(qii != 0.0){ > > > >> - math.min(math.max((alpha - (grad / qii)), 0.0), 1.0) > > > >> + scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0), > > > 1.0) > > > >> } else { > > > >> 1.0 > > > >> } > > > >> > > > >> > > > >> > > > > > > http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > > > >> > ---------------------------------------------------------------------- > > > >> diff --git > > > >> > > > > > > a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > > > >> > > > > > > b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > > > >> index 2e3ed95..7992b02 100644 > > > >> --- > > > >> > > > > > > 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > > > >> +++ > > > >> > > > > > > b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala > > > >> @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions._ > > > >> import org.apache.flink.api.common.typeinfo.TypeInformation > > > >> import org.apache.flink.api.scala._ > > > >> import org.apache.flink.configuration.Configuration > > > >> +import org.apache.flink.ml._ > > > >> import org.apache.flink.ml.common.{LabeledVector, Parameter, > > > >> ParameterMap} > > > >> import org.apache.flink.ml.math.Breeze._ > > > >> import org.apache.flink.ml.math.{BreezeVectorConverter, Vector} > > > >> @@ -209,20 +210,9 @@ object StandardScaler { > > > >> > > > >> instance.metricsOption match { > > > >> case Some(metrics) => { > > > >> - input.map(new RichMapFunction[T, T]() { > > > >> - > > > >> - var broadcastMean: linalg.Vector[Double] = null > > > >> - var broadcastStd: linalg.Vector[Double] = null > > > >> - > > > >> - override def open(parameters: Configuration): Unit = > { > > > >> - val broadcastedMetrics = > > > >> getRuntimeContext().getBroadcastVariable[ > > > >> - (linalg.Vector[Double], linalg.Vector[Double]) > > > >> - ]("broadcastedMetrics").get(0) > > > >> - broadcastMean = broadcastedMetrics._1 > > > >> - broadcastStd = broadcastedMetrics._2 > > > >> - } > > > >> - > > > >> - override def map(vector: T): T = { > > > >> + input.mapWithBcVariable(metrics){ > > > >> + (vector, metrics) => { > > > >> + val (broadcastMean, broadcastStd) = metrics > > > >> var myVector = vector.asBreeze > > > >> > > > >> myVector -= broadcastMean > > > >> @@ -230,7 +220,7 @@ object StandardScaler { > > > >> myVector = (myVector :* std) + mean > > > >> myVector.fromBreeze > > > >> } > > > >> - }).withBroadcastSet(metrics, "broadcastedMetrics") > > > >> + } > > > >> } > > > >> > > > >> case None => > > > >> @@ -251,20 +241,9 @@ object 
StandardScaler { > > > >> > > > >> instance.metricsOption match { > > > >> case Some(metrics) => { > > > >> - input.map(new RichMapFunction[LabeledVector, > > > >> LabeledVector]() { > > > >> - > > > >> - var broadcastMean: linalg.Vector[Double] = null > > > >> - var broadcastStd: linalg.Vector[Double] = null > > > >> - > > > >> - override def open(parameters: Configuration): Unit = > { > > > >> - val broadcastedMetrics = > > > >> getRuntimeContext().getBroadcastVariable[ > > > >> - (linalg.Vector[Double], linalg.Vector[Double]) > > > >> - ]("broadcastedMetrics").get(0) > > > >> - broadcastMean = broadcastedMetrics._1 > > > >> - broadcastStd = broadcastedMetrics._2 > > > >> - } > > > >> - > > > >> - override def map(labeledVector: LabeledVector): > > > >> LabeledVector = { > > > >> + input.mapWithBcVariable(metrics){ > > > >> + (labeledVector, metrics) => { > > > >> + val (broadcastMean, broadcastStd) = metrics > > > >> val LabeledVector(label, vector) = labeledVector > > > >> var breezeVector = vector.asBreeze > > > >> > > > >> @@ -273,7 +252,7 @@ object StandardScaler { > > > >> breezeVector = (breezeVector :* std) + mean > > > >> LabeledVector(label, > breezeVector.fromBreeze[Vector]) > > > >> } > > > >> - }).withBroadcastSet(metrics, "broadcastedMetrics") > > > >> + } > > > >> } > > > >> > > > >> case None => > > > >> > > > >> > > > > > > > > > > |
| Free forum by Nabble | Edit this page |
