
Commit c45d20c

All Spark processes should support spark-defaults.conf, config file

1 parent 7b4f39f commit c45d20c

File tree: 8 files changed, +133 -50 lines
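
To illustrate what this commit enables (the file contents, paths, and values below are hypothetical examples, not taken from the commit): every standalone daemon -- Master, Worker, and HistoryServer -- now reads spark-defaults.conf at startup, and each gains a --properties-file flag to point at an alternate file.

    # Hypothetical conf/spark-defaults.conf; whitespace separates key from value,
    # and only keys starting with "spark." are loaded:
    spark.master.ui.port             8090
    spark.history.fs.logDirectory    /tmp/spark-events

    # Starting a daemon against an alternate file (illustrative path):
    bin/spark-class org.apache.spark.deploy.master.Master \
      --properties-file /etc/spark/alt-defaults.conf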

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 3 additions & 39 deletions
@@ -17,14 +17,11 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
 import java.util.jar.JarFile
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 
-import org.apache.spark.SparkException
 import org.apache.spark.util.Utils
 
 /**
@@ -63,9 +60,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     val defaultProperties = new HashMap[String, String]()
     if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
     Option(propertiesFile).foreach { filename =>
-      val file = new File(filename)
-      SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
-        if (k.startsWith("spark")) {
+      Utils.getPropertiesFromFile(filename).foreach { case (k, v) =>
+        if (k.startsWith("spark.")) {
          defaultProperties(k) = v
          if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
        } else {
@@ -90,19 +86,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   */
  private def mergeSparkProperties(): Unit = {
    // Use common defaults file, if not specified by user
-    if (propertiesFile == null) {
-      val sep = File.separator
-      val sparkHomeConfig = env.get("SPARK_HOME").map(sparkHome => s"${sparkHome}${sep}conf")
-      val confDir = env.get("SPARK_CONF_DIR").orElse(sparkHomeConfig)
-
-      confDir.foreach { sparkConfDir =>
-        val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf"
-        val file = new File(defaultPath)
-        if (file.exists()) {
-          propertiesFile = file.getAbsolutePath
-        }
-      }
-    }
+    propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultPropertiesFile)
 
    val properties = HashMap[String, String]()
    properties.putAll(defaultSparkProperties)
@@ -397,23 +381,3 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
    SparkSubmit.exitFn()
  }
}
-
-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-    require(file.exists(), s"Properties file $file does not exist")
-    require(file.isFile(), s"Properties file $file is not a normal file")
-    val inputStream = new FileInputStream(file)
-    try {
-      val properties = new Properties()
-      properties.load(inputStream)
-      properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
-    } catch {
-      case e: IOException =>
-        val message = s"Failed when loading Spark properties file $file"
-        throw new SparkException(message, e)
-    } finally {
-      inputStream.close()
-    }
-  }
-}
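
Two behavioral details in this diff are easy to miss: the key filter is tightened from startsWith("spark") to startsWith("spark."), and the hand-rolled SPARK_CONF_DIR/SPARK_HOME lookup collapses into an Option(...).getOrElse(...) call into the new Utils helper. A minimal, standalone Scala sketch of both points (object name and values are hypothetical, not from the commit):

    object DefaultsSketch {
      def main(args: Array[String]): Unit = {
        // The old startsWith("spark") also admitted non-config keys like this one:
        val props = Map("spark.executor.memory" -> "2g", "sparky.not.a.config" -> "x")
        val sparkOnly = props.filter { case (k, _) => k.startsWith("spark.") }
        assert(sparkOnly.keySet == Set("spark.executor.memory"))

        // Null-tolerant defaulting, the same shape as the new mergeSparkProperties line:
        def resolve(explicit: String): String =
          Option(explicit).getOrElse("conf/spark-defaults.conf")
        assert(resolve(null) == "conf/spark-defaults.conf")
        assert(resolve("/etc/spark/my.conf") == "/etc/spark/my.conf")
      }
    }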

core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ private[spark] object SparkSubmitDriverBootstrapper {
  assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set")
 
  // Parse the properties file for the equivalent spark.driver.* configs
-  val properties = SparkSubmitArguments.getPropertiesFromFile(new File(propertiesFile)).toMap
+  val properties = Utils.getPropertiesFromFile(propertiesFile)
  val confDriverMemory = properties.get("spark.driver.memory")
  val confLibraryPath = properties.get("spark.driver.extraLibraryPath")
  val confClasspath = properties.get("spark.driver.extraClassPath")

core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala

Lines changed: 15 additions & 1 deletion
@@ -18,12 +18,14 @@
 package org.apache.spark.deploy.history
 
 import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
 
 /**
  * Command-line parser for the master.
  */
 private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) {
   private var logDir: String = null
+  private var propertiesFile: String = null
 
   parse(args.toList)
 
@@ -32,22 +34,34 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
    case ("--dir" | "-d") :: value :: tail =>
      logDir = value
      conf.set("spark.history.fs.logDirectory", value)
+      System.setProperty("spark.history.fs.logDirectory", value)
      parse(tail)
 
    case ("--help" | "-h") :: tail =>
      printUsageAndExit(0)
 
+    case ("--properties-file") :: value :: tail =>
+      propertiesFile = value
+      parse(tail)
+
    case Nil =>
 
    case _ =>
      printUsageAndExit(1)
  }
 
+  // This mutates the SparkConf, so all accesses to it must be made after this line
+  Utils.loadDefaultSparkProperties(conf, propertiesFile)
+
  private def printUsageAndExit(exitCode: Int) {
    System.err.println(
      """
-      |Usage: HistoryServer
+      |Usage: HistoryServer [options]
+      |
+      |Options:
+      |  --properties-file FILE      Path to a custom Spark properties file.
+      |                              Default is conf/spark-defaults.conf.
      |
      |Configuration options can be set by setting the corresponding JVM system property.
      |History Server options are always available; additional options depend on the provider.
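
The ordering here matters: parse() runs first so an explicit --properties-file can take effect, then Utils.loadDefaultSparkProperties mutates the SparkConf, and only after that may the conf be read (hence the "must be made after this line" comments; the Master and Worker diffs below follow the same shape). A stripped-down sketch of the recursive List-match parsing style these classes share (class name and error handling are hypothetical):

    class ArgsSketch(args: Array[String]) {
      var propertiesFile: String = null  // remains null unless --properties-file is given

      parse(args.toList)
      // ...defaults would be loaded here, before any configuration reads...

      private def parse(args: List[String]): Unit = args match {
        case ("--properties-file") :: value :: tail =>
          propertiesFile = value
          parse(tail)   // consume two tokens, recurse on the remainder
        case Nil =>     // every argument consumed
        case _ =>
          sys.error(s"Unrecognized arguments: $args")
      }
    }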

core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala

Lines changed: 15 additions & 4 deletions
@@ -27,6 +27,7 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
  var host = Utils.localHostName()
  var port = 7077
  var webUiPort = 8080
+  var propertiesFile: String = null
 
  // Check for settings in environment variables
  if (System.getenv("SPARK_MASTER_HOST") != null) {
@@ -38,12 +39,16 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
  if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
    webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
  }
+
+  parse(args.toList)
+
+  // This mutates the SparkConf, so all accesses to it must be made after this line
+  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
+
  if (conf.contains("spark.master.ui.port")) {
    webUiPort = conf.get("spark.master.ui.port").toInt
  }
 
-  parse(args.toList)
-
  def parse(args: List[String]): Unit = args match {
    case ("--ip" | "-i") :: value :: tail =>
      Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
@@ -63,7 +68,11 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
      webUiPort = value
      parse(tail)
 
-    case ("--help" | "-h") :: tail =>
+    case ("--properties-file") :: value :: tail =>
+      propertiesFile = value
+      parse(tail)
+
+    case ("--help") :: tail =>
      printUsageAndExit(0)
 
    case Nil => {}
@@ -83,7 +92,9 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
      "  -i HOST, --ip HOST     Hostname to listen on (deprecated, please use --host or -h) \n" +
      "  -h HOST, --host HOST   Hostname to listen on\n" +
      "  -p PORT, --port PORT   Port to listen on (default: 7077)\n" +
-      "  --webui-port PORT      Port for web UI (default: 8080)")
+      "  --webui-port PORT      Port for web UI (default: 8080)\n" +
+      "  --properties-file FILE Path to a custom Spark properties file.\n" +
+      "                         Default is conf/spark-defaults.conf.")
    System.exit(exitCode)
  }
}

core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala

Lines changed: 15 additions & 5 deletions
@@ -33,6 +33,7 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
  var memory = inferDefaultMemory()
  var masters: Array[String] = null
  var workDir: String = null
+  var propertiesFile: String = null
 
  // Check for settings in environment variables
  if (System.getenv("SPARK_WORKER_PORT") != null) {
@@ -47,16 +48,19 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
  if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
    webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
  }
-  if (conf.contains("spark.worker.ui.port")) {
-    webUiPort = conf.get("spark.worker.ui.port").toInt
-  }
  if (System.getenv("SPARK_WORKER_DIR") != null) {
    workDir = System.getenv("SPARK_WORKER_DIR")
  }
 
  parse(args.toList)
 
  checkWorkerMemory()
+  // This mutates the SparkConf, so all accesses to it must be made after this line
+  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
+
+  if (conf.contains("spark.worker.ui.port")) {
+    webUiPort = conf.get("spark.worker.ui.port").toInt
+  }
 
  def parse(args: List[String]): Unit = args match {
    case ("--ip" | "-i") :: value :: tail =>
@@ -89,7 +93,11 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
      webUiPort = value
      parse(tail)
 
-    case ("--help" | "-h") :: tail =>
+    case ("--properties-file") :: value :: tail =>
+      propertiesFile = value
+      parse(tail)
+
+    case ("--help") :: tail =>
      printUsageAndExit(0)
 
    case value :: tail =>
@@ -124,7 +132,9 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
      "  -i HOST, --ip IP         Hostname to listen on (deprecated, please use --host or -h)\n" +
      "  -h HOST, --host HOST     Hostname to listen on\n" +
      "  -p PORT, --port PORT     Port to listen on (default: random)\n" +
-      "  --webui-port PORT        Port for web UI (default: 8081)")
+      "  --webui-port PORT        Port for web UI (default: 8081)\n" +
+      "  --properties-file FILE   Path to a custom Spark properties file.\n" +
+      "                           Default is conf/spark-defaults.conf.")
    System.exit(exitCode)
  }
 
core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 58 additions & 0 deletions
@@ -1410,6 +1410,64 @@ private[spark] object Utils extends Logging {
    }
  }
 
+  /**
+   * Load default Spark properties from the given file. If no file is provided,
+   * use the common defaults file. This mutates state in the given SparkConf and
+   * in this JVM's system properties if the config specified in the file is not
+   * already set. Return the path of the properties file used.
+   */
+  def loadDefaultSparkProperties(conf: SparkConf, filePath: String = null): String = {
+    val path = Option(filePath).getOrElse(getDefaultPropertiesFile)
+    Option(path).foreach { confFile =>
+      getPropertiesFromFile(confFile).filter { case (k, v) =>
+        k.startsWith("spark.")
+      }.foreach { case (k, v) =>
+        conf.setIfMissing(k, v)
+        sys.props.getOrElseUpdate(k, v)
+      }
+    }
+    path
+  }
+
+  /** Load properties present in the given file. */
+  def getPropertiesFromFile(filename: String): Map[String, String] = {
+    val file = new File(filename)
+    require(file.exists(), s"Properties file $file does not exist")
+    require(file.isFile(), s"Properties file $file is not a normal file")
+
+    val inReader = new InputStreamReader(new FileInputStream(file), "UTF-8")
+    try {
+      val properties = new Properties()
+      properties.load(inReader)
+      properties.stringPropertyNames().map(k => (k, properties(k).trim)).toMap
+    } catch {
+      case e: IOException =>
+        throw new SparkException(s"Failed when loading Spark properties from $filename", e)
+    } finally {
+      inReader.close()
+    }
+  }
+
+  /** Return the path of the default Spark properties file. */
+  def getDefaultPropertiesFile(): String = {
+    val s = File.separator
+    def getAbsolutePath(filePath: String): String = {
+      Option(filePath)
+        .map(t => new File(t))
+        .filter(_.isFile)
+        .map(_.getAbsolutePath).orNull
+    }
+
+    val configFile = sys.env.get("SPARK_CONF_DIR")
+      .map(t => s"$t${s}spark-defaults.conf")
+      .map(getAbsolutePath).orNull
+
+    Option(configFile).getOrElse(sys.env.get("SPARK_HOME")
+      .map(t => s"${t}${s}conf${s}spark-defaults.conf")
+      .map(getAbsolutePath)
+      .orNull)
+  }
+
  /** Return a nice string representation of the exception, including the stack trace. */
  def exceptionString(e: Exception): String = {
    if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
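
The contract to note is precedence: loadDefaultSparkProperties uses only conf.setIfMissing and sys.props.getOrElseUpdate, so any value already set -- on the command line, via -D system properties, or programmatically -- beats the file. A standalone mimic of that rule using plain Scala maps (for illustration only; no real SparkConf is involved):

    import scala.collection.mutable

    object PrecedenceSketch {
      def main(args: Array[String]): Unit = {
        // Pretend "spark.master" was already set before the defaults file is read:
        val conf = mutable.Map("spark.master" -> "spark://host:7077")
        val fromFile = Map(
          "spark.master"          -> "local[4]",  // loses: the key is already present
          "spark.executor.memory" -> "2g")        // wins: the key was unset

        fromFile.filter { case (k, _) => k.startsWith("spark.") }
          .foreach { case (k, v) => conf.getOrElseUpdate(k, v) }

        assert(conf("spark.master") == "spark://host:7077")
        assert(conf("spark.executor.memory") == "2g")
      }
    }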

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 19 additions & 0 deletions
@@ -27,6 +27,8 @@ import com.google.common.base.Charsets
 import com.google.common.io.Files
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkConf
+
 class UtilsSuite extends FunSuite {
 
   test("bytesToString") {
@@ -332,4 +334,21 @@ class UtilsSuite extends FunSuite {
    assert(!tempFile2.exists())
  }
 
+  test("loading properties from file") {
+    val outFile = File.createTempFile("test-load-spark-properties", "test")
+    try {
+      System.setProperty("spark.test.fileNameLoadB", "2")
+      Files.write("spark.test.fileNameLoadA true\n" +
+        "spark.test.fileNameLoadB 1\n", outFile, Charsets.UTF_8)
+      val properties = Utils.getPropertiesFromFile(outFile.getAbsolutePath)
+      properties
+        .filter { case (k, v) => k.startsWith("spark.") }
+        .foreach { case (k, v) => sys.props.getOrElseUpdate(k, v) }
+      val sparkConf = new SparkConf
+      assert(sparkConf.getBoolean("spark.test.fileNameLoadA", false) === true)
+      assert(sparkConf.getInt("spark.test.fileNameLoadB", 1) === 2)
+    } finally {
+      outFile.delete()
+    }
+  }
}

docs/monitoring.md

Lines changed: 7 additions & 0 deletions
@@ -77,6 +77,13 @@ follows:
    one implementation, provided by Spark, which looks for application logs stored in the
    file system.</td>
  </tr>
+  <tr>
+    <td>spark.history.fs.logDirectory</td>
+    <td>(none)</td>
+    <td>
+      Directory that contains application event logs to be loaded by the history server
+    </td>
+  </tr>
  <tr>
    <td>spark.history.fs.updateInterval</td>
    <td>10</td>
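
A hypothetical spark-defaults.conf line exercising this new table entry (the HDFS path is an illustrative placeholder):

    spark.history.fs.logDirectory    hdfs://namenode:8020/shared/spark-logs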
