Skip to content

Commit ae58aea

Browse files
DB Tsai authored and mengxr committed
SPARK-2272 [MLlib] Feature scaling which standardizes the range of independent variables or features of data
Feature scaling is a method used to standardize the range of independent variables or features of data. In data processing, it is generally performed during the data preprocessing step. In this work, a trait called `VectorTransformer` is defined for generic transformation on a vector. It contains one method to be implemented, `transform` which applies transformation on a vector. There are two implementations of `VectorTransformer` now, and they all can be easily extended with PMML transformation support. 1) `StandardScaler` - Standardizes features by removing the mean and scaling to unit variance using column summary statistics on the samples in the training set. 2) `Normalizer` - Normalizes samples individually to unit L^n norm Author: DB Tsai <[email protected]> Closes apache#1207 from dbtsai/dbtsai-feature-scaling and squashes the following commits: 78c15d3 [DB Tsai] Alpine Data Labs
1 parent 5507dd8 commit ae58aea

File tree

6 files changed

+567
-1
lines changed

6 files changed

+567
-1
lines changed
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.mllib.feature
19+
20+
import breeze.linalg.{DenseVector => BDV, SparseVector => BSV}
21+
22+
import org.apache.spark.annotation.DeveloperApi
23+
import org.apache.spark.mllib.linalg.{Vector, Vectors}
24+
25+
/**
 * :: DeveloperApi ::
 * Normalizes samples individually to unit L^p^ norm
 *
 * For any 1 <= p < Double.PositiveInfinity, normalizes samples using
 * sum(abs(vector).^p^)^(1/p)^ as norm.
 *
 * For p = Double.PositiveInfinity, max(abs(vector)) will be used as norm for normalization.
 *
 * @param p Normalization in L^p^ space, p = 2 by default.
 */
@DeveloperApi
class Normalizer(p: Double) extends VectorTransformer {

  def this() = this(2)

  // L^p norms are only defined for p >= 1; fail fast with an informative message.
  require(p >= 1.0, s"Normalization order p must be >= 1.0, but got p = $p.")

  /**
   * Applies unit length normalization on a vector.
   *
   * @param vector vector to be normalized.
   * @return normalized vector. If the norm of the input is zero, it will return the input vector.
   */
  override def transform(vector: Vector): Vector = {
    // The norm is computed once and never reassigned, so it is a `val` (was `var`).
    val norm = vector.toBreeze.norm(p)

    if (norm != 0.0) {
      // For dense vector, we've to allocate new memory for new output vector.
      // However, for sparse vector, the `index` array will not be changed,
      // so we can re-use it to save memory.
      vector.toBreeze match {
        case dv: BDV[Double] => Vectors.fromBreeze(dv :/ norm)
        case sv: BSV[Double] =>
          val output = new BSV[Double](sv.index, sv.data.clone(), sv.length)
          var i = 0
          while (i < output.data.length) {
            output.data(i) /= norm
            i += 1
          }
          Vectors.fromBreeze(output)
        case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
      }
    } else {
      // Since the norm is zero, return the input vector object itself.
      // Note that it's safe since we always assume that the data in RDD
      // should be immutable.
      vector
    }
  }
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.mllib.feature
19+
20+
import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
21+
22+
import org.apache.spark.annotation.DeveloperApi
23+
import org.apache.spark.mllib.linalg.{Vector, Vectors}
24+
import org.apache.spark.mllib.rdd.RDDFunctions._
25+
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
26+
import org.apache.spark.rdd.RDD
27+
28+
/**
 * :: DeveloperApi ::
 * Standardizes features by removing the mean and scaling to unit variance using column summary
 * statistics on the samples in the training set.
 *
 * @param withMean False by default. Centers the data with mean before scaling. It will build a
 *                 dense output, so this does not work on sparse input and will raise an exception.
 * @param withStd True by default. Scales the data to unit standard deviation.
 */
@DeveloperApi
class StandardScaler(withMean: Boolean, withStd: Boolean) extends VectorTransformer {

  def this() = this(false, true)

  // Plain string literal: the original used an `s"..."` interpolator with nothing to interpolate.
  require(withMean || withStd, "withMean and withStd both equal to false. Doing nothing.")

  // Learned by `fit`; both remain null until `fit` has been called.
  private var mean: BV[Double] = _
  // Per-column multiplicative factor: 1 / stddev, or 0.0 for zero-variance columns.
  private var factor: BV[Double] = _

