
Commit 57517b8

Author: Marcelo Vanzin

Review feedback. Mostly, more consistent use of Scala's Option.

Parent: 311e49d

8 files changed: +40 −38 lines

core/src/main/scala/org/apache/spark/SparkContext.scala (2 additions, 0 deletions)

@@ -1255,6 +1255,8 @@ class SparkContext(config: SparkConf) extends Logging {
 
   /** Post the application start event */
   private def postApplicationStart() {
+    // Note: this code assumes that the task scheduler has been initialized and has contacted
+    // the cluster manager to get an application ID (in case the cluster manager provides one).
     listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(),
       startTime, sparkUser))
   }
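The event posted here carries the application ID as an Option[String], since not every cluster manager provides one (the ApplicationEventListener change below reads it as applicationStart.appId). A minimal sketch, not from this commit, of how a listener might consume the optional ID; the class name is hypothetical:

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

// Hypothetical listener, for illustration only.
class AppIdLogger extends SparkListener {
  override def onApplicationStart(event: SparkListenerApplicationStart) {
    // appId is an Option[String]: the cluster manager may not supply an ID.
    val id = event.appId.getOrElse("<none>")
    println(s"Application '${event.appName}' started with ID $id")
  }
}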

core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala (1 addition, 1 deletion)

@@ -42,7 +42,7 @@ private[spark] abstract class ApplicationHistoryProvider {
    * @param appId The application ID.
    * @return The application's UI, or null if application is not found.
    */
-  def getAppUI(appId: String): SparkUI
+  def getAppUI(appId: String): Option[SparkUI]
 
   /**
    * Called when the server is shutting down.
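Returning Option[SparkUI] moves the "application not found" case into the type, so callers can no longer forget a null check. A minimal sketch of a hypothetical caller (the function name is illustrative, not part of this commit):

// Pattern matching forces both cases to be handled explicitly.
def describeApp(provider: ApplicationHistoryProvider, appId: String): String =
  provider.getAppUI(appId) match {
    case Some(ui) => s"found UI for application $appId"
    case None     => s"no application with ID $appId"
  }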

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala (15 additions, 11 deletions)

@@ -31,6 +31,8 @@ import org.apache.spark.util.Utils
 private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
   with Logging {
 
+  private val NOT_STARTED = "<Not Started>"
+
   // Interval between each check for event log updates
   private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
     conf.getInt("spark.history.updateInterval", 10)) * 1000

@@ -98,7 +100,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
   override def getListing() = appList.values
 
-  override def getAppUI(appId: String): SparkUI = {
+  override def getAppUI(appId: String): Option[SparkUI] = {
     try {
       appList.get(appId).map(info => {
         val (replayBus, appListener) = createReplayBus(fs.getFileStatus(

@@ -114,15 +116,16 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         replayBus.replay()
 
         // Note that this does not have any effect due to SPARK-2169.
-        ui.setAppName(s"${appListener.appName} ($appId)")
+        ui.setAppName(s"${appListener.appName.getOrElse(NOT_STARTED)} ($appId)")
 
         val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
         ui.getSecurityManager.setUIAcls(uiAclsEnabled)
-        ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
+        ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED),
+          appListener.viewAcls.getOrElse(""))
         ui
-      }).getOrElse(null)
+      })
     } catch {
-      case e: FileNotFoundException => null
+      case e: FileNotFoundException => None
     }
   }
 

@@ -161,18 +164,18 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         new FsApplicationHistoryInfo(
           dir.getPath().getName(),
           appListener.appId.getOrElse(dir.getPath().getName()),
-          appListener.appName,
-          appListener.startTime,
-          appListener.endTime,
+          appListener.appName.getOrElse(NOT_STARTED),
+          appListener.startTime.getOrElse(-1L),
+          appListener.endTime.getOrElse(-1L),
           getModificationTime(dir),
-          appListener.sparkUser)
+          appListener.sparkUser.getOrElse(NOT_STARTED))
       } catch {
         case e: Exception =>
           logInfo(s"Failed to load application log data from $dir.", e)
          null
      }
    }
-      .sortBy { info => -info.endTime }
+      .sortBy { info => if (info != null) -info.endTime else -1 }
 
     mostRecentLogModTime = newMostRecentModTime
 

@@ -228,7 +231,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
     val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
     ui.getSecurityManager.setUIAcls(uiAclsEnabled)
-    ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
+    ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED),
+      appListener.viewAcls.getOrElse(""))
     ui
   }
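The rewritten getAppUI keeps the whole computation inside the Option: appList.get(appId).map(...) yields Some(ui) when the ID is known and None otherwise, so the old .getOrElse(null) disappears entirely. A self-contained sketch of the two combinators used above:

object OptionDemo extends App {
  val appName: Option[String] = None

  // map transforms the value inside the Option without unwrapping it;
  // getOrElse supplies the fallback only where a plain value is needed.
  val label = appName.map(name => s"$name (app-1)").getOrElse("<Not Started>")
  println(label)  // prints "<Not Started>"
}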

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala (1 addition, 4 deletions)

@@ -52,10 +52,7 @@ class HistoryServer(
 
   private val appLoader = new CacheLoader[String, SparkUI] {
     override def load(key: String): SparkUI = {
-      val ui = provider.getAppUI(key)
-      if (ui == null) {
-        throw new NoSuchElementException()
-      }
+      val ui = provider.getAppUI(key).getOrElse(throw new NoSuchElementException())
       attachSparkUI(ui)
       ui
     }
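The one-liner type-checks because getOrElse takes its argument by name and a throw expression has type Nothing, a subtype of every type, so the exception is raised only when the Option is empty. A self-contained sketch:

object GetOrElseThrowDemo extends App {
  def lookup(key: String): Option[String] =
    if (key == "known") Some("value") else None

  // The by-name argument is evaluated only when the Option is None,
  // so this throws for unknown keys and returns the value otherwise.
  val found = lookup("known").getOrElse(throw new NoSuchElementException("known"))
  println(found)  // prints "value"
}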

core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala (13 additions, 14 deletions)

@@ -24,39 +24,38 @@ package org.apache.spark.scheduler
  * from multiple applications are seen, the behavior is unspecified.
  */
 private[spark] class ApplicationEventListener extends SparkListener {
-  var appName = "<Not Started>"
+  var appName: Option[String] = None
   var appId: Option[String] = None
-  var sparkUser = "<Not Started>"
-  var startTime = -1L
-  var endTime = -1L
-  var viewAcls = ""
+  var sparkUser: Option[String] = None
+  var startTime: Option[Long] = None
+  var endTime: Option[Long] = None
+  var viewAcls: Option[String] = None
   var enableViewAcls = false
 
-  def applicationStarted = startTime != -1
+  def applicationStarted = startTime.isDefined
 
-  def applicationCompleted = endTime != -1
+  def applicationCompleted = endTime.isDefined
 
   def applicationDuration: Long = {
-    val difference = endTime - startTime
-    if (applicationStarted && applicationCompleted && difference > 0) difference else -1L
+    if (applicationStarted && applicationCompleted) endTime.get - startTime.get else -1
   }
 
   override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
-    appName = applicationStart.appName
+    appName = Some(applicationStart.appName)
     appId = applicationStart.appId
-    startTime = applicationStart.time
-    sparkUser = applicationStart.sparkUser
+    startTime = Some(applicationStart.time)
+    sparkUser = Some(applicationStart.sparkUser)
   }
 
   override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
-    endTime = applicationEnd.time
+    endTime = Some(applicationEnd.time)
   }
 
   override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
     synchronized {
       val environmentDetails = environmentUpdate.environmentDetails
       val allProperties = environmentDetails("Spark Properties").toMap
-      viewAcls = allProperties.getOrElse("spark.ui.view.acls", "")
+      viewAcls = allProperties.get("spark.ui.view.acls")
       enableViewAcls = allProperties.getOrElse("spark.ui.acls.enable", "false").toBoolean
     }
   }
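Note how the last hunk drops the sentinel default: Map.get already returns an Option, so an absent spark.ui.view.acls key becomes None instead of an empty string. A self-contained sketch of the difference:

object MapGetDemo extends App {
  val props = Map("spark.ui.acls.enable" -> "true")

  // Map.get returns Option[String]: absence is represented as None.
  val viewAcls: Option[String] = props.get("spark.ui.view.acls")  // None

  // Map.getOrElse collapses absence into a default value instead.
  val aclsEnabled = props.getOrElse("spark.ui.acls.enable", "false").toBoolean  // true

  println(s"viewAcls=$viewAcls, aclsEnabled=$aclsEnabled")
}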

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala (4 additions, 4 deletions)

@@ -83,18 +83,18 @@ private[spark] class SparkDeploySchedulerBackend(
   override def connected(appId: String) {
     logInfo("Connected to Spark cluster with app ID " + appId)
     this.appId = appId
-    notifyRegistered()
+    wakeUpContext()
   }
 
   override def disconnected() {
-    notifyRegistered()
+    wakeUpContext()
     if (!stopping) {
       logWarning("Disconnected from Spark cluster! Waiting for reconnection...")
     }
   }
 
   override def dead(reason: String) {
-    notifyRegistered()
+    wakeUpContext()
     if (!stopping) {
       logError("Application has been killed. Reason: " + reason)
       scheduler.error(reason)

@@ -129,7 +129,7 @@ private[spark] class SparkDeploySchedulerBackend(
     }
   }
 
-  private def notifyRegistered() = {
+  private def wakeUpContext() = {
     registrationLock.synchronized {
       registrationDone = true
       registrationLock.notifyAll()
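The rename from notifyRegistered to wakeUpContext reflects what the method actually does: it wakes up a thread blocked on registrationLock, whether registration succeeded (connected), was lost (disconnected), or failed (dead). A minimal, self-contained sketch of that wait/notify pattern; the names here are illustrative, not Spark's:

object RegistrationLatchDemo extends App {
  private val lock = new Object
  private var done = false

  // Blocks until wakeUp() is called; the while loop guards
  // against spurious wakeups.
  def waitForRegistration(): Unit = lock.synchronized {
    while (!done) {
      lock.wait()
    }
  }

  def wakeUp(): Unit = lock.synchronized {
    done = true
    lock.notifyAll()
  }

  new Thread(new Runnable {
    override def run(): Unit = { Thread.sleep(100); wakeUp() }
  }).start()

  waitForRegistration()
  println("registered")
}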

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala (2 additions, 2 deletions)

@@ -304,6 +304,6 @@ private[spark] class CoarseMesosSchedulerBackend(
     slaveLost(d, s)
   }
 
-  override def applicationId(): Option[String] =
-    Some(frameworkId).map(id => Some(id.getValue())).getOrElse(null)
+  override def applicationId(): Option[String] = None
+
 }

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala (2 additions, 2 deletions)

@@ -337,6 +337,6 @@ private[spark] class MesosSchedulerBackend(
   // TODO: query Mesos for number of cores
   override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)
 
-  override def applicationId(): Option[String] =
-    Some(frameworkId).map(id => Some(id.getValue())).getOrElse(null)
+  override def applicationId(): Option[String] = None
+
 }
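The expression removed from both Mesos backends was misleading: Some(frameworkId) is always defined, so the .getOrElse(null) branch was dead code, and the .map produced an Option nested inside another Option. If null-safety was the intent, Option(frameworkId).map(_.getValue()) would be the idiomatic form; since these backends simply report no ID here, a literal None says so directly. An illustrative, self-contained sketch, where FrameworkID is a stand-in for the Mesos protobuf type:

object NestedOptionDemo extends App {
  case class FrameworkID(value: String) { def getValue(): String = value }
  val frameworkId = FrameworkID("fw-123")

  // Some(x) is always defined, so getOrElse(null) can never fire, and the
  // inner Some(...) nests one Option inside another.
  val convoluted = Some(frameworkId).map(id => Some(id.getValue())).getOrElse(null)
  println(convoluted)  // Some(fw-123), never null

  // Option(...) is the null-safe constructor: None when the argument is null.
  val idiomatic: Option[String] = Option(frameworkId).map(_.getValue())
  println(idiomatic)   // Some(fw-123)
}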
