
Commit 184de91

Lewuathe authored and jkbradley committed
[SPARK-6263] [MLLIB] Python MLlib API missing items: Utils
Implement missing API in pyspark. MLUtils:

* appendBias
* loadVectors

`kFold` is also missing; however, I am not sure `ClassTag` can be passed or restored through Python.

Author: lewuathe <[email protected]>

Closes apache#5707 from Lewuathe/SPARK-6263 and squashes the following commits:

16863ea [lewuathe] Merge master
3fc27e7 [lewuathe] Merge branch 'master' into SPARK-6263
6084e9c [lewuathe] Resolv conflict
d2aa2a0 [lewuathe] Resolv conflict
9c329d8 [lewuathe] Fix efficiency
3a12a2d [lewuathe] Merge branch 'master' into SPARK-6263
1d4714b [lewuathe] Fix style
b29e2bc [lewuathe] Remove scipy dependencies
e32eb40 [lewuathe] Merge branch 'master' into SPARK-6263
25d3c9d [lewuathe] Remove unnecessary imports
7ec04db [lewuathe] Resolv conflict
1502d13 [lewuathe] Resolv conflict
d6bd416 [lewuathe] Check existence of scipy.sparse
5d555b1 [lewuathe] Construct scipy.sparse matrix
c345a44 [lewuathe] Merge branch 'master' into SPARK-6263
b8b5ef7 [lewuathe] Fix unnecessary sort method
d254be7 [lewuathe] Merge branch 'master' into SPARK-6263
62a9c7e [lewuathe] Fix appendBias return type
454c73d [lewuathe] Merge branch 'master' into SPARK-6263
a353354 [lewuathe] Remove unnecessary appendBias implementation
44295c2 [lewuathe] Merge branch 'master' into SPARK-6263
64f72ad [lewuathe] Merge branch 'master' into SPARK-6263
c728046 [lewuathe] Fix style
2980569 [lewuathe] [SPARK-6263] Python MLlib API missing items: Utils
1 parent 31b4a3d commit 184de91

File tree: 3 files changed, +74 -0 lines changed

mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala

Lines changed: 9 additions & 0 deletions
@@ -75,6 +75,15 @@ private[python] class PythonMLLibAPI extends Serializable {
       minPartitions: Int): JavaRDD[LabeledPoint] =
     MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions)
 
+  /**
+   * Loads and serializes vectors saved with `RDD#saveAsTextFile`.
+   * @param jsc Java SparkContext
+   * @param path file or directory path in any Hadoop-supported file system URI
+   * @return serialized vectors in a RDD
+   */
+  def loadVectors(jsc: JavaSparkContext, path: String): RDD[Vector] =
+    MLUtils.loadVectors(jsc.sc, path)
+
   private def trainRegressionModel(
       learner: GeneralizedLinearAlgorithm[_ <: GeneralizedLinearModel],
       data: JavaRDD[LabeledPoint],
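On the Python side, this new JVM entry point is reached through MLlib's Py4J bridge (the `callMLlibFunc("loadVectors", ...)` call in util.py below), and each saved text line is parsed back into a `Vector`. A minimal round trip might look like the following sketch, where the live `SparkContext` named `sc` and the output directory `/tmp/vecs` are assumptions for illustration:

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.util import MLUtils

# saveAsTextFile writes one vector per line, e.g. "[1.0,2.0,3.0]".
vecs = sc.parallelize([Vectors.dense([1.0, 2.0, 3.0]),
                       Vectors.sparse(3, {0: 2.0})])
vecs.saveAsTextFile("/tmp/vecs")  # hypothetical scratch directory

# loadVectors dispatches to the JVM entry point added above.
loaded = MLUtils.loadVectors(sc, "/tmp/vecs")
print(loaded.collect())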

python/pyspark/mllib/tests.py

Lines changed: 43 additions & 0 deletions
@@ -54,6 +54,7 @@
 from pyspark.mllib.feature import IDF
 from pyspark.mllib.feature import StandardScaler, ElementwiseProduct
 from pyspark.mllib.util import LinearDataGenerator
+from pyspark.mllib.util import MLUtils
 from pyspark.serializers import PickleSerializer
 from pyspark.streaming import StreamingContext
 from pyspark.sql import SQLContext

@@ -1290,6 +1291,48 @@ def func(rdd):
         self.assertTrue(mean_absolute_errors[1] - mean_absolute_errors[-1] > 2)
 
 
+class MLUtilsTests(MLlibTestCase):
+    def test_append_bias(self):
+        data = [2.0, 2.0, 2.0]
+        ret = MLUtils.appendBias(data)
+        self.assertEqual(ret[3], 1.0)
+        self.assertEqual(type(ret), DenseVector)
+
+    def test_append_bias_with_vector(self):
+        data = Vectors.dense([2.0, 2.0, 2.0])
+        ret = MLUtils.appendBias(data)
+        self.assertEqual(ret[3], 1.0)
+        self.assertEqual(type(ret), DenseVector)
+
+    def test_append_bias_with_sp_vector(self):
+        data = Vectors.sparse(3, {0: 2.0, 2: 2.0})
+        expected = Vectors.sparse(4, {0: 2.0, 2: 2.0, 3: 1.0})
+        # Returned value must be SparseVector
+        ret = MLUtils.appendBias(data)
+        self.assertEqual(ret, expected)
+        self.assertEqual(type(ret), SparseVector)
+
+    def test_load_vectors(self):
+        import shutil
+        data = [
+            [1.0, 2.0, 3.0],
+            [1.0, 2.0, 3.0]
+        ]
+        temp_dir = tempfile.mkdtemp()
+        load_vectors_path = os.path.join(temp_dir, "test_load_vectors")
+        try:
+            self.sc.parallelize(data).saveAsTextFile(load_vectors_path)
+            ret_rdd = MLUtils.loadVectors(self.sc, load_vectors_path)
+            ret = ret_rdd.collect()
+            self.assertEqual(len(ret), 2)
+            self.assertEqual(ret[0], DenseVector([1.0, 2.0, 3.0]))
+            self.assertEqual(ret[1], DenseVector([1.0, 2.0, 3.0]))
+        except:
+            self.fail()
+        finally:
+            shutil.rmtree(load_vectors_path)
+
+
 if __name__ == "__main__":
     if not _have_scipy:
         print("NOTE: Skipping SciPy tests as it does not seem to be installed")

python/pyspark/mllib/util.py

Lines changed: 22 additions & 0 deletions
@@ -169,6 +169,28 @@ def loadLabeledPoints(sc, path, minPartitions=None):
         minPartitions = minPartitions or min(sc.defaultParallelism, 2)
         return callMLlibFunc("loadLabeledPoints", sc, path, minPartitions)
 
+    @staticmethod
+    def appendBias(data):
+        """
+        Returns a new vector with `1.0` (bias) appended to
+        the end of the input vector.
+        """
+        vec = _convert_to_vector(data)
+        if isinstance(vec, SparseVector):
+            newIndices = np.append(vec.indices, len(vec))
+            newValues = np.append(vec.values, 1.0)
+            return SparseVector(len(vec) + 1, newIndices, newValues)
+        else:
+            return _convert_to_vector(np.append(vec.toArray(), 1.0))
+
+    @staticmethod
+    def loadVectors(sc, path):
+        """
+        Loads vectors saved using `RDD[Vector].saveAsTextFile`
+        with the default number of partitions.
+        """
+        return callMLlibFunc("loadVectors", sc, path)
+
 
 class Saveable(object):
     """