  /**
   * Computes the mean and variance and stores as a model to be used for later scaling.
   *
   * @param data The data used to compute the mean and variance to build the transformation model.
   * @return This StandardScaler object.
   */
  def fit(data: RDD[Vector]): this.type = {
    val summary = data.treeAggregate(new MultivariateOnlineSummarizer)(
      (aggregator, data) => aggregator.add(data),
      (aggregator1, aggregator2) => aggregator1.merge(aggregator2))

    mean = summary.mean.toBreeze
    factor = summary.variance.toBreeze
    require(mean.length == factor.length)

    // Turn variances into scaling factors. Zero-variance columns map to factor 0.0 so that
    // `transform` outputs 0.0 for them instead of dividing by zero.
    var i = 0
    while (i < factor.length) {
      factor(i) = if (factor(i) != 0.0) 1.0 / math.sqrt(factor(i)) else 0.0
      i += 1
    }

    this
  }

  /**
   * Applies standardization transformation on a vector.
   *
   * @param vector Vector to be standardized.
   * @return Standardized vector. If the variance of a column is zero, it will return default `0.0`
   *         for the column with zero variance.
   */
  override def transform(vector: Vector): Vector = {
    if (mean == null || factor == null) {
      throw new IllegalStateException(
        "Haven't learned column summary statistics yet. Call fit first.")
    }

    require(vector.size == mean.length)

    if (withMean) {
      vector.toBreeze match {
        case dv: BDV[Double] =>
          // Re-use the already matched dense vector instead of calling `vector.toBreeze` a
          // second time (the original recomputed the Breeze conversion here).
          val output = dv.copy
          var i = 0
          while (i < output.length) {
            output(i) = (output(i) - mean(i)) * (if (withStd) factor(i) else 1.0)
            i += 1
          }
          Vectors.fromBreeze(output)
        case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
      }
    } else if (withStd) {
      vector.toBreeze match {
        case dv: BDV[Double] => Vectors.fromBreeze(dv :* factor)
        case sv: BSV[Double] =>
          // For sparse vector, the `index` array inside sparse vector object will not be changed,
          // so we can re-use it to save memory.
          val output = new BSV[Double](sv.index, sv.data.clone(), sv.length)
          var i = 0
          while (i < output.data.length) {
            output.data(i) *= factor(output.index(i))
            i += 1
          }
          Vectors.fromBreeze(output)
        case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
      }
    } else {
      // Note that it's safe since we always assume that the data in RDD should be immutable.
      vector
    }
  }
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.mllib.feature
19+
20+
import org.apache.spark.annotation.DeveloperApi
21+
import org.apache.spark.mllib.linalg.Vector
22+
import org.apache.spark.rdd.RDD
23+
24+
/**
 * :: DeveloperApi ::
 * Trait for transformation of a vector
 */
@DeveloperApi
trait VectorTransformer extends Serializable {

  /**
   * Applies transformation on a vector.
   *
   * @param vector vector to be transformed.
   * @return transformed vector.
   */
  def transform(vector: Vector): Vector

  /**
   * Applies transformation on an RDD[Vector].
   *
   * @param data RDD[Vector] to be transformed.
   * @return transformed RDD[Vector].
   */
  def transform(data: RDD[Vector]): RDD[Vector] = {
    // Later in #1498 , all RDD objects are sent via broadcasting instead of akka.
    // So it should be no longer necessary to explicitly broadcast `this` object.
    data.map(v => transform(v))
  }
}

mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.linalg.distributed
1919

2020
import java.util.Arrays
2121

22-
import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV}
22+
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV}
2323
import breeze.linalg.{svd => brzSvd, axpy => brzAxpy}
2424
import breeze.numerics.{sqrt => brzSqrt}
2525
import com.github.fommil.netlib.BLAS.{getInstance => blas}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.mllib.feature
19+
20+
import org.scalatest.FunSuite
21+
22+
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors}
23+
import org.apache.spark.mllib.util.LocalSparkContext
24+
import org.apache.spark.mllib.util.TestingUtils._
25+
26+
class NormalizerSuite extends FunSuite with LocalSparkContext {

