From 62f567799050f9839835376ab72d2158adb29cf4 Mon Sep 17 00:00:00 2001
From: GuoQiang Li
Date: Wed, 30 Jul 2014 15:50:21 +0800
Subject: [PATCH] All Spark processes should support spark-defaults.conf config
 file.

---
 .../scala/org/apache/spark/SparkConf.scala      | 45 ++++++++++++++---
 .../spark/deploy/LocalSparkCluster.scala        |  2 +-
 .../spark/deploy/SparkSubmitArguments.scala     | 50 ++-----------------
 .../history/HistoryServerArguments.scala        | 34 ++++++-------
 .../spark/deploy/master/MasterArguments.scala   | 15 +++++-
 .../apache/spark/deploy/worker/Worker.scala     |  7 +--
 .../spark/deploy/worker/WorkerArguments.scala   | 12 ++++-
 .../scala/org/apache/spark/util/Utils.scala     | 39 ++++++++++++++-
 core/src/test/resources/test-spark.conf        | 18 +++++++
 .../org/apache/spark/SparkConfSuite.scala       | 22 ++++++++
 .../org/apache/spark/util/UtilsSuite.scala      |  5 ++
 docs/monitoring.md                              |  7 +++
 12 files changed, 174 insertions(+), 82 deletions(-)
 create mode 100644 core/src/test/resources/test-spark.conf

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 13f0bff7ee507..bea5dfe59fcd6 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark
 
+import java.io.File
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
+import org.apache.spark.util.Utils
 
 /**
  * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
@@ -33,25 +35,33 @@ import scala.collection.mutable.HashMap
  * All setter methods in this class support chaining. For example, you can write
  * `new SparkConf().setMaster("local").setAppName("My app")`.
  *
+ * The order of precedence for options is: system properties > properties file.
+ *
  * Note that once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
  * by the user. Spark does not support modifying the configuration at runtime.
  *
- * @param loadDefaults whether to also load values from Java system properties
+ * @param loadDefaults whether to also load values from Java system properties and the file
+ * @param fileName an optional properties file to load values from
  */
-class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
+class SparkConf(loadDefaults: Boolean, fileName: Option[String])
+  extends Cloneable with Logging {
 
   import SparkConf._
 
   /** Create a SparkConf that loads defaults from system properties and the classpath */
-  def this() = this(true)
+  def this() = this(true, None)
+
+  /**
+   * Create a SparkConf.
+   * @param loadDefaults whether to also load values from Java system properties
+   */
+  def this(loadDefaults: Boolean) = this(loadDefaults, None)
 
   private val settings = new HashMap[String, String]()
 
   if (loadDefaults) {
-    // Load any spark.* system properties
-    for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
-      settings(k) = v
-    }
+    fileName.foreach(f => loadPropertiesFromFile(f, isOverride = true))
+    loadSystemProperties()
   }
 
   /** Set a configuration variable. */
@@ -307,6 +317,27 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   def toDebugString: String = {
     settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
   }
+
+  /** Load properties from file. */
+  private[spark] def loadPropertiesFromFile(fileName: String, isOverride: Boolean = false) {
+    val file = new File(fileName)
+    if (file.isFile()) {
+      loadProperties(Utils.getPropertiesFromFile(file.getAbsolutePath), isOverride)
+    }
+  }
+
+  /** Load any spark.* system properties */
+  private[spark] def loadSystemProperties() {
+    loadProperties(sys.props.toSeq, true)
+  }
+
+  private def loadProperties(seq: Seq[(String, String)], isOverride: Boolean) {
+    for ((k, v) <- seq if k.startsWith("spark.")) {
+      if (isOverride || settings.get(k).isEmpty) {
+        settings(k) = v
+      }
+    }
+  }
 }
 
 private[spark] object SparkConf {
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 9a7a113c95715..33dd519a5fbe2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -52,7 +52,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
 
     /* Start the Workers */
     for (workerNum <- 1 to numWorkers) {
-      val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
+      val (workerSystem, _) = Worker.startSystemAndActor(conf, localHostname, 0, 0, coresPerWorker,
         memoryPerWorker, masters, null, Some(workerNum))
       workerActorSystems += workerSystem
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index d545f58c5da7e..0c5fb3aadbbe3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -17,14 +17,11 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
 import java.util.jar.JarFile
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 
-import org.apache.spark.SparkException
 import org.apache.spark.util.Utils
 
 /**
@@ -66,9 +63,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     val defaultProperties = new HashMap[String, String]()
     if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
     Option(propertiesFile).foreach { filename =>
-      val file = new File(filename)
-      SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
-        if (k.startsWith("spark")) {
+      Utils.getPropertiesFromFile(filename).foreach { case (k, v) =>
+        if (k.startsWith("spark.")) {
           defaultProperties(k) = v
           if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
         } else {
@@ -85,27 +81,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
    */
   private def mergeSparkProperties(): Unit = {
     // Use common defaults file, if not specified by user
-    if (propertiesFile == null) {
-      sys.env.get("SPARK_CONF_DIR").foreach { sparkConfDir =>
-        val sep = File.separator
-        val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf"
-        val file = new File(defaultPath)
-        if (file.exists()) {
-          propertiesFile = file.getAbsolutePath
-        }
-      }
-    }
-
-    if (propertiesFile == null) {
-      sys.env.get("SPARK_HOME").foreach { sparkHome =>
-        val sep = File.separator
-        val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.conf"
-        val file = new File(defaultPath)
-        if (file.exists()) {
-          propertiesFile = file.getAbsolutePath
-        }
-      }
-    }
+    propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultConfigFile)
 
     val properties = getDefaultSparkProperties
     properties.putAll(sparkProperties)
@@ -400,23 +376,3 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     SparkSubmit.exitFn()
   }
 }
-
-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-    require(file.exists(), s"Properties file $file does not exist")
-    require(file.isFile(), s"Properties file $file is not a normal file")
-    val inputStream = new FileInputStream(file)
-    try {
-      val properties = new Properties()
-      properties.load(inputStream)
-      properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
-    } catch {
-      case e: IOException =>
-        val message = s"Failed when loading Spark properties file $file"
-        throw new SparkException(message, e)
-    } finally {
-      inputStream.close()
-    }
-  }
-}
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index 25fc76c23e0fb..094606d2a28d9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -18,12 +18,14 @@
 package org.apache.spark.deploy.history
 
 import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
 
 /**
  * Command-line parser for the master.
  */
 private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) {
   private var logDir: String = null
+  private var propertiesFile: String = null
 
   parse(args.toList)
 
@@ -37,6 +39,10 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
       case ("--help" | "-h") :: tail =>
         printUsageAndExit(0)
 
+      case ("--properties-file") :: value :: tail =>
+        propertiesFile = value
+        parse(tail)
+
       case Nil =>
 
       case _ =>
@@ -44,29 +50,19 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
     }
   }
 
+  // Use common defaults file, if not specified by user
+  propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultConfigFile)
+  Option(propertiesFile).foreach(f => conf.loadPropertiesFromFile(f))
+
   private def printUsageAndExit(exitCode: Int) {
     System.err.println(
       """
-      |Usage: HistoryServer
-      |
-      |Configuration options can be set by setting the corresponding JVM system property.
-      |History Server options are always available; additional options depend on the provider.
-      |
-      |History Server options:
-      |
-      |  spark.history.ui.port               Port where server will listen for connections
-      |                                      (default 18080)
-      |  spark.history.acls.enable           Whether to enable view acls for all applications
-      |                                      (default false)
-      |  spark.history.provider              Name of history provider class (defaults to
-      |                                      file system-based provider)
-      |  spark.history.retainedApplications  Max number of application UIs to keep loaded in memory
-      |                                      (default 50)
-      |FsHistoryProvider options:
+      |Usage: HistoryServer [options]
       |
-      |  spark.history.fs.logDirectory       Directory where app logs are stored (required)
-      |  spark.history.fs.updateInterval     How often to reload log data from storage (in seconds,
-      |                                      default 10)
+      |Options:
+      |  -d DIR, --dir DIR        Directory where app logs are stored.
+      |  --properties-file FILE   Path to a file from which to load extra properties. If not
+      |                           specified, this will look for conf/spark-defaults.conf.
|""".stripMargin) System.exit(exitCode) } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala index 4b0dbbe543d3f..176f2865c7b0d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala @@ -27,6 +27,7 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) { var host = Utils.localHostName() var port = 7077 var webUiPort = 8080 + var propertiesFile: String = null // Check for settings in environment variables if (System.getenv("SPARK_MASTER_HOST") != null) { @@ -44,6 +45,10 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) { parse(args.toList) + // Use common defaults file, if not specified by user + propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultConfigFile) + Option(propertiesFile).foreach(f => conf.loadPropertiesFromFile(f)) + def parse(args: List[String]): Unit = args match { case ("--ip" | "-i") :: value :: tail => Utils.checkHost(value, "ip no longer supported, please use hostname " + value) @@ -63,7 +68,11 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) { webUiPort = value parse(tail) - case ("--help" | "-h") :: tail => + case ("--properties-file") :: value :: tail => + propertiesFile = value + parse(tail) + + case ("--help") :: tail => printUsageAndExit(0) case Nil => {} @@ -83,7 +92,9 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) { " -i HOST, --ip HOST Hostname to listen on (deprecated, please use --host or -h) \n" + " -h HOST, --host HOST Hostname to listen on\n" + " -p PORT, --port PORT Port to listen on (default: 7077)\n" + - " --webui-port PORT Port for web UI (default: 8080)") + " --webui-port PORT Port for web UI (default: 8080)\n" + + " --properties-file FILE Path to a file from which to load extra properties. 
If not \n" + + " specified, this will look for conf/spark-defaults.conf.") System.exit(exitCode) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index bacb514ed6335..aa378255483f1 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -375,12 +375,14 @@ private[spark] object Worker extends Logging { SignalLogger.register(log) val conf = new SparkConf val args = new WorkerArguments(argStrings, conf) - val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores, - args.memory, args.masters, args.workDir) + Option(args.propertiesFile).foreach(t => conf.loadPropertiesFromFile(t)) + val (actorSystem, _) = startSystemAndActor(conf, args.host, args.port, + args.webUiPort, args.cores, args.memory, args.masters, args.workDir) actorSystem.awaitTermination() } def startSystemAndActor( + conf: SparkConf, host: String, port: Int, webUiPort: Int, @@ -390,7 +392,6 @@ private[spark] object Worker extends Logging { workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = { // The LocalSparkCluster runs multiple local sparkWorkerX actor systems - val conf = new SparkConf val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("") val actorName = "Worker" val securityMgr = new SecurityManager(conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala index 1e295aaa48c30..b69d9b2bc8290 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala @@ -33,6 +33,7 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) { var memory = inferDefaultMemory() var masters: Array[String] = null var workDir: String = null + var propertiesFile: String = null // Check for settings in environment variables if (System.getenv("SPARK_WORKER_PORT") != null) { @@ -55,6 +56,7 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) { } parse(args.toList) + propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultConfigFile) def parse(args: List[String]): Unit = args match { case ("--ip" | "-i") :: value :: tail => @@ -87,7 +89,11 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) { webUiPort = value parse(tail) - case ("--help" | "-h") :: tail => + case ("--properties-file") :: value :: tail => + propertiesFile = value + parse(tail) + + case ("--help") :: tail => printUsageAndExit(0) case value :: tail => @@ -122,7 +128,9 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) { " -i HOST, --ip IP Hostname to listen on (deprecated, please use --host or -h)\n" + " -h HOST, --host HOST Hostname to listen on\n" + " -p PORT, --port PORT Port to listen on (default: random)\n" + - " --webui-port PORT Port for web UI (default: 8081)") + " --webui-port PORT Port for web UI (default: 8081)\n" + + " --properties-file FILE Path to a file from which to load extra properties. 
If not \n" + + " specified, this will look for conf/spark-defaults.conf.") System.exit(exitCode) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index c60be4f8a11d2..bf1513a4e47d7 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -20,7 +20,7 @@ package org.apache.spark.util import java.io._ import java.net._ import java.nio.ByteBuffer -import java.util.{Locale, Random, UUID} +import java.util.{Locale, Properties, Random, UUID} import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor} import scala.collection.JavaConversions._ @@ -1307,6 +1307,43 @@ private[spark] object Utils extends Logging { } } + /** Load properties present in the given file. */ + def getPropertiesFromFile(filename: String): Seq[(String, String)] = { + val file = new File(filename) + require(file.exists(), s"Properties file $file does not exist") + require(file.isFile(), s"Properties file $file is not a normal file") + val inputStream = new FileInputStream(file) + getPropertiesFromInputStream(inputStream) + } + + /** + * Load properties present in the given inputStream. + * @param inputStream InputStream from where to load properties. + * Expected to contain UTF-8 data. Will be closed by this method. + */ + private[spark] def getPropertiesFromInputStream(inputStream: InputStream): + Seq[(String, String)] = { + val inReader = new InputStreamReader(inputStream, "UTF-8") + try { + val properties = new Properties() + properties.load(inReader) + properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim)) + } catch { + case e: IOException => + val message = s"Failed when loading Spark properties" + throw new SparkException(message, e) + } finally { + inReader.close() + } + } + + private[spark] def getDefaultConfigFile: String = { + val s = File.separator + Seq( + sys.env.get("SPARK_CONF_DIR").map(t => new File(s"$t${s}spark-defaults.conf")), + sys.env.get("SPARK_HOME").map(t => new File(s"${t}${s}conf${s}spark-defaults.conf"))). + filter(_.isDefined).map(_.get).find(_.exists).map(_.getAbsolutePath).orNull + } /** Return a nice string representation of the exception, including the stack trace. */ def exceptionString(e: Exception): String = { if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace) diff --git a/core/src/test/resources/test-spark.conf b/core/src/test/resources/test-spark.conf new file mode 100644 index 0000000000000..ef9e16d47ff2a --- /dev/null +++ b/core/src/test/resources/test-spark.conf @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+spark.test.streamLoad true
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 87e9012622456..1024c49fbb58e 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark
 
+import com.google.common.io.Files
+import java.io.File
+import java.nio.charset.Charset
+
 import org.scalatest.FunSuite
 
 class SparkConfSuite extends FunSuite with LocalSparkContext {
@@ -30,6 +34,24 @@ class SparkConfSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("loading from file") {
+    val outFile = File.createTempFile("sparkConf-loading-from-file", "")
+    try {
+      Files.write("spark.test.fileNameLoad true\n", outFile, Charset.forName("UTF-8"))
+      assert(new SparkConf(true, Some(outFile.getAbsolutePath)).
+        get("spark.test.fileNameLoad") === "true")
+
+      System.setProperty("spark.test.fileNameLoad", "false")
+      val conf = new SparkConf(true, Some(outFile.getAbsolutePath))
+      assert(conf.get("spark.test.fileNameLoad") === "false")
+    } finally {
+      System.clearProperty("spark.test.fileNameLoad")
+      if (outFile != null) {
+        outFile.delete()
+      }
+    }
+  }
+
   test("initializing without loading defaults") {
     try {
       System.setProperty("spark.test.testProperty", "2")
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 70d423ba8a04d..0421ca776d0c9 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -297,4 +297,9 @@ class UtilsSuite extends FunSuite {
     }
   }
 
+  test("getPropertiesFromInputStream") {
+    val in = Utils.getSparkClassLoader.getResourceAsStream("test-spark.conf")
+    val properties = Utils.getPropertiesFromInputStream(in).toMap
+    assert(properties("spark.test.streamLoad") === "true")
+  }
 }
diff --git a/docs/monitoring.md b/docs/monitoring.md
index d07ec4a57a2cc..73e3fb403247e 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -77,6 +77,13 @@ follows:
     one implementation, provided by Spark, which looks for application logs stored in the
     file system.</td>
   </tr>
+  <tr>
+    <td>spark.history.fs.logDirectory</td>
+    <td>(none)</td>
+    <td>
+      Directory where app logs are stored.
+    </td>
+  </tr>
   <tr>
     <td>spark.history.fs.updateInterval</td>
    <td>10</td>
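
A minimal usage sketch of the loading behaviour added above (the file path and the
property lookup below are illustrative only):

    // Build a SparkConf that first reads an explicit properties file and then lets
    // any spark.* Java system properties override it ("system properties > file").
    import org.apache.spark.SparkConf

    val conf = new SparkConf(loadDefaults = true, fileName = Some("/path/to/spark-defaults.conf"))
    val logDir = conf.get("spark.history.fs.logDirectory", "/tmp/spark-events")

The daemons behave the same way: a value from the file passed via --properties-file
(or the default conf/spark-defaults.conf) is only applied when the corresponding
spark.* system property has not already been set.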