Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 38 additions & 7 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.spark

import java.io.File
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import org.apache.spark.util.Utils

/**
* Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
Expand All @@ -33,25 +35,33 @@ import scala.collection.mutable.HashMap
* All setter methods in this class support chaining. For example, you can write
* `new SparkConf().setMaster("local").setAppName("My app")`.
*
* System properties take precedence over values loaded from the properties file.
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this comment is necessary. The user shouldn't manually set the system properties anyway; the usual mechanism is through some --* option in SparkSubmit.

* Note that once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
* by the user. Spark does not support modifying the configuration at runtime.
*
* @param loadDefaults whether to also load values from Java system properties
* @param loadDefaults whether to load values from Java system properties and the properties file
* @param fileName optional path of a properties file to load settings from
*/
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
class SparkConf(loadDefaults: Boolean, fileName: Option[String])
extends Cloneable with Logging {

import SparkConf._

/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)
def this() = this(true, None)

/**
* Create a SparkConf without an explicit properties file: delegates to the
* two-argument constructor, passing `None` as the file name.
* @param loadDefaults whether to also load values from Java system properties
*/
def this(loadDefaults: Boolean) = this(loadDefaults, None)

private val settings = new HashMap[String, String]()

if (loadDefaults) {
// Load any spark.* system properties
for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
settings(k) = v
}
fileName.foreach(f => loadPropertiesFromFile(f, isOverride = true))
loadSystemProperties()
}

/** Set a configuration variable. */
Expand Down Expand Up @@ -307,6 +317,27 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
def toDebugString: String = {
settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
}

/**
 * Load entries from the properties file at `fileName` into this configuration.
 *
 * Nothing happens when the path does not denote a regular file. Otherwise the
 * file's entries are read through `Utils.getPropertiesFromFile` and handed to
 * `loadProperties`.
 *
 * @param fileName path of the properties file
 * @param isOverride whether entries from the file replace keys that are already set
 */
private[spark] def loadPropertiesFromFile(fileName: String, isOverride: Boolean = false): Unit = {
  val candidate = new File(fileName)
  if (candidate.isFile()) {
    val entries = Utils.getPropertiesFromFile(candidate.getAbsolutePath)
    loadProperties(entries, isOverride)
  }
}

/**
 * Copy the current JVM system properties into this configuration via
 * `loadProperties`, always overriding keys that are already set.
 */
private[spark] def loadSystemProperties(): Unit = {
  val systemEntries = sys.props.toSeq
  loadProperties(systemEntries, isOverride = true)
}

/**
 * Merge the pairs of `seq` whose key starts with "spark." into `settings`.
 *
 * @param seq key/value pairs to consider
 * @param isOverride when true every matching pair is written; when false a pair
 *                   is written only if its key is not present yet
 */
private def loadProperties(seq: Seq[(String, String)], isOverride: Boolean): Unit = {
  seq.foreach { case (key, value) =>
    val accepted = key.startsWith("spark.") && (isOverride || !settings.contains(key))
    if (accepted) {
      settings(key) = value
    }
  }
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I actually don't think any of this logic should exist in SparkConf. The current model is that by the time we get here all the properties from the file are already loaded into sys.props so we can just load those. This is a nice abstraction because SparkConf doesn't have to be concerned with where the properties file is.

}

private[spark] object SparkConf {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I

/* Start the Workers */
for (workerNum <- 1 to numWorkers) {
val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
val (workerSystem, _) = Worker.startSystemAndActor(conf, localHostname, 0, 0, coresPerWorker,
memoryPerWorker, masters, null, Some(workerNum))
workerActorSystems += workerSystem
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@

package org.apache.spark.deploy

import java.io.{File, FileInputStream, IOException}
import java.util.Properties
import java.util.jar.JarFile

import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}

import org.apache.spark.SparkException
import org.apache.spark.util.Utils

/**
Expand Down Expand Up @@ -66,9 +63,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
val defaultProperties = new HashMap[String, String]()
if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
Option(propertiesFile).foreach { filename =>
val file = new File(filename)
SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
if (k.startsWith("spark")) {
Utils.getPropertiesFromFile(filename).foreach { case (k, v) =>
if (k.startsWith("spark.")) {
defaultProperties(k) = v
if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
} else {
Expand All @@ -85,27 +81,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
*/
private def mergeSparkProperties(): Unit = {
// Use common defaults file, if not specified by user
if (propertiesFile == null) {
sys.env.get("SPARK_CONF_DIR").foreach { sparkConfDir =>
val sep = File.separator
val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf"
val file = new File(defaultPath)
if (file.exists()) {
propertiesFile = file.getAbsolutePath
}
}
}

if (propertiesFile == null) {
sys.env.get("SPARK_HOME").foreach { sparkHome =>
val sep = File.separator
val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.conf"
val file = new File(defaultPath)
if (file.exists()) {
propertiesFile = file.getAbsolutePath
}
}
}
propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultConfigFile)

val properties = getDefaultSparkProperties
properties.putAll(sparkProperties)
Expand Down Expand Up @@ -400,23 +376,3 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
SparkSubmit.exitFn()
}
}

