Commit 2f00a29

jerryshao authored and JoshRosen committed
[SPARK-4595][Core] Fix MetricsServlet not work issue
`MetricsServlet` handlers should be added to the web UI only after the `MetricsSystem` has been initialized; otherwise the servlet handlers cannot be attached.

Author: Saisai Shao <[email protected]>
Author: Josh Rosen <[email protected]>
Author: jerryshao <[email protected]>

Closes apache#3444 from jerryshao/SPARK-4595 and squashes the following commits:

434d17e [Saisai Shao] Merge pull request #10 from JoshRosen/metrics-system-cleanup
87a2292 [Josh Rosen] Guard against misuse of MetricsSystem methods.
f779fe0 [jerryshao] Fix MetricsServlet not work issue

(cherry picked from commit cf50631)
Signed-off-by: Josh Rosen <[email protected]>
1 parent b5919d1 commit 2f00a29
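
For context, here is a minimal, illustrative Scala sketch of the ordering bug this commit fixes. It does not use Spark's real classes (ToyMetricsSystem and ToyWebUI are stand-ins); it only shows that servlet handlers exist only after the metrics system has been started, so attaching them to the web UI too early attaches nothing.

// Illustrative sketch only; names are stand-ins, not Spark's actual classes.
object MetricsOrderingSketch {

  // Stand-in for MetricsSystem: its servlet handler is created during start().
  class ToyMetricsSystem {
    private var running = false
    private var handlers: Seq[String] = Nil

    def start(): Unit = {
      running = true
      handlers = Seq("/metrics/json") // handler only exists once started
    }

    def getServletHandlers: Seq[String] = {
      require(running, "Can only call getServletHandlers on a running MetricsSystem")
      handlers
    }
  }

  // Stand-in for the web UI that handlers get attached to.
  class ToyWebUI {
    private var attached: Seq[String] = Nil
    def attachHandler(h: String): Unit = attached = attached :+ h
    def handlers: Seq[String] = attached
  }

  def main(args: Array[String]): Unit = {
    val metricsSystem = new ToyMetricsSystem
    val ui = new ToyWebUI

    // The order this commit enforces: start the metrics system first,
    // then attach its servlet handlers to the web UI.
    metricsSystem.start()
    metricsSystem.getServletHandlers.foreach(ui.attachHandler)
    println(ui.handlers) // List(/metrics/json)
  }
}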


7 files changed: +28 -11 lines changed


core/src/main/scala/org/apache/spark/SparkContext.scala
Lines changed: 2 additions & 0 deletions

@@ -344,6 +344,8 @@ class SparkContext(config: SparkConf) extends Logging {
  // The metrics system for Driver need to be set spark.app.id to app ID.
  // So it should start after we get app ID from the task scheduler and set spark.app.id.
  metricsSystem.start()
+  // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
+  metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

  // Optionally log Spark events
  private[spark] val eventLogger: Option[EventLoggingListener] = {

core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Lines changed: 4 additions & 0 deletions

@@ -129,6 +129,10 @@ private[spark] class Master(
    masterMetricsSystem.registerSource(masterSource)
    masterMetricsSystem.start()
    applicationMetricsSystem.start()
+    // Attach the master and app metrics servlet handler to the web ui after the metrics systems are
+    // started.
+    masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
+    applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

    persistenceEngine = RECOVERY_MODE match {
      case "ZOOKEEPER" =>

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
Lines changed: 0 additions & 2 deletions

@@ -41,8 +41,6 @@ class MasterWebUI(val master: Master, requestedPort: Int)
    attachPage(new HistoryNotFoundPage(this))
    attachPage(new MasterPage(this))
    attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
-    master.masterMetricsSystem.getServletHandlers.foreach(attachHandler)
-    master.applicationMetricsSystem.getServletHandlers.foreach(attachHandler)
  }

  /** Attach a reconstructed UI to this Master UI. Only valid after bind(). */

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
Lines changed: 2 additions & 0 deletions

@@ -163,6 +163,8 @@ private[spark] class Worker(

    metricsSystem.registerSource(workerSource)
    metricsSystem.start()
+    // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
+    metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  }

  def changeMaster(url: String, uiUrl: String) {

core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
Lines changed: 0 additions & 1 deletion

@@ -50,7 +50,6 @@ class WorkerWebUI(
    attachHandler(createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static"))
    attachHandler(createServletHandler("/log",
      (request: HttpServletRequest) => logPage.renderLog(request), worker.securityMgr))
-    worker.metricsSystem.getServletHandlers.foreach(attachHandler)
  }
}

core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
Lines changed: 20 additions & 6 deletions

@@ -76,22 +76,36 @@ private[spark] class MetricsSystem private (
  private val sources = new mutable.ArrayBuffer[Source]
  private val registry = new MetricRegistry()

+  private var running: Boolean = false
+
  // Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
  private var metricsServlet: Option[MetricsServlet] = None

-  /** Get any UI handlers used by this metrics system. */
-  def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array())
+  /**
+   * Get any UI handlers used by this metrics system; can only be called after start().
+   */
+  def getServletHandlers = {
+    require(running, "Can only call getServletHandlers on a running MetricsSystem")
+    metricsServlet.map(_.getHandlers).getOrElse(Array())
+  }

  metricsConfig.initialize()

  def start() {
+    require(!running, "Attempting to start a MetricsSystem that is already running")
+    running = true
    registerSources()
    registerSinks()
    sinks.foreach(_.start)
  }

  def stop() {
-    sinks.foreach(_.stop)
+    if (running) {
+      sinks.foreach(_.stop)
+    } else {
+      logWarning("Stopping a MetricsSystem that is not running")
+    }
+    running = false
  }

  def report() {

@@ -107,7 +121,7 @@ private[spark] class MetricsSystem private (
   * @return An unique metric name for each combination of
   *         application, executor/driver and metric source.
   */
-  def buildRegistryName(source: Source): String = {
+  private[spark] def buildRegistryName(source: Source): String = {
    val appId = conf.getOption("spark.app.id")
    val executorId = conf.getOption("spark.executor.id")
    val defaultName = MetricRegistry.name(source.sourceName)

@@ -144,7 +158,7 @@ private[spark] class MetricsSystem private (
    })
  }

-  def registerSources() {
+  private def registerSources() {
    val instConfig = metricsConfig.getInstance(instance)
    val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)

@@ -160,7 +174,7 @@ private[spark] class MetricsSystem private (
    }
  }

-  def registerSinks() {
+  private def registerSinks() {
    val instConfig = metricsConfig.getInstance(instance)
    val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
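
As a usage note, the guard above means misuse now fails fast rather than silently returning no handlers. Below is a hedged, self-contained sketch (plain Scala; GuardedSystem is a stand-in, not Spark's actual MetricsSystem) of the contract the new `running` flag enforces: calling getServletHandlers before start(), or start() twice, throws via require(), while a stray stop() only warns.

import scala.util.Try

// Illustrative sketch of the contract; GuardedSystem is a stand-in class.
object MetricsGuardSketch {
  class GuardedSystem {
    private var running = false

    def start(): Unit = {
      require(!running, "Attempting to start a MetricsSystem that is already running")
      running = true
    }

    def stop(): Unit = {
      if (!running) println("WARN: Stopping a MetricsSystem that is not running")
      running = false
    }

    def getServletHandlers: Seq[String] = {
      require(running, "Can only call getServletHandlers on a running MetricsSystem")
      Seq("/metrics/json")
    }
  }

  def main(args: Array[String]): Unit = {
    val ms = new GuardedSystem
    println(Try(ms.getServletHandlers)) // Failure(...): not running yet
    ms.start()
    println(Try(ms.getServletHandlers)) // Success(List(/metrics/json))
    println(Try(ms.start()))            // Failure(...): already running
    ms.stop()
    ms.stop()                           // only logs a warning the second time
  }
}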

core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Lines changed: 0 additions & 2 deletions

@@ -57,8 +57,6 @@ private[spark] class SparkUI private (
    attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
    attachHandler(
      createRedirectHandler("/stages/stage/kill", "/stages", stagesTab.handleKillRequest))
-    // If the UI is live, then serve
-    sc.foreach { _.env.metricsSystem.getServletHandlers.foreach(attachHandler) }
  }
