
Commit 6c686ac

Author: Andrew Or
Merge branch 'master' of github.com:apache/spark into clean-moar
2 parents: 79a435b + b631bf7

21 files changed: +796 -98 lines


docs/running-on-yarn.md

Lines changed: 14 additions & 1 deletion
@@ -71,9 +71,22 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
 </tr>
 <tr>
   <td><code>spark.yarn.scheduler.heartbeat.interval-ms</code></td>
-  <td>5000</td>
+  <td>3000</td>
   <td>
     The interval in ms in which the Spark application master heartbeats into the YARN ResourceManager.
+    The value is capped at half the value of YARN's configuration for the expiry interval
+    (<code>yarn.am.liveness-monitor.expiry-interval-ms</code>).
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.scheduler.initial-allocation.interval</code></td>
+  <td>200ms</td>
+  <td>
+    The initial interval in which the Spark application master eagerly heartbeats to the YARN ResourceManager
+    when there are pending container allocation requests. It should be no larger than
+    <code>spark.yarn.scheduler.heartbeat.interval-ms</code>. The allocation interval will doubled on
+    successive eager heartbeats if pending containers still exist, until
+    <code>spark.yarn.scheduler.heartbeat.interval-ms</code> is reached.
   </td>
 </tr>
 <tr>
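
For context, a minimal sketch (not part of this commit; the app name and values are illustrative) of how the two intervals documented above might be set when building a Spark-on-YARN configuration:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("yarn-heartbeat-example")
      // Regular AM -> ResourceManager heartbeat; capped at half of
      // yarn.am.liveness-monitor.expiry-interval-ms.
      .set("spark.yarn.scheduler.heartbeat.interval-ms", "3000")
      // Eager heartbeat while container requests are pending; doubles on each
      // successive eager heartbeat until the regular interval above is reached.
      .set("spark.yarn.scheduler.initial-allocation.interval", "200ms")

The same keys can equally be passed via --conf on spark-submit or placed in spark-defaults.conf.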

mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala

Lines changed: 2 additions & 2 deletions
@@ -153,7 +153,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
     def load(sc: SparkContext, path: String): NaiveBayesModel = {
       val sqlContext = new SQLContext(sc)
       // Load Parquet data.
-      val dataRDD = sqlContext.parquetFile(dataPath(path))
+      val dataRDD = sqlContext.read.parquet(dataPath(path))
       // Check schema explicitly since erasure makes it hard to use match-case for checking.
       checkSchema[Data](dataRDD.schema)
       val dataArray = dataRDD.select("labels", "pi", "theta", "modelType").take(1)
@@ -199,7 +199,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
     def load(sc: SparkContext, path: String): NaiveBayesModel = {
       val sqlContext = new SQLContext(sc)
       // Load Parquet data.
-      val dataRDD = sqlContext.parquetFile(dataPath(path))
+      val dataRDD = sqlContext.read.parquet(dataPath(path))
       // Check schema explicitly since erasure makes it hard to use match-case for checking.
       checkSchema[Data](dataRDD.schema)
       val dataArray = dataRDD.select("labels", "pi", "theta").take(1)
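
The same one-line substitution recurs in every model loader touched below, so a sketch of the API shift is worth spelling out once (the helper name and path argument are illustrative, not from the Spark sources):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.{DataFrame, SQLContext}

    // Hypothetical helper: load a model's Parquet data through the new reader interface.
    def loadParquet(sc: SparkContext, path: String): DataFrame = {
      val sqlContext = new SQLContext(sc)
      // Old API, deprecated in Spark 1.4: sqlContext.parquetFile(path)
      // New API: go through the generic DataFrameReader exposed by sqlContext.read.
      sqlContext.read.parquet(path)
    }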

mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala

Lines changed: 1 addition & 1 deletion
@@ -75,7 +75,7 @@ private[classification] object GLMClassificationModel {
     def loadData(sc: SparkContext, path: String, modelClass: String): Data = {
       val datapath = Loader.dataPath(path)
       val sqlContext = new SQLContext(sc)
-      val dataRDD = sqlContext.parquetFile(datapath)
+      val dataRDD = sqlContext.read.parquet(datapath)
       val dataArray = dataRDD.select("weights", "intercept", "threshold").take(1)
       assert(dataArray.size == 1, s"Unable to load $modelClass data from: $datapath")
       val data = dataArray(0)

mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala

Lines changed: 1 addition & 1 deletion
@@ -132,7 +132,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
     def load(sc: SparkContext, path: String): GaussianMixtureModel = {
       val dataPath = Loader.dataPath(path)
       val sqlContext = new SQLContext(sc)
-      val dataFrame = sqlContext.parquetFile(dataPath)
+      val dataFrame = sqlContext.read.parquet(dataPath)
       val dataArray = dataFrame.select("weight", "mu", "sigma").collect()

       // Check schema explicitly since erasure makes it hard to use match-case for checking.

mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala

Lines changed: 1 addition & 1 deletion
@@ -120,7 +120,7 @@ object KMeansModel extends Loader[KMeansModel] {
       assert(className == thisClassName)
       assert(formatVersion == thisFormatVersion)
       val k = (metadata \ "k").extract[Int]
-      val centriods = sqlContext.parquetFile(Loader.dataPath(path))
+      val centriods = sqlContext.read.parquet(Loader.dataPath(path))
       Loader.checkSchema[Cluster](centriods.schema)
       val localCentriods = centriods.map(Cluster.apply).collect()
       assert(k == localCentriods.size)

mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala

Lines changed: 8 additions & 0 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.evaluation

 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
+import org.apache.spark.sql.DataFrame

 /**
  * Evaluator for multilabel classification.
@@ -27,6 +28,13 @@ import org.apache.spark.SparkContext._
  */
 class MultilabelMetrics(predictionAndLabels: RDD[(Array[Double], Array[Double])]) {

+  /**
+   * An auxiliary constructor taking a DataFrame.
+   * @param predictionAndLabels a DataFrame with two double array columns: prediction and label
+   */
+  private[mllib] def this(predictionAndLabels: DataFrame) =
+    this(predictionAndLabels.map(r => (r.getSeq[Double](0).toArray, r.getSeq[Double](1).toArray)))
+
   private lazy val numDocs: Long = predictionAndLabels.count()

   private lazy val numLabels: Long = predictionAndLabels.flatMap { case (_, labels) =>
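
Since the new constructor is private[mllib], outside callers still go through the RDD constructor; a minimal sketch (the data, column names, and the assumed SparkContext `sc` are illustrative) of the conversion it performs:

    import org.apache.spark.mllib.evaluation.MultilabelMetrics
    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc) // assumes an existing SparkContext `sc`
    import sqlContext.implicits._

    // A DataFrame with two array<double> columns, as the new constructor expects.
    val predictionAndLabels = sc.parallelize(Seq(
      (Array(0.0, 1.0), Array(0.0, 2.0)),
      (Array(0.0, 2.0), Array(0.0, 1.0)))).toDF("prediction", "label")

    // Equivalent of the private constructor: pull each row back out as typed arrays.
    val metrics = new MultilabelMetrics(predictionAndLabels.map { r =>
      (r.getSeq[Double](0).toArray, r.getSeq[Double](1).toArray)
    })
    println(metrics.microF1Measure)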

mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala

Lines changed: 4 additions & 1 deletion
@@ -158,6 +158,9 @@ class Word2Vec extends Serializable with Logging {
       .sortWith((a, b) => a.cn > b.cn)

     vocabSize = vocab.length
+    require(vocabSize > 0, "The vocabulary size should be > 0. You may need to check " +
+      "the setting of minCount, which could be large enough to remove all your words in sentences.")
+
     var a = 0
     while (a < vocabSize) {
       vocabHash += vocab(a).word -> a
@@ -556,7 +559,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
     def load(sc: SparkContext, path: String): Word2VecModel = {
       val dataPath = Loader.dataPath(path)
       val sqlContext = new SQLContext(sc)
-      val dataFrame = sqlContext.parquetFile(dataPath)
+      val dataFrame = sqlContext.read.parquet(dataPath)

       val dataArray = dataFrame.select("word", "vector").collect()
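
The new require turns a silent failure mode into a fast, descriptive error. A small sketch (corpus and counts are illustrative, and an existing SparkContext `sc` is assumed) of the situation it guards against:

    import org.apache.spark.mllib.feature.Word2Vec

    // Every word below occurs at most twice.
    val sentences = sc.parallelize(Seq(
      Seq("spark", "runs", "on", "yarn"),
      Seq("spark", "runs", "on", "mesos")))

    // minCount = 5 filters out every word, so fit() now fails fast with the message above.
    // new Word2Vec().setMinCount(5).fit(sentences)   // throws IllegalArgumentException

    // A minCount the corpus can satisfy keeps the vocabulary non-empty.
    val model = new Word2Vec().setMinCount(1).fit(sentences)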

mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala

Lines changed: 2 additions & 2 deletions
@@ -292,11 +292,11 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       assert(className == thisClassName)
       assert(formatVersion == thisFormatVersion)
       val rank = (metadata \ "rank").extract[Int]
-      val userFeatures = sqlContext.parquetFile(userPath(path))
+      val userFeatures = sqlContext.read.parquet(userPath(path))
         .map { case Row(id: Int, features: Seq[_]) =>
           (id, features.asInstanceOf[Seq[Double]].toArray)
         }
-      val productFeatures = sqlContext.parquetFile(productPath(path))
+      val productFeatures = sqlContext.read.parquet(productPath(path))
         .map { case Row(id: Int, features: Seq[_]) =>
           (id, features.asInstanceOf[Seq[Double]].toArray)
         }
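
Aside from the reader change, the surrounding pattern match is worth a note: Parquet brings array columns back as Seq[_], and erasure forces the explicit cast to recover the element type. A standalone sketch (the path is hypothetical and a SparkContext `sc` is assumed):

    import org.apache.spark.sql.{Row, SQLContext}

    val sqlContext = new SQLContext(sc)
    val userFeatures = sqlContext.read.parquet("/tmp/mf-model/data/user") // hypothetical path
      .map { case Row(id: Int, features: Seq[_]) =>
        // The element type is erased at runtime, hence the cast to Seq[Double].
        (id, features.asInstanceOf[Seq[Double]].toArray)
      }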

mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala

Lines changed: 1 addition & 1 deletion
@@ -189,7 +189,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {

     def load(sc: SparkContext, path: String): (Array[Double], Array[Double]) = {
       val sqlContext = new SQLContext(sc)
-      val dataRDD = sqlContext.parquetFile(dataPath(path))
+      val dataRDD = sqlContext.read.parquet(dataPath(path))

       checkSchema[Data](dataRDD.schema)
       val dataArray = dataRDD.select("boundary", "prediction").collect()

mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala

Lines changed: 1 addition & 1 deletion
@@ -72,7 +72,7 @@ private[regression] object GLMRegressionModel {
     def loadData(sc: SparkContext, path: String, modelClass: String, numFeatures: Int): Data = {
       val datapath = Loader.dataPath(path)
       val sqlContext = new SQLContext(sc)
-      val dataRDD = sqlContext.parquetFile(datapath)
+      val dataRDD = sqlContext.read.parquet(datapath)
       val dataArray = dataRDD.select("weights", "intercept").take(1)
       assert(dataArray.size == 1, s"Unable to load $modelClass data from: $datapath")
       val data = dataArray(0)