  // Mix of dense and sparse vectors, including an all-zero dense vector and an
  // empty sparse vector, so each norm handles degenerate inputs.
  val data = Array(
    Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))),
    Vectors.dense(0.0, 0.0, 0.0),
    Vectors.dense(0.6, -1.1, -3.0),
    Vectors.sparse(3, Seq((1, 0.91), (2, 3.2))),
    Vectors.sparse(3, Seq((0, 5.7), (1, 0.72), (2, 2.7))),
    Vectors.sparse(3, Seq())
  )

  lazy val dataRDD = sc.parallelize(data, 3)

  test("Normalization using L1 distance") {
    val normalizer = new Normalizer(1)

    val local = data.map(normalizer.transform)
    val distributed = normalizer.transform(dataRDD)

    // Dense input must stay dense and sparse input must stay sparse.
    assert((data, local, distributed.collect()).zipped.forall {
      case (v1: DenseVector, v2: DenseVector, v3: DenseVector) => true
      case (v1: SparseVector, v2: SparseVector, v3: SparseVector) => true
      case _ => false
    }, "The vector type should be preserved after normalization.")

    // The local and the distributed transformations must agree element-wise.
    assert((local, distributed.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))

    // Every non-zero input ends up with unit L1 norm.
    assert(local(0).toBreeze.norm(1) ~== 1.0 absTol 1E-5)
    assert(local(2).toBreeze.norm(1) ~== 1.0 absTol 1E-5)
    assert(local(3).toBreeze.norm(1) ~== 1.0 absTol 1E-5)
    assert(local(4).toBreeze.norm(1) ~== 1.0 absTol 1E-5)

    // Spot-check the exact normalized values; the zero and empty vectors pass through.
    assert(local(0) ~== Vectors.sparse(3, Seq((0, -0.465116279), (1, 0.53488372))) absTol 1E-5)
    assert(local(1) ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5)
    assert(local(2) ~== Vectors.dense(0.12765957, -0.23404255, -0.63829787) absTol 1E-5)
    assert(local(3) ~== Vectors.sparse(3, Seq((1, 0.22141119), (2, 0.7785888))) absTol 1E-5)
    assert(local(4) ~== Vectors.dense(0.625, 0.07894737, 0.29605263) absTol 1E-5)
    assert(local(5) ~== Vectors.sparse(3, Seq()) absTol 1E-5)
  }

  test("Normalization using L2 distance") {
    val normalizer = new Normalizer()

    val local = data.map(normalizer.transform)
    val distributed = normalizer.transform(dataRDD)

    // Dense input must stay dense and sparse input must stay sparse.
    assert((data, local, distributed.collect()).zipped.forall {
      case (v1: DenseVector, v2: DenseVector, v3: DenseVector) => true
      case (v1: SparseVector, v2: SparseVector, v3: SparseVector) => true
      case _ => false
    }, "The vector type should be preserved after normalization.")

    // The local and the distributed transformations must agree element-wise.
    assert((local, distributed.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))

    // Every non-zero input ends up with unit L2 norm.
    assert(local(0).toBreeze.norm(2) ~== 1.0 absTol 1E-5)
    assert(local(2).toBreeze.norm(2) ~== 1.0 absTol 1E-5)
    assert(local(3).toBreeze.norm(2) ~== 1.0 absTol 1E-5)
    assert(local(4).toBreeze.norm(2) ~== 1.0 absTol 1E-5)

    // Spot-check the exact normalized values; the zero and empty vectors pass through.
    assert(local(0) ~== Vectors.sparse(3, Seq((0, -0.65617871), (1, 0.75460552))) absTol 1E-5)
    assert(local(1) ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5)
    assert(local(2) ~== Vectors.dense(0.184549876, -0.3383414, -0.922749378) absTol 1E-5)
    assert(local(3) ~== Vectors.sparse(3, Seq((1, 0.27352993), (2, 0.96186349))) absTol 1E-5)
    assert(local(4) ~== Vectors.dense(0.897906166, 0.113419726, 0.42532397) absTol 1E-5)
    assert(local(5) ~== Vectors.sparse(3, Seq()) absTol 1E-5)
  }

  test("Normalization using L^Inf distance.") {
    val normalizer = new Normalizer(Double.PositiveInfinity)

    val local = data.map(normalizer.transform)
    val distributed = normalizer.transform(dataRDD)

    // Dense input must stay dense and sparse input must stay sparse.
    assert((data, local, distributed.collect()).zipped.forall {
      case (v1: DenseVector, v2: DenseVector, v3: DenseVector) => true
      case (v1: SparseVector, v2: SparseVector, v3: SparseVector) => true
      case _ => false
    }, "The vector type should be preserved after normalization.")

    // The local and the distributed transformations must agree element-wise.
    assert((local, distributed.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))

    // Every non-zero input ends up with max-abs value 1.0 (unit L^Inf norm).
    assert(local(0).toArray.map(Math.abs).max ~== 1.0 absTol 1E-5)
    assert(local(2).toArray.map(Math.abs).max ~== 1.0 absTol 1E-5)
    assert(local(3).toArray.map(Math.abs).max ~== 1.0 absTol 1E-5)
    assert(local(4).toArray.map(Math.abs).max ~== 1.0 absTol 1E-5)

    // Spot-check the exact normalized values; the zero and empty vectors pass through.
    assert(local(0) ~== Vectors.sparse(3, Seq((0, -0.86956522), (1, 1.0))) absTol 1E-5)
    assert(local(1) ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5)
    assert(local(2) ~== Vectors.dense(0.2, -0.36666667, -1.0) absTol 1E-5)
    assert(local(3) ~== Vectors.sparse(3, Seq((1, 0.284375), (2, 1.0))) absTol 1E-5)
    assert(local(4) ~== Vectors.dense(1.0, 0.12631579, 0.473684211) absTol 1E-5)
    assert(local(5) ~== Vectors.sparse(3, Seq()) absTol 1E-5)
  }
}

0 commit comments

Comments
 (0)