
Commit 996e2d4

[SPARK-7654] [MLLIB] Migrate MLlib to the DataFrame reader/writer API
parquetFile -> read.parquet

Author: Xiangrui Meng <[email protected]>

Closes #6281 from mengxr/SPARK-7654 and squashes the following commits:

a79b612 [Xiangrui Meng] parquetFile -> read.parquet

(cherry picked from commit 589b12f)
Signed-off-by: Xiangrui Meng <[email protected]>
1 parent 10698e1 commit 996e2d4
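
Every hunk in this commit makes the same one-line substitution: the deprecated SQLContext.parquetFile(...) call becomes sqlContext.read.parquet(...), which goes through the unified DataFrameReader added in Spark 1.4 (sqlContext.read also exposes json(...) and the generic format(...).load(...)). A minimal self-contained sketch of the before/after pattern; the app name, master, and path below are illustrative and not part of the commit:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object ReadParquetExample {
  def main(args: Array[String]): Unit = {
    // Illustrative local setup; the commit itself only touches MLlib model loaders.
    val conf = new SparkConf().setAppName("read-parquet-example").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val path = "/tmp/model/data" // hypothetical Parquet directory

    // Before (deprecated as of Spark 1.4):
    //   val df = sqlContext.parquetFile(path)

    // After: the unified reader API.
    val df = sqlContext.read.parquet(path)
    df.printSchema()

    sc.stop()
  }
}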

File tree

10 files changed: +12 −12 lines changed


mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala

Lines changed: 2 additions & 2 deletions

@@ -153,7 +153,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
   def load(sc: SparkContext, path: String): NaiveBayesModel = {
     val sqlContext = new SQLContext(sc)
     // Load Parquet data.
-    val dataRDD = sqlContext.parquetFile(dataPath(path))
+    val dataRDD = sqlContext.read.parquet(dataPath(path))
     // Check schema explicitly since erasure makes it hard to use match-case for checking.
     checkSchema[Data](dataRDD.schema)
     val dataArray = dataRDD.select("labels", "pi", "theta", "modelType").take(1)
@@ -199,7 +199,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
   def load(sc: SparkContext, path: String): NaiveBayesModel = {
     val sqlContext = new SQLContext(sc)
     // Load Parquet data.
-    val dataRDD = sqlContext.parquetFile(dataPath(path))
+    val dataRDD = sqlContext.read.parquet(dataPath(path))
     // Check schema explicitly since erasure makes it hard to use match-case for checking.
     checkSchema[Data](dataRDD.schema)
     val dataArray = dataRDD.select("labels", "pi", "theta").take(1)
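
The recurring "erasure" comment in these loaders refers to JVM type erasure: a runtime match cannot verify element types such as Seq[Double], so the loaders validate the loaded Parquet schema against the expected case class instead. A simplified sketch of that idea, assuming the catalyst ScalaReflection helper and a hypothetical Point case class; this is not the exact Loader.checkSchema implementation:

import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

object SchemaCheckSketch {
  // Hypothetical case class standing in for a saved-model row type.
  case class Point(x: Double, y: Double)

  // Simplified spirit of Loader.checkSchema[Data]: derive the expected schema
  // from the case class via reflection and require each field to be present.
  def checkSchemaSketch[D: TypeTag](loadedSchema: StructType): Unit = {
    val expected = ScalaReflection.schemaFor[D].dataType.asInstanceOf[StructType]
    expected.fields.foreach { field =>
      require(loadedSchema.fieldNames.contains(field.name),
        s"Loaded Parquet data is missing expected field: ${field.name}")
    }
  }
  // Usage (illustrative): checkSchemaSketch[Point](sqlContext.read.parquet(path).schema)
}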

mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala

Lines changed: 1 addition & 1 deletion

@@ -75,7 +75,7 @@ private[classification] object GLMClassificationModel {
   def loadData(sc: SparkContext, path: String, modelClass: String): Data = {
     val datapath = Loader.dataPath(path)
     val sqlContext = new SQLContext(sc)
-    val dataRDD = sqlContext.parquetFile(datapath)
+    val dataRDD = sqlContext.read.parquet(datapath)
     val dataArray = dataRDD.select("weights", "intercept", "threshold").take(1)
     assert(dataArray.size == 1, s"Unable to load $modelClass data from: $datapath")
     val data = dataArray(0)

mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala

Lines changed: 1 addition & 1 deletion

@@ -132,7 +132,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
   def load(sc: SparkContext, path: String): GaussianMixtureModel = {
     val dataPath = Loader.dataPath(path)
     val sqlContext = new SQLContext(sc)
-    val dataFrame = sqlContext.parquetFile(dataPath)
+    val dataFrame = sqlContext.read.parquet(dataPath)
     val dataArray = dataFrame.select("weight", "mu", "sigma").collect()

     // Check schema explicitly since erasure makes it hard to use match-case for checking.

mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala

Lines changed: 1 addition & 1 deletion

@@ -120,7 +120,7 @@ object KMeansModel extends Loader[KMeansModel] {
     assert(className == thisClassName)
     assert(formatVersion == thisFormatVersion)
     val k = (metadata \ "k").extract[Int]
-    val centriods = sqlContext.parquetFile(Loader.dataPath(path))
+    val centriods = sqlContext.read.parquet(Loader.dataPath(path))
     Loader.checkSchema[Cluster](centriods.schema)
     val localCentriods = centriods.map(Cluster.apply).collect()
     assert(k == localCentriods.size)

mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala

Lines changed: 1 addition & 1 deletion

@@ -556,7 +556,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
   def load(sc: SparkContext, path: String): Word2VecModel = {
     val dataPath = Loader.dataPath(path)
     val sqlContext = new SQLContext(sc)
-    val dataFrame = sqlContext.parquetFile(dataPath)
+    val dataFrame = sqlContext.read.parquet(dataPath)

     val dataArray = dataFrame.select("word", "vector").collect()

mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala

Lines changed: 2 additions & 2 deletions

@@ -292,11 +292,11 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
     assert(className == thisClassName)
     assert(formatVersion == thisFormatVersion)
     val rank = (metadata \ "rank").extract[Int]
-    val userFeatures = sqlContext.parquetFile(userPath(path))
+    val userFeatures = sqlContext.read.parquet(userPath(path))
       .map { case Row(id: Int, features: Seq[_]) =>
         (id, features.asInstanceOf[Seq[Double]].toArray)
       }
-    val productFeatures = sqlContext.parquetFile(productPath(path))
+    val productFeatures = sqlContext.read.parquet(productPath(path))
       .map { case Row(id: Int, features: Seq[_]) =>
         (id, features.asInstanceOf[Seq[Double]].toArray)
       }
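
The Seq[_] pattern plus asInstanceOf[Seq[Double]] in the hunk above is the same erasure workaround: matching directly on Seq[Double] would be an unchecked pattern at runtime, so the loader matches the erased shape and casts afterwards. A minimal standalone illustration, using a hypothetical row shaped like the ALS feature tables (not code from the commit):

import org.apache.spark.sql.Row

object RowMatchSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical (id, features) row, mirroring the user/product feature tables.
    val row = Row(42, Seq(0.1, 0.2, 0.3))

    // Element types are erased at runtime, so match on Seq[_] and cast.
    val (id, features) = row match {
      case Row(i: Int, fs: Seq[_]) => (i, fs.asInstanceOf[Seq[Double]].toArray)
      case other => sys.error(s"Unexpected row: $other")
    }
    println(s"id=$id, features=${features.mkString(", ")}")
  }
}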

mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala

Lines changed: 1 addition & 1 deletion

@@ -189,7 +189,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {

   def load(sc: SparkContext, path: String): (Array[Double], Array[Double]) = {
     val sqlContext = new SQLContext(sc)
-    val dataRDD = sqlContext.parquetFile(dataPath(path))
+    val dataRDD = sqlContext.read.parquet(dataPath(path))

     checkSchema[Data](dataRDD.schema)
     val dataArray = dataRDD.select("boundary", "prediction").collect()

mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala

Lines changed: 1 addition & 1 deletion

@@ -72,7 +72,7 @@ private[regression] object GLMRegressionModel {
   def loadData(sc: SparkContext, path: String, modelClass: String, numFeatures: Int): Data = {
     val datapath = Loader.dataPath(path)
     val sqlContext = new SQLContext(sc)
-    val dataRDD = sqlContext.parquetFile(datapath)
+    val dataRDD = sqlContext.read.parquet(datapath)
     val dataArray = dataRDD.select("weights", "intercept").take(1)
     assert(dataArray.size == 1, s"Unable to load $modelClass data from: $datapath")
     val data = dataArray(0)

mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala

Lines changed: 1 addition & 1 deletion

@@ -230,7 +230,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
     val datapath = Loader.dataPath(path)
     val sqlContext = new SQLContext(sc)
     // Load Parquet data.
-    val dataRDD = sqlContext.parquetFile(datapath)
+    val dataRDD = sqlContext.read.parquet(datapath)
     // Check schema explicitly since erasure makes it hard to use match-case for checking.
     Loader.checkSchema[NodeData](dataRDD.schema)
     val nodes = dataRDD.map(NodeData.apply)

mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala

Lines changed: 1 addition & 1 deletion

@@ -437,7 +437,7 @@ private[tree] object TreeEnsembleModel extends Logging {
       treeAlgo: String): Array[DecisionTreeModel] = {
     val datapath = Loader.dataPath(path)
     val sqlContext = new SQLContext(sc)
-    val nodes = sqlContext.parquetFile(datapath).map(NodeData.apply)
+    val nodes = sqlContext.read.parquet(datapath).map(NodeData.apply)
     val trees = constructTrees(nodes)
     trees.map(new DecisionTreeModel(_, Algo.fromString(treeAlgo)))
   }
