Skip to content

Commit 2a7f68d

Browse files
author
Marcelo Vanzin
committed
Address review feedback.
Main changes: - Restore old command line handling. - Fix pagination. - Restore showing the log directory in the listing page.
1 parent 4e72c77 commit 2a7f68d

File tree

6 files changed

+196
-98
lines changed

6 files changed

+196
-98
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.history
19+
20+
import org.apache.spark.ui.SparkUI
21+
22+
/**
 * Summary data about one application known to the history server.
 *
 * Timestamps (startTime, endTime, lastUpdated) are epoch milliseconds.
 * The `ui` field holds the application's rendered SparkUI; it is null when the
 * provider loaded only listing data without rendering the UI.
 */
private[spark] case class ApplicationHistoryInfo(
    id: String,
    name: String,
    startTime: Long,
    endTime: Long,
    lastUpdated: Long,
    sparkUser: String,
    viewAcls: String,
    ui: SparkUI)
32+
33+
/**
 * Interface implemented by history backends (e.g. the file system provider) that
 * supply application data to the HistoryServer.
 */
private[spark] abstract class ApplicationHistoryProvider {

  /**
   * This method should return a list of applications available for the history server to
   * show.
   *
   * @return List of all known applications.
   */
  def getListing(): Seq[ApplicationHistoryInfo]

  /**
   * This method should return the application information, including a rendered SparkUI.
   *
   * @param appId The application ID.
   * @return The app info, or null if not found.
   */
  def getAppInfo(appId: String): ApplicationHistoryInfo

  /**
   * Called when the server is shutting down.
   */
  def stop(): Unit = { }

  /**
   * Returns configuration data to be shown in the HS home page.
   *
   * @return A map with the configuration data. Data is shown in the order returned by the map.
   */
  def getConfig(): Map[String, String] = Map()

}

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
3232
with Logging {
3333

3434
// Interval between each check for event log updates
35-
private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval", 10) * 1000
35+
private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
36+
conf.getInt("spark.history.updateInterval", 10)) * 1000
3637

3738
private val logDir = conf.get("spark.history.fs.logDirectory")
3839
private val fs = Utils.getHadoopFileSystem(logDir)
@@ -54,7 +55,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
5455
private val logCheckingThread = new Thread("LogCheckingThread") {
5556
override def run() = Utils.logUncaughtExceptions {
5657
while (true) {
57-
val now = getMonotonicTime()
58+
val now = getMonotonicTimeMs()
5859
if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
5960
Thread.sleep(UPDATE_INTERVAL_MS)
6061
} else {
@@ -97,13 +98,16 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
9798
}
9899
}
99100

101+
override def getConfig(): Map[String, String] =
102+
Map(("Event Log Location" -> logDir))
103+
100104
/**
101105
* Builds the application list based on the current contents of the log directory.
102106
* Tries to reuse as much of the data already in memory as possible, by not reading
103107
* applications that haven't been updated since last time the logs were checked.
104108
*/
105-
def checkForLogs() = {
106-
lastLogCheckTimeMs = getMonotonicTime()
109+
private def checkForLogs() = {
110+
lastLogCheckTimeMs = getMonotonicTimeMs()
107111
logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
108112
try {
109113
val logStatus = fs.listStatus(new Path(logDir))
@@ -142,6 +146,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
142146
/**
143147
* Parse the application's logs to find out the information we need to build the
144148
* listing page.
149+
*
150+
* When creating the listing of available apps, there is no need to load the whole UI for the
151+
* application. The UI is requested by the HistoryServer (by calling getAppInfo()) when the user
152+
* clicks on a specific application.
153+
*
154+
* @param logDir Directory with application's log files.
155+
* @param renderUI Whether to create the SparkUI for the application. If false, the "ui"
156+
* attribute of the returned object will be null.
145157
*/
146158
private def loadAppInfo(logDir: FileStatus, renderUI: Boolean): ApplicationHistoryInfo = {
147159
val elogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs)
@@ -188,6 +200,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
188200
}
189201

190202
/** Returns the system's monotonically increasing time. */
191-
private def getMonotonicTime() = System.nanoTime() / (1000 * 1000)
203+
private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000)
192204

193205
}

core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,28 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
2525

2626
private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
2727

28-
val pageSize = 20
28+
private val pageSize = 20
2929

3030
def render(request: HttpServletRequest): Seq[Node] = {
3131
val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt
3232
val requestedFirst = (requestedPage - 1) * pageSize
3333

3434
val allApps = parent.getApplicationList()
3535
val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
36-
val apps = allApps.slice(actualFirst, Math.min(pageSize, allApps.size))
36+
val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))
3737

