
Commit 4527761

Marcelo Vanzin authored and srowen committed
[SPARK-6046] [core] Reorganize deprecated config support in SparkConf.
This change tries to follow the chosen way for handling deprecated configs in SparkConf: all values (old and new) are kept in the conf object, and newer names take precedence over older ones when retrieving the value. Warnings are logged when config options are set, which generally happens on the driver node (where the logs are most visible).

Author: Marcelo Vanzin <[email protected]>

Closes #5514 from vanzin/SPARK-6046 and squashes the following commits:

9371529 [Marcelo Vanzin] Avoid math.
6cf3f11 [Marcelo Vanzin] Review feedback.
2445d48 [Marcelo Vanzin] Fix (and cleanup) update interval initialization.
b6824be [Marcelo Vanzin] Clean up the other deprecated config use also.
ab20351 [Marcelo Vanzin] Update FsHistoryProvider to only retrieve new config key.
2c93209 [Marcelo Vanzin] [SPARK-6046] [core] Reorganize deprecated config support in SparkConf.
1 parent f7a2564 commit 4527761
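
For context, a minimal sketch of the behavior described above (it mirrors the test added to SparkConfSuite further down; the keys are the history-server keys touched by this commit):

    import org.apache.spark.SparkConf

    // Both the old and the new key end up in the conf object; the newer name wins on read.
    val conf = new SparkConf(false)                        // loadDefaults = false, for isolation
    conf.set("spark.history.updateInterval", "1")          // deprecated key; a warning is logged here
    conf.get("spark.history.fs.update.interval")           // "1" -- resolved via the deprecated key
    conf.set("spark.history.fs.update.interval", "4")      // new key
    conf.get("spark.history.fs.update.interval")           // "4" -- the newer name takes precedence
    conf.getAll.count(_._1.startsWith("spark.history."))   // 2  -- both entries are kept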

File tree

6 files changed: +124 -95 lines changed

  core/src/main/scala/org/apache/spark/SparkConf.scala
  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
  core/src/main/scala/org/apache/spark/executor/Executor.scala
  core/src/test/scala/org/apache/spark/SparkConfSuite.scala
  docs/monitoring.md
  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 95 additions & 79 deletions

@@ -68,6 +68,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     if (value == null) {
       throw new NullPointerException("null value for " + key)
     }
+    logDeprecationWarning(key)
     settings.put(key, value)
     this
   }
@@ -134,13 +135,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Set multiple parameters together */
   def setAll(settings: Traversable[(String, String)]): SparkConf = {
-    this.settings.putAll(settings.toMap.asJava)
+    settings.foreach { case (k, v) => set(k, v) }
     this
   }
 
   /** Set a parameter if it isn't already configured */
   def setIfMissing(key: String, value: String): SparkConf = {
-    settings.putIfAbsent(key, value)
+    if (settings.putIfAbsent(key, value) == null) {
+      logDeprecationWarning(key)
+    }
     this
   }
 
@@ -174,45 +177,44 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     getOption(key).getOrElse(defaultValue)
   }
 
-  /**
-   * Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
+  /**
+   * Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
    * suffix is provided then seconds are assumed.
   * @throws NoSuchElementException
   */
   def getTimeAsSeconds(key: String): Long = {
     Utils.timeStringAsSeconds(get(key))
   }
 
-  /**
-   * Get a time parameter as seconds, falling back to a default if not set. If no
+  /**
+   * Get a time parameter as seconds, falling back to a default if not set. If no
    * suffix is provided then seconds are assumed.
-   *
   */
   def getTimeAsSeconds(key: String, defaultValue: String): Long = {
     Utils.timeStringAsSeconds(get(key, defaultValue))
   }
 
-  /**
-   * Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
-   * suffix is provided then milliseconds are assumed.
+  /**
+   * Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
+   * suffix is provided then milliseconds are assumed.
   * @throws NoSuchElementException
   */
   def getTimeAsMs(key: String): Long = {
     Utils.timeStringAsMs(get(key))
   }
 
-  /**
-   * Get a time parameter as milliseconds, falling back to a default if not set. If no
-   * suffix is provided then milliseconds are assumed.
+  /**
+   * Get a time parameter as milliseconds, falling back to a default if not set. If no
+   * suffix is provided then milliseconds are assumed.
   */
   def getTimeAsMs(key: String, defaultValue: String): Long = {
     Utils.timeStringAsMs(get(key, defaultValue))
   }
-
+
 
   /** Get a parameter as an Option */
   def getOption(key: String): Option[String] = {
-    Option(settings.get(key))
+    Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))
   }
 
   /** Get all parameters as a list of pairs */
@@ -379,13 +381,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
         }
       }
     }
-
-    // Warn against the use of deprecated configs
-    deprecatedConfigs.values.foreach { dc =>
-      if (contains(dc.oldName)) {
-        dc.warn()
-      }
-    }
   }
 
   /**
@@ -400,19 +395,44 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
 private[spark] object SparkConf extends Logging {
 
+  /**
+   * Maps deprecated config keys to information about the deprecation.
+   *
+   * The extra information is logged as a warning when the config is present in the user's
+   * configuration.
+   */
   private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
     val configs = Seq(
-      DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
-        "1.3"),
-      DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
-        "Use spark.{driver,executor}.userClassPathFirst instead."),
-      DeprecatedConfig("spark.history.fs.updateInterval",
-        "spark.history.fs.update.interval.seconds",
-        "1.3", "Use spark.history.fs.update.interval.seconds instead"),
-      DeprecatedConfig("spark.history.updateInterval",
-        "spark.history.fs.update.interval.seconds",
-        "1.3", "Use spark.history.fs.update.interval.seconds instead"))
-    configs.map { x => (x.oldName, x) }.toMap
+      DeprecatedConfig("spark.yarn.user.classpath.first", "1.3",
+        "Please use spark.{driver,executor}.userClassPathFirst instead."))
+    Map(configs.map { cfg => (cfg.key -> cfg) }:_*)
+  }
+
+  /**
+   * Maps a current config key to alternate keys that were used in previous version of Spark.
+   *
+   * The alternates are used in the order defined in this map. If deprecated configs are
+   * present in the user's configuration, a warning is logged.
+   */
+  private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
+    "spark.executor.userClassPathFirst" -> Seq(
+      AlternateConfig("spark.files.userClassPathFirst", "1.3")),
+    "spark.history.fs.update.interval" -> Seq(
+      AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
+      AlternateConfig("spark.history.fs.updateInterval", "1.3"),
+      AlternateConfig("spark.history.updateInterval", "1.3"))
+    )
+
+  /**
+   * A view of `configsWithAlternatives` that makes it more efficient to look up deprecated
+   * config keys.
+   *
+   * Maps the deprecated config name to a 2-tuple (new config name, alternate config info).
+   */
+  private val allAlternatives: Map[String, (String, AlternateConfig)] = {
+    configsWithAlternatives.keys.flatMap { key =>
+      configsWithAlternatives(key).map { cfg => (cfg.key -> (key -> cfg)) }
+    }.toMap
   }
 
   /**
@@ -443,61 +463,57 @@ private[spark] object SparkConf extends Logging {
   }
 
   /**
-   * Translate the configuration key if it is deprecated and has a replacement, otherwise just
-   * returns the provided key.
-   *
-   * @param userKey Configuration key from the user / caller.
-   * @param warn Whether to print a warning if the key is deprecated. Warnings will be printed
-   *             only once for each key.
+   * Looks for available deprecated keys for the given config option, and return the first
+   * value available.
   */
-  private def translateConfKey(userKey: String, warn: Boolean = false): String = {
-    deprecatedConfigs.get(userKey)
-      .map { deprecatedKey =>
-        if (warn) {
-          deprecatedKey.warn()
-        }
-        deprecatedKey.newName.getOrElse(userKey)
-      }.getOrElse(userKey)
+  def getDeprecatedConfig(key: String, conf: SparkConf): Option[String] = {
+    configsWithAlternatives.get(key).flatMap { alts =>
+      alts.collectFirst { case alt if conf.contains(alt.key) =>
+        val value = conf.get(alt.key)
+        alt.translation.map(_(value)).getOrElse(value)
+      }
+    }
  }
 
  /**
-   * Holds information about keys that have been deprecated or renamed.
+   * Logs a warning message if the given config key is deprecated.
+   */
+  def logDeprecationWarning(key: String): Unit = {
+    deprecatedConfigs.get(key).foreach { cfg =>
+      logWarning(
+        s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
+        s"may be removed in the future. ${cfg.deprecationMessage}")
+    }
+
+    allAlternatives.get(key).foreach { case (newKey, cfg) =>
+      logWarning(
+        s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
+        s"and may be removed in the future. Please use the new key '$newKey' instead.")
+    }
+  }
+
+  /**
+   * Holds information about keys that have been deprecated and do not have a replacement.
   *
-   * @param oldName Old configuration key.
-   * @param newName New configuration key, or `null` if key has no replacement, in which case the
-   *                deprecated key will be used (but the warning message will still be printed).
+   * @param key The deprecated key.
   * @param version Version of Spark where key was deprecated.
-   * @param deprecationMessage Message to include in the deprecation warning; mandatory when
-   *                           `newName` is not provided.
+   * @param deprecationMessage Message to include in the deprecation warning.
   */
  private case class DeprecatedConfig(
-      oldName: String,
-      _newName: String,
+      key: String,
      version: String,
-      deprecationMessage: String = null) {
-
-    private val warned = new AtomicBoolean(false)
-    val newName = Option(_newName)
+      deprecationMessage: String)
 
-    if (newName == null && (deprecationMessage == null || deprecationMessage.isEmpty())) {
-      throw new IllegalArgumentException("Need new config name or deprecation message.")
-    }
-
-    def warn(): Unit = {
-      if (warned.compareAndSet(false, true)) {
-        if (newName != null) {
-          val message = Option(deprecationMessage).getOrElse(
-            s"Please use the alternative '$newName' instead.")
-          logWarning(
-            s"The configuration option '$oldName' has been replaced as of Spark $version and " +
-            s"may be removed in the future. $message")
-        } else {
-          logWarning(
-            s"The configuration option '$oldName' has been deprecated as of Spark $version and " +
-            s"may be removed in the future. $deprecationMessage")
-        }
-      }
-    }
+  /**
+   * Information about an alternate configuration key that has been deprecated.
+   *
+   * @param key The deprecated config key.
+   * @param version The Spark version in which the key was deprecated.
+   * @param translation A translation function for converting old config values into new ones.
+   */
+  private case class AlternateConfig(
+      key: String,
+      version: String,
+      translation: Option[String => String] = None)
 
-  }
 }
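
Note that this change registers no translation function for any alternate key; the hook exists so that an old value format can be rewritten when it is read through the new key. A hypothetical fragment of the configsWithAlternatives map to illustrate the parameter (not part of this commit; the seconds-to-suffix mapping is only an assumption for illustration):

    // Hypothetical entry, not in this commit: if the old key held a bare number of
    // seconds (e.g. "30"), rewrite it into the suffixed form ("30s") used by the new key.
    "spark.history.fs.update.interval" -> Seq(
      AlternateConfig("spark.history.fs.update.interval.seconds", "1.4",
        translation = Some((s: String) => s + "s")))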

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 2 additions & 7 deletions

@@ -49,11 +49,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private val NOT_STARTED = "<Not Started>"
 
   // Interval between each check for event log updates
-  private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds")
-    .orElse(conf.getOption("spark.history.fs.updateInterval"))
-    .orElse(conf.getOption("spark.history.updateInterval"))
-    .map(_.toInt)
-    .getOrElse(10) * 1000
+  private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
 
   // Interval between each cleaner checks for event logs to delete
   private val CLEAN_INTERVAL_MS = conf.getLong("spark.history.fs.cleaner.interval.seconds",
@@ -130,8 +126,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     // Disable the background thread during tests.
     if (!conf.contains("spark.testing")) {
       // A task that periodically checks for event log updates on disk.
-      pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS,
-        TimeUnit.MILLISECONDS)
+      pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
 
       if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
         // A task that periodically cleans event logs on disk.
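
With getTimeAsSeconds, the new key accepts a time string with an optional unit suffix, and a bare number is read as seconds; the accepted suffixes come from Utils.timeStringAsSeconds, not from this diff. A rough sketch of how the provider's lookup behaves under different settings:

    import org.apache.spark.SparkConf

    val conf = new SparkConf(false)

    // Nothing set: the "10s" default applies, i.e. a 10-second poll interval.
    conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")   // 10L

    // Suffixed value on the new key.
    conf.set("spark.history.fs.update.interval", "1min")
    conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")   // 60L

    // A bare number on the deprecated key is still honored through the alternate-key
    // fallback and is interpreted as seconds (relevant when the new key is unset).
    val old = new SparkConf(false).set("spark.history.fs.update.interval.seconds", "30")
    old.getTimeAsSeconds("spark.history.fs.update.interval", "10s")    // 30L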

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 4 deletions

@@ -89,10 +89,7 @@ private[spark] class Executor(
     ExecutorEndpoint.EXECUTOR_ENDPOINT_NAME, new ExecutorEndpoint(env.rpcEnv, executorId))
 
   // Whether to load classes in user jars before those in Spark jars
-  private val userClassPathFirst: Boolean = {
-    conf.getBoolean("spark.executor.userClassPathFirst",
-      conf.getBoolean("spark.files.userClassPathFirst", false))
-  }
+  private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)
 
   // Create our ClassLoader
   // do this after SparkEnv creation so can access the SecurityManager
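
The nested lookup can be dropped because SparkConf itself now resolves the deprecated name; a brief sketch of the equivalence, using the same keys as the diff above:

    import org.apache.spark.SparkConf

    val conf = new SparkConf(false)
    conf.set("spark.files.userClassPathFirst", "true")   // old, deprecated key

    // Previously the caller had to chain the lookups by hand:
    conf.getBoolean("spark.executor.userClassPathFirst",
      conf.getBoolean("spark.files.userClassPathFirst", false))   // true

    // With the alternate-key fallback inside SparkConf, a single lookup is enough:
    conf.getBoolean("spark.executor.userClassPathFirst", false)   // true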

core/src/test/scala/org/apache/spark/SparkConfSuite.scala

Lines changed: 22 additions & 0 deletions

@@ -197,6 +197,28 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
     serializer.newInstance().serialize(new StringBuffer())
   }
 
+  test("deprecated configs") {
+    val conf = new SparkConf()
+    val newName = "spark.history.fs.update.interval"
+
+    assert(!conf.contains(newName))
+
+    conf.set("spark.history.updateInterval", "1")
+    assert(conf.get(newName) === "1")
+
+    conf.set("spark.history.fs.updateInterval", "2")
+    assert(conf.get(newName) === "2")
+
+    conf.set("spark.history.fs.update.interval.seconds", "3")
+    assert(conf.get(newName) === "3")
+
+    conf.set(newName, "4")
+    assert(conf.get(newName) === "4")
+
+    val count = conf.getAll.filter { case (k, v) => k.startsWith("spark.history.") }.size
+    assert(count === 4)
+  }
+
 }
 
 class Class1 {}

docs/monitoring.md

Lines changed: 3 additions & 3 deletions

@@ -86,10 +86,10 @@ follows:
     </td>
   </tr>
   <tr>
-    <td>spark.history.fs.update.interval.seconds</td>
-    <td>10</td>
+    <td>spark.history.fs.update.interval</td>
+    <td>10s</td>
     <td>
-      The period, in seconds, at which information displayed by this history server is updated.
+      The period at which information displayed by this history server is updated.
       Each update checks for any changes made to the event logs in persisted storage.
     </td>
   </tr>

yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 1 addition & 2 deletions

@@ -1052,8 +1052,7 @@ object Client extends Logging {
     if (isDriver) {
       conf.getBoolean("spark.driver.userClassPathFirst", false)
     } else {
-      conf.getBoolean("spark.executor.userClassPathFirst",
-        conf.getBoolean("spark.files.userClassPathFirst", false))
+      conf.getBoolean("spark.executor.userClassPathFirst", false)
     }
   }
 