Skip to content

Commit 87a2292

Browse files
committed
Guard against misuse of MetricsSystem methods.
1 parent f779fe0 commit 87a2292

File tree

1 file changed

+20
-6
lines changed

1 file changed

+20
-6
lines changed

core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,22 +76,36 @@ private[spark] class MetricsSystem private (
7676
private val sources = new mutable.ArrayBuffer[Source]
7777
private val registry = new MetricRegistry()
7878

79+
private var running: Boolean = false
80+
7981
// Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
8082
private var metricsServlet: Option[MetricsServlet] = None
8183

82-
/**
 * Get any UI handlers used by this metrics system; can only be called after start().
 *
 * Guarded by `running` so callers cannot fetch handlers from a system
 * that has not been started (the MetricsServlet sink is only wired up
 * during start()).
 */
def getServletHandlers = {
  require(running, "Can only call getServletHandlers on a running MetricsSystem")
  metricsServlet match {
    case Some(servlet) => servlet.getHandlers
    case None => Array()
  }
}
8491

8592
metricsConfig.initialize()
8693

8794
/**
 * Start the metrics system: register all configured sources and sinks,
 * then start every sink. May only be called once per instance; a second
 * call fails fast instead of double-registering.
 */
def start() {
  require(!running, "Attempting to start a MetricsSystem that is already running")
  // Flip the flag first so getServletHandlers becomes usable.
  running = true
  registerSources()
  registerSinks()
  for (sink <- sinks) {
    sink.start
  }
}
92101

93102
/**
 * Stop the metrics system by stopping every registered sink.
 *
 * Stopping a system that was never started is tolerated (only a warning
 * is logged) so shutdown paths stay idempotent; `running` is cleared
 * unconditionally either way.
 */
def stop() {
  if (!running) {
    logWarning("Stopping a MetricsSystem that is not running")
  } else {
    for (sink <- sinks) {
      sink.stop
    }
  }
  running = false
}
96110

97111
def report() {
@@ -107,7 +121,7 @@ private[spark] class MetricsSystem private (
107121
* @return An unique metric name for each combination of
108122
* application, executor/driver and metric source.
109123
*/
110-
def buildRegistryName(source: Source): String = {
124+
private[spark] def buildRegistryName(source: Source): String = {
111125
val appId = conf.getOption("spark.app.id")
112126
val executorId = conf.getOption("spark.executor.id")
113127
val defaultName = MetricRegistry.name(source.sourceName)
@@ -144,7 +158,7 @@ private[spark] class MetricsSystem private (
144158
})
145159
}
146160

147-
def registerSources() {
161+
private def registerSources() {
148162
val instConfig = metricsConfig.getInstance(instance)
149163
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
150164

@@ -160,7 +174,7 @@ private[spark] class MetricsSystem private (
160174
}
161175
}
162176

163-
def registerSinks() {
177+
private def registerSinks() {
164178
val instConfig = metricsConfig.getInstance(instance)
165179
val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
166180

0 commit comments

Comments
 (0)