object SparkSubmitArguments {
  /**
   * Read every key/value pair from the given properties file.
   *
   * @param file the properties file; it must exist and be a regular file
   * @return one (key, trimmed value) pair per string property in the file
   * @throws IllegalArgumentException (via `require`) if the file is missing or not a regular file
   * @throws SparkException if an IOException occurs while reading the file
   */
  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
    require(file.exists(), s"Properties file $file does not exist")
    require(file.isFile(), s"Properties file $file is not a normal file")
    val in = new FileInputStream(file)
    try {
      val loaded = new Properties()
      loaded.load(in)
      for (name <- loaded.stringPropertyNames().toSeq) yield (name, loaded(name).trim)
    } catch {
      case ioe: IOException =>
        throw new SparkException(s"Failed when loading Spark properties file $file", ioe)
    } finally {
      // The stream is closed whether loading succeeded or failed.
      in.close()
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
package org.apache.spark.deploy.history

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

/**
* Command-line parser for the history server.
*/
private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) {
private var logDir: String = null
private var propertiesFile: String = null

parse(args.toList)

Expand All @@ -37,36 +39,30 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
case ("--help" | "-h") :: tail =>
printUsageAndExit(0)

case ("--properties-file") :: value :: tail =>
propertiesFile = value
parse(tail)

case Nil =>

case _ =>
printUsageAndExit(1)
}
}

// Use common defaults file, if not specified by user
propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultConfigFile)
Option(propertiesFile).foreach(f => conf.loadPropertiesFromFile(f))

/**
 * Print the usage message to standard error, then terminate the JVM.
 * @param exitCode status code passed to `System.exit`
 */
private def printUsageAndExit(exitCode: Int) {
System.err.println(
"""
|Usage: HistoryServer
|
|Configuration options can be set by setting the corresponding JVM system property.
|History Server options are always available; additional options depend on the provider.
|
|History Server options:
|
| spark.history.ui.port Port where server will listen for connections
| (default 18080)
| spark.history.acls.enable Whether to enable view acls for all applications
| (default false)
| spark.history.provider Name of history provider class (defaults to
| file system-based provider)
| spark.history.retainedApplications Max number of application UIs to keep loaded in memory
| (default 50)
|FsHistoryProvider options:
|Usage: HistoryServer [options]
|
| spark.history.fs.logDirectory Directory where app logs are stored (required)
| spark.history.fs.updateInterval How often to reload log data from storage (in seconds,
| default 10)
|Options:
| -d DIR, --dir DIR Directory where app logs are stored.
| --properties-file FILE Path to a file from which to load extra properties. If not
| specified, this will look for conf/spark-defaults.conf.
|""".stripMargin)
System.exit(exitCode)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
var host = Utils.localHostName()
var port = 7077
var webUiPort = 8080
var propertiesFile: String = null

// Check for settings in environment variables
if (System.getenv("SPARK_MASTER_HOST") != null) {
Expand All @@ -44,6 +45,10 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {

parse(args.toList)

// Use common defaults file, if not specified by user
propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultConfigFile)
Option(propertiesFile).foreach(f => conf.loadPropertiesFromFile(f))

def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
Expand All @@ -63,7 +68,11 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
webUiPort = value
parse(tail)

case ("--help" | "-h") :: tail =>
case ("--properties-file") :: value :: tail =>
propertiesFile = value
parse(tail)

case ("--help") :: tail =>
printUsageAndExit(0)

case Nil => {}
Expand All @@ -83,7 +92,9 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
" -i HOST, --ip HOST Hostname to listen on (deprecated, please use --host or -h) \n" +
" -h HOST, --host HOST Hostname to listen on\n" +
" -p PORT, --port PORT Port to listen on (default: 7077)\n" +
" --webui-port PORT Port for web UI (default: 8080)")
" --webui-port PORT Port for web UI (default: 8080)\n" +
" --properties-file FILE Path to a file from which to load extra properties. If not \n" +
" specified, this will look for conf/spark-defaults.conf.")
System.exit(exitCode)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -375,12 +375,14 @@ private[spark] object Worker extends Logging {
SignalLogger.register(log)
val conf = new SparkConf
val args = new WorkerArguments(argStrings, conf)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
args.memory, args.masters, args.workDir)
Option(args.propertiesFile).foreach(t => conf.loadPropertiesFromFile(t))
val (actorSystem, _) = startSystemAndActor(conf, args.host, args.port,
args.webUiPort, args.cores, args.memory, args.masters, args.workDir)
actorSystem.awaitTermination()
}