3838
val actualPage = (actualFirst / pageSize) + 1
3939
val last = Math.min(actualFirst + pageSize, allApps.size) - 1
4040
val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0)
4141

4242
val appTable = UIUtils.listingTable(appHeader, appRow, apps)
43+
val providerConfig = parent.getProviderConfig()
4344
val content =
4445
<div class="row-fluid">
4546
<div class="span12">
47+
<ul class="unstyled">
48+
{ providerConfig.map(e => <li><strong>{e._1}:</strong> {e._2}</li>) }
49+
</ul>
4650
{
4751
if (allApps.size > 0) {
4852
<h4>

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 12 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ class HistoryServer(
5454
private val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
5555

5656
private val localHost = Utils.localHostName()
57-
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
5857

5958
private val appLoader = new CacheLoader[String, SparkUI] {
6059
override def load(key: String): SparkUI = {
@@ -80,13 +79,14 @@ class HistoryServer(
8079

8180
private val loaderServlet = new HttpServlet {
8281
protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
83-
val parts = req.getPathInfo().split("/")
82+
val parts = Option(req.getPathInfo()).getOrElse("").split("/")
8483
if (parts.length < 2) {
85-
res.setStatus(HttpServletResponse.SC_BAD_REQUEST)
84+
res.sendError(HttpServletResponse.SC_BAD_REQUEST,
85+
s"Unexpected path info in request (URI = ${req.getRequestURI()}")
8686
return
8787
}
8888

89-
var appId = parts(1)
89+
val appId = parts(1)
9090

9191
// Note we don't use the UI retrieved from the cache; the cache loader above will register
9292
// the app's UI, and all we need to do is redirect the user to the same URI that was
@@ -157,6 +157,13 @@ class HistoryServer(
157157
*/
158158
def getApplicationList() = provider.getListing()
159159

160+
/**
161+
* Returns the provider configuration to show in the listing page.
162+
*
163+
* @return A map with the provider's configuration.
164+
*/
165+
def getProviderConfig() = provider.getConfig()
166+
160167
}
161168

162169
/**
@@ -174,7 +181,7 @@ object HistoryServer {
174181

175182
def main(argStrings: Array[String]) {
176183
initSecurity()
177-
parse(argStrings.toList)
184+
val args = new HistoryServerArguments(conf, argStrings)
178185
val securityManager = new SecurityManager(conf)
179186

180187
val providerName = conf.getOption("spark.history.provider")
@@ -212,89 +219,4 @@ object HistoryServer {
212219
}
213220
}
214221

215-
private def parse(args: List[String]): Unit = {
216-
args match {
217-
case ("--dir" | "-d") :: value :: tail =>
218-
set("fs.logDirectory", value)
219-
parse(tail)
220-
221-
case ("-D") :: opt :: value :: tail =>
222-
set(opt, value)
223-
parse(tail)
224-
225-
case ("--help" | "-h") :: tail =>
226-
printUsageAndExit(0)
227-
228-
case Nil =>
229-
230-
case _ =>
231-
printUsageAndExit(1)
232-
}
233-
}
234-
235-
private def set(name: String, value: String) = {
236-
conf.set("spark.history." + name, value)
237-
}
238-
239-
private def printUsageAndExit(exitCode: Int) {
240-
System.err.println(
241-
"""
242-
|Usage: HistoryServer [options]
243-
|
244-
|Options are set by passing "-D option value" command line arguments to the class.
245-
|Command line options will override JVM system properties (which should be prepended
246-
|with "spark.history.").
247-
|
248-
|History Server options are always available; additional options depend on the provider.
249-
|
250-
|History Server options:
251-
|
252-
| ui.port Port where server will listen for connections (default 18080)
253-
| ui.acls.enable Whether to enable view acls for all applications (default false)
254-
| provider Name of history provider class (defaults to file system-based provider)
255-
|
256-
|FsHistoryProvider options:
257-
|
258-
| fs.logDirectory Directory where app logs are stored (required)
259-
| fs.updateInterval How often to reload log data from storage (seconds, default 10)
260-
|""".stripMargin)
261-
System.exit(exitCode)
262-
}
263-
264-
}
265-
266-
private[spark] abstract class ApplicationHistoryProvider {
267-
268-
/**
269-
* This method should return a list of applications available for the history server to
270-
* show.
271-
*
272-
* @return List of all know applications.
273-
*/
274-
def getListing(): Seq[ApplicationHistoryInfo]
275-
276-
/**
277-
* This method should return the application information, including a rendered SparkUI.
278-
*
279-
* @param appId The application ID.
280-
* @return The app info, or null if not found.
281-
*/
282-
def getAppInfo(appId: String): ApplicationHistoryInfo
283-
284-
/**
285-
* Called when the server is shutting down.
286-
*/
287-
def stop(): Unit = { }
288-
289-
}
290-
291-
private[spark] case class ApplicationHistoryInfo(
292-
id: String,
293-
name: String,
294-
startTime: Long,
295-
endTime: Long,
296-
lastUpdated: Long,
297-
sparkUser: String,
298-
viewAcls: String,
299-
ui: SparkUI) {
300222
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.history
19+
20+
import java.net.URI
21+
22+
import org.apache.hadoop.fs.Path
23+
24+
import org.apache.spark.SparkConf
25+
import org.apache.spark.util.Utils
26+
27+
/**
 * Command-line parser for the history server.
 *
 * The preferred configuration mechanism is system properties (spark.history.*); the
 * "-d" / "--dir" argument is kept for backwards compatibility and overrides
 * "spark.history.fs.logDirectory".
 */
private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) {
  // May be null if neither the configuration nor the command line provides a value;
  // validateLogDir() below rejects that case.
  var logDir = conf.get("spark.history.fs.logDirectory", null)

  parse(args.toList)
  // Validate and publish the final value exactly once, after all arguments are
  // consumed. (Doing this inside the recursive parse() would repeat the filesystem
  // checks and conf.set once per recursion level.)
  validateLogDir()
  conf.set("spark.history.fs.logDirectory", logDir)

  /** Consumes the argument list recursively; exits the JVM on --help or bad input. */
  private def parse(args: List[String]): Unit = {
    args match {
      case ("--dir" | "-d") :: value :: tail =>
        logDir = value
        parse(tail)

      case ("--help" | "-h") :: tail =>
        printUsageAndExit(0)

      case Nil =>

      case _ =>
        printUsageAndExit(1)
    }
  }

  /** Ensures the log directory is set, exists, and is a directory; exits otherwise. */
  private def validateLogDir() {
    if (logDir == null) {
      System.err.println("Logging directory must be specified.")
      printUsageAndExit(1)
    }
    val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
    val path = new Path(logDir)
    if (!fileSystem.exists(path)) {
      System.err.println("Logging directory specified does not exist: %s".format(logDir))
      printUsageAndExit(1)
    }
    if (!fileSystem.getFileStatus(path).isDir) {
      System.err.println("Logging directory specified is not a directory: %s".format(logDir))
      printUsageAndExit(1)
    }
  }

  /** Prints usage to stderr and terminates the JVM with the given exit code. */
  private def printUsageAndExit(exitCode: Int) {
    System.err.println(
      """
      |Usage: HistoryServer [-d logDir]
      |
      |The preferred way to pass options is to set the configuration below using
      |SPARK_HISTORY_OPTS. The "-d" command line argument is available for backwards
      |compatibility, and overrides "spark.history.fs.logDirectory".
      |
      |History Server options are always available; additional options depend on the provider.
      |
      |History Server options:
      |
      |  spark.history.ui.port              Port where server will listen for connections (default 18080)
      |  spark.history.acls.enable          Whether to enable view acls for all applications (default false)
      |  spark.history.provider             Name of history provider class (defaults to file system-based
      |                                     provider)
      |
      |FsHistoryProvider options:
      |
      |  spark.history.fs.logDirectory      Directory where app logs are stored (required)
      |  spark.history.fs.updateInterval    How often to reload log data from storage (seconds,
      |                                     default 10)
      |""".stripMargin)
    System.exit(exitCode)
  }

}

0 commit comments

Comments
 (0)