
Commit 7584418

Report application start/end times to HistoryServer
This involves adding application start and end events. This also allows us to record the actual app name instead of simply inferring it from the name of the event log directory.
1 parent 8aac163

21 files changed: +250 -63 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 16 additions & 0 deletions
@@ -228,6 +228,7 @@ class SparkContext(
   dagScheduler.start()
 
   postEnvironmentUpdate()
+  postApplicationStart()
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = {
@@ -826,6 +827,7 @@ class SparkContext(
 
   /** Shut down the SparkContext. */
   def stop() {
+    postApplicationEnd()
     ui.stop()
     eventLogger.foreach(_.stop())
     // Do this only if not stopped already - best case effort.
@@ -1066,6 +1068,20 @@ class SparkContext(
   /** Register a new RDD, returning its RDD ID */
   private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
 
+  /** Post the application start event */
+  private def postApplicationStart() {
+    listenerBus.post(SparkListenerApplicationStart(appName, startTime))
+  }
+
+  /**
+   * Post the application end event to all listeners immediately, rather than adding it
+   * to the event queue for it to be asynchronously processed eventually. Otherwise, a race
+   * condition exists in which the listeners may stop before this event has been propagated.
+   */
+  private def postApplicationEnd() {
+    listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis), blocking = true)
+  }
+
   /** Post the environment update event once the task scheduler is ready */
   private def postEnvironmentUpdate() {
     if (taskScheduler != null) {
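
Taken together, these changes bracket the application lifecycle with the two new events. A minimal sketch of the resulting driver-side behavior (the comments describe what the diff above does; the driver program itself is hypothetical):

val sc = new SparkContext(new SparkConf().setAppName("MyApp"))
// The constructor has already posted SparkListenerApplicationStart(appName, startTime)
// asynchronously through the listener bus.

// ... run jobs ...

sc.stop()
// stop() first posts SparkListenerApplicationEnd(System.currentTimeMillis) with
// blocking = true, so every listener observes the end event before ui.stop() and
// eventLogger.foreach(_.stop()) tear the listeners down.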

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 50 additions & 27 deletions
@@ -30,7 +30,7 @@ import org.apache.spark.deploy.SparkUIContainer
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.Utils
-import org.apache.spark.scheduler.ReplayListenerBus
+import org.apache.spark.scheduler.{ApplicationListener, ReplayListenerBus}
 
 /**
  * A web server that re-renders SparkUIs of finished applications.
@@ -59,11 +59,7 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
     (request: HttpServletRequest) => indexPage.render(request), securityMgr = securityManager)
   )
 
-  // A mapping from an event log path to the associated, already rendered, SparkUI
-  val logPathToUI = mutable.HashMap[String, SparkUI]()
-
-  // A mapping from an event log path to a timestamp of when it was last updated
-  val logPathToLastUpdated = mutable.HashMap[String, Long]()
+  val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()
 
   /** Bind to the HTTP server behind this web interface */
   override def bind() {
@@ -78,6 +74,12 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
     checkForLogs()
   }
 
+  /** Parse app ID from the given log path. */
+  def getAppId(logPath: String): String = logPath.split("/").last
+
+  /** Return the address of this server. */
+  def getAddress = "http://" + host + ":" + boundPort
+
   /**
    * Check for any updated event logs.
    *
@@ -92,46 +94,56 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
     // Render any missing or outdated SparkUI
     logDirs.foreach { dir =>
       val path = dir.getPath.toString
-      val lastUpdated = dir.getModificationTime
-      if (!logPathToLastUpdated.contains(path) ||
-          logPathToLastUpdated.getOrElse(path, -1L) < lastUpdated) {
-        maybeRenderUI(path, lastUpdated)
+      val appId = getAppId(path)
+      val lastUpdated = {
+        val logFiles = fileSystem.listStatus(dir.getPath)
+        if (logFiles != null) logFiles.map(_.getModificationTime).max else dir.getModificationTime
+      }
+      if (!appIdToInfo.contains(appId) || appIdToInfo(appId).lastUpdated < lastUpdated) {
+        maybeRenderUI(appId, path, lastUpdated)
       }
     }
 
     // Remove any outdated SparkUIs
-    val logPaths = logDirs.map(_.getPath.toString)
-    logPathToUI.foreach { case (path, ui) =>
-      if (!logPaths.contains(path)) {
-        detachUI(ui)
-        logPathToUI.remove(path)
-        logPathToLastUpdated.remove(path)
+    val appIds = logDirs.map { dir => getAppId(dir.getPath.toString) }
+    appIdToInfo.foreach { case (appId, info) =>
+      if (!appIds.contains(appId)) {
+        detachUI(info.ui)
+        appIdToInfo.remove(appId)
       }
     }
   }
 
   /** Attempt to render a new SparkUI from event logs residing in the given log directory. */
-  def maybeRenderUI(logPath: String, lastUpdated: Long) {
-    val appName = getAppName(logPath)
+  private def maybeRenderUI(appId: String, logPath: String, lastUpdated: Long) {
     val replayBus = new ReplayListenerBus(conf)
-    val ui = new SparkUI(conf, replayBus, appName, "/history/%s".format(appName))
+    val appListener = new ApplicationListener
+    replayBus.addListener(appListener)
+    val ui = new SparkUI(conf, replayBus, appId, "/history/%s".format(appId))
 
     // Do not call ui.bind() to avoid creating a new server for each application
     ui.start()
     val success = replayBus.replay(logPath)
     if (success) {
       attachUI(ui)
-      logPathToUI(logPath) = ui
-      logPathToLastUpdated(logPath) = lastUpdated
+      if (!appListener.started) {
+        logWarning("Application has event logs but has not started: %s".format(appId))
+      }
+      val appName = appListener.appName
+      val startTime = appListener.startTime
+      val endTime = appListener.endTime
+      val info = ApplicationHistoryInfo(appName, startTime, endTime, lastUpdated, logPath, ui)
+
+      // If the UI already exists, terminate it and replace it
+      appIdToInfo.remove(appId).foreach { info => detachUI(info.ui) }
+      appIdToInfo(appId) = info
+
+      // Use mnemonic original app name rather than app ID
+      val originalAppName = "%s (history)".format(appName)
+      ui.setAppName(originalAppName)
     }
   }
 
-  /** Parse app name from the given log path. */
-  def getAppName(logPath: String): String = logPath.split("/").last
-
-  /** Return the address of this server. */
-  def getAddress = "http://" + host + ":" + boundPort
-
 }
 
 object HistoryServer {
@@ -147,3 +159,14 @@ object HistoryServer {
     while(true) { Thread.sleep(Int.MaxValue) }
   }
 }
+
+private[spark] case class ApplicationHistoryInfo(
+    name: String,
+    startTime: Long,
+    endTime: Long,
+    lastUpdated: Long,
+    logPath: String,
+    ui: SparkUI) {
+  def started = startTime != -1
+  def finished = endTime != -1
+}
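
The -1 sentinels let callers branch on an application's lifecycle without extra state. A hedged sketch of a helper one might write against this case class (the helper itself is not part of the commit):

private def statusOf(info: ApplicationHistoryInfo): String =
  if (info.finished) "Finished"
  else if (info.started) "Running"
  else "Not started"

IndexPage below uses the same predicates to decide between rendering a timestamp and a placeholder.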

core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala

Lines changed: 30 additions & 17 deletions
@@ -23,22 +23,18 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
-import org.apache.spark.ui.{SparkUI, UIUtils}
+import org.apache.spark.deploy.DeployWebUI
+import org.apache.spark.ui.UIUtils
 
 private[spark] class IndexPage(parent: HistoryServer) {
-  val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+  private val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
 
   def render(request: HttpServletRequest): Seq[Node] = {
     // Check if logs have been updated
     parent.checkForLogs()
 
-    // Populate app table, with most recently modified first
-    val appRows = parent.logPathToLastUpdated.toSeq
-      .sortBy { case (path, lastUpdated) => -lastUpdated }
-      .map { case (path, lastUpdated) =>
-        // (appName, lastUpdated, UI)
-        (parent.getAppName(path), lastUpdated, parent.logPathToUI(path))
-      }
+    // Populate app table, with most recently modified app first
+    val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
     val appTable = UIUtils.listingTable(appHeader, appRow, appRows)
 
     val content =
@@ -54,14 +50,31 @@
     UIUtils.basicSparkPage(content, "History Server")
   }
 
-  private val appHeader = Seq[String]("App Name", "Last Updated")
+  private val appHeader = Seq(
+    "App Name",
+    "Started",
+    "Finished",
+    "Duration",
+    "Log Directory",
+    "Last Updated")
 
-  private def appRow(info: (String, Long, SparkUI)): Seq[Node] = {
-    info match { case (appName, lastUpdated, ui) =>
-      <tr>
-        <td><a href={parent.getAddress + ui.basePath}>{appName}</a></td>
-        <td>{dateFmt.format(new Date(lastUpdated))}</td>
-      </tr>
-    }
+  private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
+    val appName = if (info.started) info.name else parent.getAppId(info.logPath)
+    val uiAddress = parent.getAddress + info.ui.basePath
+    val startTime = if (info.started) dateFmt.format(new Date(info.startTime)) else "Not started"
+    val endTime = if (info.finished) dateFmt.format(new Date(info.endTime)) else "Not finished"
+    val difference = if (info.started && info.finished) info.endTime - info.startTime else -1L
+    val duration = if (difference > 0) DeployWebUI.formatDuration(difference) else "---"
+    val logDirectory = parent.getAppId(info.logPath)
+    val lastUpdated = dateFmt.format(new Date(info.lastUpdated))
+
+    <tr>
+      <td><a href={uiAddress}>{appName}</a></td>
+      <td>{startTime}</td>
+      <td>{endTime}</td>
+      <td>{duration}</td>
+      <td>{logDirectory}</td>
+      <td>{lastUpdated}</td>
+    </tr>
   }
 }
core/src/main/scala/org/apache/spark/scheduler/ApplicationListener.scala

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * A simple listener for application events.
+ *
+ * This listener assumes at most one of each of SparkListenerApplicationStart and
+ * SparkListenerApplicationEnd will be received. Otherwise, only the latest event
+ * of each type will take effect.
+ */
+private[spark] class ApplicationListener extends SparkListener {
+  var appName = "<Not Started>"
+  var startTime = -1L
+  var endTime = -1L
+
+  def started = startTime != -1
+
+  def finished = endTime != -1
+
+  def duration: Long = {
+    val difference = endTime - startTime
+    if (started && finished && difference > 0) difference else -1L
+  }
+
+  override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
+    appName = applicationStart.appName
+    startTime = applicationStart.time
+  }
+
+  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+    endTime = applicationEnd.time
+  }
+}
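
Usage mirrors HistoryServer.maybeRenderUI above: attach an ApplicationListener to a ReplayListenerBus before replaying, then read off the recovered metadata. A sketch, assuming a hypothetical log path (replay returns whether the log was parsed successfully, as in the HistoryServer code above):

val replayBus = new ReplayListenerBus(conf)
val appListener = new ApplicationListener
replayBus.addListener(appListener)

if (replayBus.replay("/tmp/spark-events/app-12345")) { // path is illustrative
  // appName/startTime/endTime keep their defaults if no matching event was replayed
  println("%s ran for %d ms".format(appListener.appName, appListener.duration))
}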

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 4 additions & 0 deletions
@@ -90,6 +90,10 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
     logEvent(event, flushLogger = true)
   override def onUnpersistRDD(event: SparkListenerUnpersistRDD) =
     logEvent(event, flushLogger = true)
+  override def onApplicationStart(event: SparkListenerApplicationStart) =
+    logEvent(event, flushLogger = true)
+  override def onApplicationEnd(event: SparkListenerApplicationEnd) =
+    logEvent(event, flushLogger = true)
 
   def stop() = logger.stop()
 }
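
These overrides only matter when event logging is enabled on the driver; otherwise no EventLoggingListener is registered and nothing reaches the HistoryServer. A hedged configuration sketch (property names as used by this generation of Spark are assumed; the directory is illustrative):

val conf = new SparkConf()
  .setAppName("MyApp")
  .set("spark.eventLog.enabled", "true")          // install EventLoggingListener
  .set("spark.eventLog.dir", "/tmp/spark-events") // directory the HistoryServer scans
val sc = new SparkContext(conf)
// ... on sc.stop(), the application end event is flushed to the log as well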

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 12 additions & 7 deletions
@@ -64,13 +64,18 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
     }.start()
   }
 
-  def post(event: SparkListenerEvent) {
-    val eventAdded = eventQueue.offer(event)
-    if (!eventAdded && !queueFullErrorMessageLogged) {
-      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
-        "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
-        "rate at which tasks are being started by the scheduler.")
-      queueFullErrorMessageLogged = true
+  def post(event: SparkListenerEvent, blocking: Boolean = false) {
+    if (!blocking) {
+      val eventAdded = eventQueue.offer(event)
+      if (!eventAdded && !queueFullErrorMessageLogged) {
+        logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
+          "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
+          "rate at which tasks are being started by the scheduler.")
+        queueFullErrorMessageLogged = true
+      }
+    } else {
+      // Bypass the event queue and post to all attached listeners immediately
+      postToAll(event)
     }
   }
 
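The two posting modes, side by side (a sketch; both calls appear in the SparkContext diff above):

// Default, asynchronous: enqueue and return. The listener thread drains the queue,
// and the event is dropped with an error if the queue is full.
listenerBus.post(SparkListenerApplicationStart(appName, startTime))

// Blocking: bypass the queue and invoke every listener on the calling thread. Used
// only for the application end event, so listeners cannot stop before seeing it.
listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis), blocking = true)
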
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 14 additions & 0 deletions
@@ -62,6 +62,10 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
 
 case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
 
+case class SparkListenerApplicationStart(appName: String, time: Long) extends SparkListenerEvent
+
+case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
+
 /** An event used in the listener to shutdown the listener daemon thread. */
 private[spark] case object SparkListenerShutdown extends SparkListenerEvent
 
@@ -125,6 +129,16 @@
    * Called when an RDD is manually unpersisted by the application
    */
   def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) { }
+
+  /**
+   * Called when the application starts
+   */
+  def onApplicationStart(applicationStart: SparkListenerApplicationStart) { }
+
+  /**
+   * Called when the application ends
+   */
+  def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { }
 }
 
 /**
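
Since both callbacks default to no-ops, existing SparkListener implementations compile unchanged and new ones override only what they need. A hedged sketch of a listener built on the new hooks (the class and its registration are illustrative, not part of the commit):

class AppDurationListener extends SparkListener {
  private var startTime = -1L

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
    startTime = applicationStart.time
  }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
    if (startTime != -1) {
      println("Application ran for %d ms".format(applicationEnd.time - startTime))
    }
  }
}

// Registered on the driver, e.g. sc.addSparkListener(new AppDurationListener)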

core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala

Lines changed: 4 additions & 0 deletions
@@ -61,6 +61,10 @@ private[spark] trait SparkListenerBus {
         sparkListeners.foreach(_.onBlockManagerRemoved(blockManagerRemoved))
       case unpersistRDD: SparkListenerUnpersistRDD =>
         sparkListeners.foreach(_.onUnpersistRDD(unpersistRDD))
+      case applicationStart: SparkListenerApplicationStart =>
+        sparkListeners.foreach(_.onApplicationStart(applicationStart))
+      case applicationEnd: SparkListenerApplicationEnd =>
+        sparkListeners.foreach(_.onApplicationEnd(applicationEnd))
       case SparkListenerShutdown =>
     }
   }

core/src/main/scala/org/apache/spark/ui/SparkUI.scala

Lines changed: 3 additions & 1 deletion
@@ -34,7 +34,7 @@ private[spark] class SparkUI(
     val sc: SparkContext,
     conf: SparkConf,
     val listenerBus: SparkListenerBus,
-    val appName: String,
+    var appName: String,
     val basePath: String = "")
   extends WebUI("SparkUI") with Logging {
 
@@ -75,6 +75,8 @@
   // Maintain executor storage status through Spark events
   val storageStatusListener = new StorageStatusListener
 
+  def setAppName(name: String) = appName = name
+
   /** Initialize all components of the server */
   def start() {
     storage.start()

core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala

Lines changed: 2 additions & 1 deletion
@@ -29,10 +29,11 @@ import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.ui.Page.Environment
 
 private[ui] class EnvironmentUI(parent: SparkUI) {
-  private val appName = parent.appName
   private val basePath = parent.basePath
   private var _listener: Option[EnvironmentListener] = None
 
+  private def appName = parent.appName
+
   lazy val listener = _listener.get
 
   def start() {