def startSystemAndActor(
conf: SparkConf,
host: String,
port: Int,
webUiPort: Int,
Expand All @@ -390,7 +392,6 @@ private[spark] object Worker extends Logging {
workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = {

// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
val conf = new SparkConf
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
val actorName = "Worker"
val securityMgr = new SecurityManager(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
var memory = inferDefaultMemory()
var masters: Array[String] = null
var workDir: String = null
var propertiesFile: String = null

// Check for settings in environment variables
if (System.getenv("SPARK_WORKER_PORT") != null) {
Expand All @@ -55,6 +56,7 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
}

parse(args.toList)
propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultConfigFile)

def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
Expand Down Expand Up @@ -87,7 +89,11 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
webUiPort = value
parse(tail)

case ("--help" | "-h") :: tail =>
case ("--properties-file") :: value :: tail =>
propertiesFile = value
parse(tail)

case ("--help") :: tail =>
printUsageAndExit(0)

case value :: tail =>
Expand Down Expand Up @@ -122,7 +128,9 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
" -i HOST, --ip IP Hostname to listen on (deprecated, please use --host or -h)\n" +
" -h HOST, --host HOST Hostname to listen on\n" +
" -p PORT, --port PORT Port to listen on (default: random)\n" +
" --webui-port PORT Port for web UI (default: 8081)")
" --webui-port PORT Port for web UI (default: 8081)\n" +
" --properties-file FILE Path to a file from which to load extra properties. If not \n" +
" specified, this will look for conf/spark-defaults.conf.")
System.exit(exitCode)
}

Expand Down
39 changes: 38 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.util
import java.io._
import java.net._
import java.nio.ByteBuffer
import java.util.{Locale, Random, UUID}
import java.util.{Locale, Properties, Random, UUID}
import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}

import scala.collection.JavaConversions._
Expand Down Expand Up @@ -1307,6 +1307,43 @@ private[spark] object Utils extends Logging {
}
}

/**
 * Read all key/value pairs from the properties file at `filename`.
 *
 * The path must exist and be a regular file; otherwise `require` fails with an
 * IllegalArgumentException. The opened stream is handed to
 * `getPropertiesFromInputStream`, which does the actual parsing.
 *
 * @param filename path of the properties file
 * @return the pairs produced by `getPropertiesFromInputStream`
 */
def getPropertiesFromFile(filename: String): Seq[(String, String)] = {
  val propsFile = new File(filename)
  require(propsFile.exists(), s"Properties file $propsFile does not exist")
  require(propsFile.isFile(), s"Properties file $propsFile is not a normal file")
  getPropertiesFromInputStream(new FileInputStream(propsFile))
}

/**
* Load properties present in the given inputStream.
* @param inputStream InputStream from where to load properties.
* Expected to contain UTF-8 data. Will be closed by this method.
*/
private[spark] def getPropertiesFromInputStream(inputStream: InputStream):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to put this in a separate function. We won't ever read the Spark configs from another input stream that's not a file, so this can just go into getPropertiesFromFile

Seq[(String, String)] = {
val inReader = new InputStreamReader(inputStream, "UTF-8")
try {
val properties = new Properties()
properties.load(inReader)
properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably a little more efficient:

properties.entrySet().map(e => (e.getKey().asInstanceOf[String], e.getValue().asInstanceOf[String].trim()))

} catch {
case e: IOException =>
val message = s"Failed when loading Spark properties"
throw new SparkException(message, e)
} finally {
inReader.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're keeping this, you should document that this method will close the given input stream.

}
}

/**
 * Locate the default `spark-defaults.conf` file.
 *
 * Candidates are checked in order: `$SPARK_CONF_DIR/spark-defaults.conf`, then
 * `$SPARK_HOME/conf/spark-defaults.conf`. Candidates whose environment variable
 * is unset are skipped.
 *
 * @return the absolute path of the first candidate that exists, or null if none does
 */
private[spark] def getDefaultConfigFile: String = {
  val sep = File.separator
  val fromConfDir =
    sys.env.get("SPARK_CONF_DIR").map(dir => new File(s"$dir${sep}spark-defaults.conf"))
  val fromSparkHome =
    sys.env.get("SPARK_HOME").map(home => new File(s"${home}${sep}conf${sep}spark-defaults.conf"))
  Seq(fromConfDir, fromSparkHome).flatten.find(_.exists).map(_.getAbsolutePath).orNull
}
/** Return a nice string representation of the exception, including the stack trace. */
def exceptionString(e: Exception): String = {
if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
Expand Down
Loading