Skip to content

Commit c086bd5

Browse files
committed
Add HistoryServer and scripts ++ Refactor WebUI interface
HistoryServer can be launched with ./sbin/start-history-server.sh <log-dir> and stopped with ./sbin/stop-history-server.sh. This commit also involves refactoring all the UIs to avoid duplicate code.
1 parent ffe272d commit c086bd5

File tree

12 files changed

+490
-68
lines changed

12 files changed

+490
-68
lines changed

core/src/main/scala/org/apache/spark/deploy/WebUI.scala renamed to core/src/main/scala/org/apache/spark/deploy/DeployWebUI.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,6 @@ private[spark] object DeployWebUI {
4242
return "%.0f min".format(minutes)
4343
}
4444
val hours = minutes / 60
45-
return "%.1f h".format(hours)
45+
"%.1f h".format(hours)
4646
}
4747
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy
19+
20+
import org.apache.spark.ui.{SparkUI, WebUI}
21+
22+
private[spark] abstract class SparkUIContainer(name: String) extends WebUI(name) {
23+
24+
/** Attach a SparkUI to this container. Only valid after bind(). */
25+
def attachUI(ui: SparkUI) {
26+
assert(serverInfo.isDefined,
27+
"%s must be bound to a server before attaching SparkUIs".format(name))
28+
val rootHandler = serverInfo.get.rootHandler
29+
for (handler <- ui.handlers) {
30+
rootHandler.addHandler(handler)
31+
if (!handler.isStarted) {
32+
handler.start()
33+
}
34+
}
35+
}
36+
37+
/** Detach a SparkUI from this container. Only valid after bind(). */
38+
def detachUI(ui: SparkUI) {
39+
assert(serverInfo.isDefined,
40+
"%s must be bound to a server before detaching SparkUIs".format(name))
41+
val rootHandler = serverInfo.get.rootHandler
42+
for (handler <- ui.handlers) {
43+
if (handler.isStarted) {
44+
handler.stop()
45+
}
46+
rootHandler.removeHandler(handler)
47+
}
48+
}
49+
50+
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.history
19+
20+
import java.net.URI
21+
import javax.servlet.http.HttpServletRequest
22+
23+
import scala.collection.mutable
24+
25+
import org.apache.hadoop.fs.{FileStatus, Path}
26+
import org.eclipse.jetty.servlet.ServletContextHandler
27+
28+
import org.apache.spark.{Logging, SecurityManager, SparkConf}
29+
import org.apache.spark.deploy.SparkUIContainer
30+
import org.apache.spark.ui.SparkUI
31+
import org.apache.spark.ui.JettyUtils._
32+
import org.apache.spark.util.Utils
33+
import org.apache.spark.scheduler.ReplayListenerBus
34+
35+
/**
36+
* A web server that re-renders SparkUIs of finished applications.
37+
*
38+
* For the standalone mode, MasterWebUI already achieves this functionality. Thus, the
39+
* main use case of the HistoryServer is in other deploy modes (e.g. Yarn or Mesos).
40+
*
41+
* The logging directory structure is as follows: Within the given base directory, each
42+
* application's event logs are maintained in the application's own sub-directory.
43+
*
44+
* @param baseLogDir The base directory in which event logs are found
45+
* @param requestedPort The requested port to which this server is to be bound
46+
*/
47+
class HistoryServer(baseLogDir: String, requestedPort: Int, conf: SparkConf)
48+
extends SparkUIContainer("History Server") with Logging {
49+
50+
private val host = Utils.localHostName()
51+
private val port = requestedPort
52+
private val indexPage = new IndexPage(this)
53+
private val fileSystem = Utils.getHadoopFileSystem(new URI(baseLogDir))
54+
private val securityManager = new SecurityManager(conf)
55+
56+
private val handlers = Seq[ServletContextHandler](
57+
createStaticHandler(HistoryServer.STATIC_RESOURCE_DIR, "/static"),
58+
createServletHandler("/",
59+
(request: HttpServletRequest) => indexPage.render(request), securityMgr = securityManager)
60+
)
61+
62+
// A mapping from an event log path to the associated, already rendered, SparkUI
63+
val logPathToUI = mutable.HashMap[String, SparkUI]()
64+
65+
// A mapping from an event log path to a timestamp of when it was last updated
66+
val logPathToLastUpdated = mutable.HashMap[String, Long]()
67+
68+
/** Bind to the HTTP server behind this web interface */
69+
override def bind() {
70+
try {
71+
serverInfo = Some(startJettyServer(host, port, handlers, conf))
72+
logInfo("Started HistoryServer at http://%s:%d".format(host, boundPort))
73+
} catch {
74+
case e: Exception =>
75+
logError("Failed to create HistoryServer", e)
76+
System.exit(1)
77+
}
78+
checkForLogs()
79+
}
80+
81+
/**
82+
* Check for any updated event logs.
83+
*
84+
* If a new application is found, render the associated SparkUI and remember it.
85+
* If an existing application is updated, re-render the associated SparkUI.
86+
* If an existing application is removed, remove the associated SparkUI.
87+
*/
88+
def checkForLogs() {
89+
val logStatus = fileSystem.listStatus(new Path(baseLogDir))
90+
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
91+
92+
// Render any missing or outdated SparkUI
93+
logDirs.foreach { dir =>
94+
val path = dir.getPath.toString
95+
val lastUpdated = dir.getModificationTime
96+
if (!logPathToLastUpdated.contains(path) ||
97+
logPathToLastUpdated.getOrElse(path, -1L) < lastUpdated) {
98+
maybeRenderUI(path, lastUpdated)
99+
}
100+
}
101+
102+
// Remove any outdated SparkUIs
103+
val logPaths = logDirs.map(_.getPath.toString)
104+
logPathToUI.keys.foreach { path =>
105+
if (!logPaths.contains(path)) {
106+
logPathToUI.remove(path)
107+
logPathToLastUpdated.remove(path)
108+
}
109+
}
110+
111+
logWarning("By the end of check for logs, the map looks like")
112+
logPathToUI.foreach { case (k, v) => logWarning("* %s".format(k)) }
113+
}
114+
115+
/** Attempt to render a new SparkUI from event logs residing in the given log directory. */
116+
def maybeRenderUI(logPath: String, lastUpdated: Long) {
117+
logWarning("Maybe rendering UI %s".format(logPath))
118+
119+
val appName = logPath.split("/").last
120+
val replayBus = new ReplayListenerBus(conf)
121+
val ui = new SparkUI(conf, replayBus, appName, "/history/%s".format(appName))
122+
123+
// Do not call ui.bind() to avoid creating a new server for each application
124+
ui.start()
125+
val success = replayBus.replay(logPath)
126+
logWarning("Just replayed the events. Successful? %s".format(success))
127+
if (success) {
128+
attachUI(ui)
129+
logPathToUI(logPath) = ui
130+
logPathToLastUpdated(logPath) = lastUpdated
131+
}
132+
}
133+
134+
}
135+
136+
object HistoryServer {
137+
val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
138+
139+
def main(argStrings: Array[String]) {
140+
val conf = new SparkConf
141+
val args = new HistoryServerArguments(argStrings, conf)
142+
val server = new HistoryServer(args.logDir, args.port, conf)
143+
server.bind()
144+
145+
// Wait until the end of the world... or if the HistoryServer process is manually stopped
146+
while(true) { Thread.sleep(Int.MaxValue) }
147+
}
148+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.history
19+
20+
import java.net.URI
21+
22+
import org.apache.spark.SparkConf
23+
import org.apache.spark.util.{Utils, IntParam}
24+
import org.apache.hadoop.fs.Path
25+
26+
/**
27+
* Command-line parser for the master.
28+
*/
29+
private[spark] class HistoryServerArguments(args: Array[String], conf: SparkConf) {
30+
var port = 18080
31+
var logDir = ""
32+
33+
parse(args.toList)
34+
35+
def parse(args: List[String]): Unit = {
36+
args match {
37+
case ("--port" | "-p") :: IntParam(value) :: tail =>
38+
port = value
39+
parse(tail)
40+
41+
case ("--dir" | "-d") :: value :: tail =>
42+
logDir = value
43+
parse(tail)
44+
45+
case ("--help" | "-h") :: tail =>
46+
printUsageAndExit(0)
47+
48+
case Nil => {}
49+
50+
case _ =>
51+
printUsageAndExit(1)
52+
}
53+
validateLogDir()
54+
}
55+
56+
def validateLogDir() {
57+
if (logDir == "") {
58+
System.err.println("Logging directory must be specified.")
59+
printUsageAndExit(1)
60+
}
61+
val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
62+
val path = new Path(logDir)
63+
if (!fileSystem.exists(path) || !fileSystem.getFileStatus(path).isDir) {
64+
System.err.println("Logging directory specified is invalid: %s".format(logDir))
65+
printUsageAndExit(1)
66+
}
67+
}
68+
69+
/**
70+
* Print usage and exit JVM with the given exit code.
71+
*/
72+
def printUsageAndExit(exitCode: Int) {
73+
System.err.println(
74+
"Usage: HistoryServer [options]\n" +
75+
"\n" +
76+
"Options:\n" +
77+
" -p PORT, --port PORT Port for web server (default: 18080)\n" +
78+
" -d DIR, --dir DIR Location of event log files")
79+
System.exit(exitCode)
80+
}
81+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.history
19+
20+
import javax.servlet.http.HttpServletRequest
21+
22+
import scala.xml.Node
23+
24+
import org.apache.spark.deploy.DeployWebUI
25+
import org.apache.spark.deploy.master.ApplicationInfo
26+
import org.apache.spark.ui.UIUtils
27+
import org.apache.spark.util.Utils
28+
29+
private[spark] class IndexPage(parent: HistoryServer) {
30+
31+
def render(request: HttpServletRequest): Seq[Node] = {
32+
val content =
33+
<div class="row-fluid">
34+
<div class="span12">
35+
<ul class="unstyled">
36+
<li>
37+
<strong>Welcome to the Fastest and Furious-est HistoryServer in the World!</strong>
38+
</li>
39+
{
40+
parent.logPathToUI.map { case (path, ui) =>
41+
<li>{path} at {ui.basePath}</li>
42+
}
43+
}
44+
</ul>
45+
</div>
46+
</div>
47+
48+
UIUtils.basicSparkPage(content, "History Server")
49+
}
50+
51+
def appRow(app: ApplicationInfo): Seq[Node] = {
52+
<tr>
53+
<td>
54+
<a href={"app?appId=" + app.id}>{app.id}</a>
55+
</td>
56+
<td>
57+
<a href={app.desc.appUiUrl}>{app.desc.name}</a>
58+
</td>
59+
<td>
60+
{app.coresGranted}
61+
</td>
62+
<td sorttable_customkey={app.desc.memoryPerSlave.toString}>
63+
{Utils.megabytesToString(app.desc.memoryPerSlave)}
64+
</td>
65+
<td>{DeployWebUI.formatDate(app.submitDate)}</td>
66+
<td>{app.desc.user}</td>
67+
<td>{app.state.toString}</td>
68+
<td>{DeployWebUI.formatDuration(app.duration)}</td>
69+
</tr>
70+
}
71+
}

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -671,16 +671,16 @@ private[spark] class Master(
671671
appConf.set("spark.eventLog.compress", "true")
672672
appConf.set("spark.io.compression.codec", codec)
673673
}
674-
val replayerBus = new ReplayListenerBus(appConf)
674+
val replayBus = new ReplayListenerBus(appConf)
675675
val ui = new SparkUI(
676676
appConf,
677-
replayerBus,
677+
replayBus,
678678
"%s (finished)".format(appName),
679679
"/history/%s".format(app.id))
680680

681681
// Do not call ui.bind() to avoid creating a new server for each application
682682
ui.start()
683-
val success = replayerBus.replay(eventLogDir)
683+
val success = replayBus.replay(eventLogDir)
684684
if (!success) {
685685
ui.stop()
686686
None

0 commit comments

Comments
 (0)