Skip to content

[SPARK-3377] [Metrics] Metrics can be accidentally aggregated #2250

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
4180993
Modified SparkContext to retain spark.unique.app.name property in Spa…
sarutak Sep 3, 2014
55debab
Modified SparkContext and Executor to set spark.executor.id to identi…
sarutak Sep 3, 2014
71609f5
Modified sourceName of ExecutorSource, DAGSchedulerSource and BlockMa…
sarutak Sep 3, 2014
868e326
Modified MetricsSystem to set registry name with unique application-i…
sarutak Sep 3, 2014
85ffc02
Revert "Modified sourceName of ExecutorSource, DAGSchedulerSource and…
sarutak Sep 3, 2014
4e057c9
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 3, 2014
6fc5560
Modified sourceName of ExecutorSource, DAGSchedulerSource and BlockMa…
sarutak Sep 3, 2014
6f7dcd4
Modified constructor of DAGSchedulerSource and BlockManagerSource bec…
sarutak Sep 3, 2014
15f88a3
Modified MetricsSystem#buildRegistryName because conf.get does not re…
sarutak Sep 3, 2014
fa7175b
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 4, 2014
4603a39
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 4, 2014
3e098d8
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 4, 2014
e4a4593
tmp
sarutak Sep 4, 2014
912a637
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 7, 2014
848819c
Merge branch 'metrics-structure-improvement' of github.com:sarutak/sp…
sarutak Sep 12, 2014
93e263a
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 12, 2014
45bd33d
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 12, 2014
7b67f5a
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 13, 2014
08e627e
Revert "tmp"
sarutak Sep 15, 2014
ead8966
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 15, 2014
3ea7896
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 16, 2014
4a871c3
Merge branch 'master' of git://git.apache.org/spark into metrics-stru…
sarutak Sep 17, 2014
cfe8027
Use application id for metrics name instead of System.currentTimeMilli…
sarutak Sep 17, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ class SparkContext(config: SparkConf) extends Logging {
val master = conf.get("spark.master")
val appName = conf.get("spark.app.name")

// TODO Get the application ID in a common way for all master types, including YARN cluster mode
val appId = conf.getOption("spark.yarn.app.id").getOrElse(System.currentTimeMillis().toString)

val uniqueAppName = appId + "." + appName
conf.set("spark.unique.app.name", uniqueAppName)

// Generate the random name for a temp folder in Tachyon
// Add a timestamp as the suffix here to make it more safe
val tachyonFolderName = "spark-" + randomUUID.toString()
Expand All @@ -200,6 +206,7 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] val listenerBus = new LiveListenerBus

// Create the Spark execution environment (cache, map output tracker, etc)
conf.set("spark.executor.id", "driver")
private[spark] val env = SparkEnv.create(
conf,
"<driver>",
Expand Down Expand Up @@ -411,8 +418,8 @@ class SparkContext(config: SparkConf) extends Logging {
// Post init
taskScheduler.postStartHook()

private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)

private def initDriverMetrics() {
SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ private[spark] class Executor(
val executorSource = new ExecutorSource(this, executorId)

// Initialize Spark environment (using system properties read above)
conf.set("spark.executor.id", "executor." + executorId)
private val env = {
if (!isLocal) {
val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ private[spark] class ExecutorSource(val executor: Executor, executorId: String)

override val metricRegistry = new MetricRegistry()

// TODO: It would be nice to pass the application name here
override val sourceName = "executor.%s".format(executorId)
override val sourceName = "executor"

// Gauge for executor thread pool's actively executing task counts
metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
Expand Down
19 changes: 17 additions & 2 deletions core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,34 @@ private[spark] class MetricsSystem private (val instance: String,
sinks.foreach(_.report())
}

/**
 * Builds the registry name under which a metrics source is registered.
 *
 * When both "spark.unique.app.name" and "spark.executor.id" are set in the conf,
 * the name is qualified as <uniqueAppName>.<executorId>.<sourceName> so that
 * metrics from different applications or executors are never accidentally
 * aggregated together (SPARK-3377). If either property is missing, the bare
 * source name is used unchanged.
 *
 * @param source the metrics source to name
 * @return the (possibly qualified) registry name for `source`
 */
def buildRegistryName(source: Source): String = {
  // Both properties must be present for qualification; a for-comprehension
  // over the two Options avoids the isDefined/get anti-pattern.
  val qualifiedName = for {
    appName <- conf.getOption("spark.unique.app.name")
    executorId <- conf.getOption("spark.executor.id")
  } yield MetricRegistry.name(appName, executorId, source.sourceName)
  qualifiedName.getOrElse(MetricRegistry.name(source.sourceName))
}

/**
 * Registers a metrics source with this MetricsSystem.
 *
 * The source is tracked in `sources` and its metric registry is registered
 * under the qualified name produced by `buildRegistryName`, so per-app /
 * per-executor metrics stay separate. Registering the same name twice throws
 * IllegalArgumentException, which is logged and ignored rather than
 * propagated — re-registration is treated as a benign no-op.
 */
def registerSource(source: Source) {
  sources += source
  try {
    // NOTE: the scraped diff retained both the pre-change register call
    // (keyed on source.sourceName) and the post-change one; only the
    // qualified registration belongs in the final code.
    val regName = buildRegistryName(source)
    registry.register(regName, source.metricRegistry)
  } catch {
    case e: IllegalArgumentException => logInfo("Metrics already registered", e)
  }
}

/**
 * Unregisters a metrics source from this MetricsSystem.
 *
 * Removes the source from `sources` and drops every metric whose registry
 * name starts with the same qualified name used at registration time
 * (`buildRegistryName`), keeping register/remove symmetric.
 */
def removeSource(source: Source) {
  sources -= source
  // Must match against the qualified name, not the bare sourceName,
  // otherwise metrics registered under the app/executor prefix would leak.
  // (The scraped diff kept both the old and new `matches` bodies; only the
  // regName-based predicate is valid here.)
  val regName = buildRegistryName(source)
  registry.removeMatching(new MetricFilter {
    def matches(name: String, metric: Metric): Boolean = name.startsWith(regName)
  })
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.SparkContext
import org.apache.spark.metrics.source.Source

private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
extends Source {
override val metricRegistry = new MetricRegistry()
override val sourceName = "%s.DAGScheduler".format(sc.appName)
override val sourceName = "DAGScheduler"

metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failedStages.size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.SparkContext
import org.apache.spark.metrics.source.Source

private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
private[spark] class BlockManagerSource(val blockManager: BlockManager)
extends Source {
override val metricRegistry = new MetricRegistry()
override val sourceName = "%s.BlockManager".format(sc.appName)
override val sourceName = "BlockManager"

metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] {
override def getValue: Long = {
Expand Down