
Commit 8dd5af2

Author: Andrew Or (committed)
Fill in documentation + miscellaneous minor changes
For instance, this adds the ability to throw away old stage graphs.
1 parent fe7816f commit 8dd5af2

File tree: 9 files changed, +141 -76 lines

core/src/main/scala/org/apache/spark/annotation/RDDScoped.java

Lines changed: 3 additions & 2 deletions
@@ -20,9 +20,10 @@
 import java.lang.annotation.*;

 /**
- * Blah blah blah blah blah.
- * This should really be private and not displayed on the docs.
+ * An annotation to mark a method as an RDD operation that encloses its body in a scope.
+ * This is used to compute the scope of an RDD when it is instantiated.
  */
+// TODO: This should really be private[spark]
 @Retention(RetentionPolicy.RUNTIME)
 @Target({ElementType.METHOD})
 public @interface RDDScoped {}
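
As a usage illustration (not part of this commit), a method carrying the annotation might look like the sketch below. Note that RDDScope only inspects classes listed in its classesWithScopeMethods set, so annotating an arbitrary user class like this one would have no effect in practice:

    import org.apache.spark.SparkContext
    import org.apache.spark.annotation.RDDScoped
    import org.apache.spark.rdd.RDD

    class ExampleOps(sc: SparkContext) {
      // Hypothetical RDD operation: any RDD instantiated inside this method body
      // would pick up a scope derived from the method name (e.g. "loadLines_0"),
      // provided the enclosing class were one that RDDScope actually inspects.
      @RDDScoped
      def loadLines(path: String): RDD[String] = sc.textFile(path)
    }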

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 9 additions & 6 deletions
@@ -1450,7 +1450,13 @@ abstract class RDD[T: ClassTag](
   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
   @transient private[spark] val creationSite = sc.getCallSite()

-  /** Dem scopes. Tis null if de scope is not defined'eh. TODO: Make this private[spark]. */
+  /**
+   * The scope in which this RDD is defined.
+   *
+   * This is more flexible than the call site and can be defined hierarchically.
+   * For more detail, see the documentation of {{RDDScope}}. This scope is null if
+   * the user instantiates this RDD himself without using any Spark operations.
+   */
   @transient private[spark] val scope = RDDScope.getScope.orNull

   private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")
@@ -1602,11 +1608,8 @@ abstract class RDD[T: ClassTag](
     firstDebugString(this).mkString("\n")
   }

-  override def toString: String = {
-    val _name = Option(name).map(_ + " ").getOrElse("")
-    val _scope = Option(scope).map(" (scope: " + _ + ")").getOrElse("")
-    "%s%s[%d] at %s%s".format(_name, getClass.getSimpleName, id, getCreationSite, _scope)
-  }
+  override def toString: String = "%s%s[%d] at %s".format(
+    Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCreationSite)

   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
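
For illustration only (the values below are made up, not taken from the commit), the simplified toString composes like this; the scope no longer appears in the string and lives only on the private `scope` field and in RDDInfo:

    // Sketch of how the new format string renders for a named RDD.
    val namePart = Option("myRdd").map(_ + " ").getOrElse("")   // "myRdd "
    val rendered = "%s%s[%d] at %s".format(
      namePart, "MapPartitionsRDD", 4, "textFile at <console>:21")
    // rendered == "myRdd MapPartitionsRDD[4] at textFile at <console>:21"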

core/src/main/scala/org/apache/spark/rdd/RDDScope.scala

Lines changed: 62 additions & 23 deletions
@@ -1,22 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.rdd

 import java.util.concurrent.atomic.AtomicInteger
+
 import org.apache.spark.annotation.RDDScoped

 /**
- *
+ * A collection of utility methods to construct a hierarchical representation of RDD scopes.
+ * An RDD scope tracks the series of operations that created a given RDD.
  */
 private[spark] object RDDScope {

-  /**
-   *
-   */
+  // Symbol for delimiting each level of the hierarchy
+  // e.g. grandparent;parent;child
   val SCOPE_NESTING_DELIMITER = ";"
+
+  // Symbol for delimiting the scope name from the ID within each level
   val SCOPE_NAME_DELIMITER = "_"

-  /**
-   *
-   */
+  // Counter for generating scope IDs, for differentiating
+  // between different scopes of the same name
+  private val scopeCounter = new AtomicInteger(0)
+
+  // Consider only methods that belong to these classes as potential RDD operations
+  // This is to limit the amount of reflection we do when we traverse the stack trace
   private val classesWithScopeMethods = Set(
     "org.apache.spark.SparkContext",
     "org.apache.spark.rdd.RDD",
@@ -25,45 +48,61 @@ private[spark] object RDDScope {
   )

   /**
+   * Make a globally unique scope ID from the scope name.
    *
-   */
-  private val scopeIdCounter = new AtomicInteger(0)
-
-
-  /**
-   *
+   * For instance:
+   *   textFile -> textFile_0
+   *   textFile -> textFile_1
+   *   map -> map_2
+   *   name;with_sensitive;characters -> name-with-sensitive-characters_3
    */
   private def makeScopeId(name: String): String = {
     name.replace(SCOPE_NESTING_DELIMITER, "-").replace(SCOPE_NAME_DELIMITER, "-") +
-      SCOPE_NAME_DELIMITER + scopeIdCounter.getAndIncrement
+      SCOPE_NAME_DELIMITER + scopeCounter.getAndIncrement
   }

   /**
+   * Retrieve the hierarchical scope from the stack trace when an RDD is first created.
+   *
+   * This considers all methods marked with the @RDDScoped annotation and chains them together
+   * in the order they are invoked. Each level in the scope hierarchy represents a unique
+   * invocation of a particular RDD operation.
    *
+   * For example: treeAggregate_0;reduceByKey_1;combineByKey_2;mapPartitions_3
+   * This means this RDD is created by the user calling treeAggregate, which calls
+   * `reduceByKey`, and then `combineByKey`, and then `mapPartitions` to create this RDD.
    */
   private[spark] def getScope: Option[String] = {
+
+    // TODO: Note that this approach does not correctly associate the same invocation across RDDs
+    // For instance, a call to `textFile` creates both a HadoopRDD and a MapPartitionsRDD, but
+    // there is no way to associate the invocation across these two RDDs to draw the same scope
+    // around them. This is because the stack trace simply does not provide information for us
+    // to make any reasonable association across RDDs. We may need a higher level approach that
+    // involves setting common variables before and after the RDD operation itself.
+
     val rddScopeNames = Thread.currentThread.getStackTrace
       // Avoid reflecting on all classes in the stack trace
       .filter { ste => classesWithScopeMethods.contains(ste.getClassName) }
       // Return the corresponding method if it has the @RDDScoped annotation
       .flatMap { ste =>
-          // Note that this is an approximation since we match the method only by name
-          // Unfortunate we cannot be more precise because the stack trace does not
-          // include parameter information
-          Class.forName(ste.getClassName).getDeclaredMethods.find { m =>
-            m.getName == ste.getMethodName &&
+        // Note that this is an approximation since we match the method only by name
+        // Unfortunate we cannot be more precise because the stack trace does not include
+        // parameter information
+        Class.forName(ste.getClassName).getDeclaredMethods.find { m =>
+          m.getName == ste.getMethodName &&
             m.getDeclaredAnnotations.exists { a =>
               a.annotationType() == classOf[RDDScoped]
             }
+        }
       }
-      }
       // Use the method name as the scope name for now
       .map { m => m.getName }

     // It is common for such methods to internally invoke other methods with the same name
-    // (e.g. union, reduceByKey). Here we remove adjacent duplicates such that the scope
-    // chain does not capture this (e.g. a, a, b, c, b, c, c => a, b, c, b, c). This is
-    // surprisingly difficult to express even in Scala.
+    // as aliases (e.g. union, reduceByKey). Here we remove adjacent duplicates such that
+    // the scope chain does not capture this (e.g. a, a, b, c, b, c, c => a, b, c, b, c).
+    // This is surprisingly difficult to express even in Scala.
     var prev: String = null
     val dedupedRddScopeNames = rddScopeNames.flatMap { n =>
       if (n != prev) {
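
The hunk above is truncated partway through the adjacent-duplicate removal, so here is a self-contained sketch of that step under the same approach (illustrative, not the commit's exact code):

    // Collapse adjacent duplicates produced by operations that internally call
    // same-named methods: a, a, b, c, b, c, c  =>  a, b, c, b, c
    def dedupAdjacent(names: Seq[String]): Seq[String] = {
      var prev: String = null
      names.flatMap { n =>
        if (n != prev) { prev = n; Some(n) } else None
      }
    }

    // The deduplicated names would then be turned into unique IDs and joined with
    // SCOPE_NESTING_DELIMITER, e.g. "treeAggregate_0;reduceByKey_1;mapPartitions_2".
    assert(dedupAdjacent(Seq("a", "a", "b", "c", "b", "c", "c")) == Seq("a", "b", "c", "b", "c"))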

core/src/main/scala/org/apache/spark/storage/RDDInfo.scala

Lines changed: 1 addition & 1 deletion
@@ -42,7 +42,7 @@ class RDDInfo(
     import Utils.bytesToString
     val _scope = Option(scope).getOrElse("--")
     ("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +
-      "MemorySize: %s; TachyonSize: %s; DiskSize: %s (scope: %s)").format(
+      "MemorySize: %s; TachyonSize: %s; DiskSize: %s [scope: %s]").format(
       name, id, storageLevel.toString, numCachedPartitions, numPartitions,
       bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize), _scope)
   }

core/src/main/scala/org/apache/spark/ui/SparkUI.scala

Lines changed: 3 additions & 0 deletions
@@ -93,6 +93,9 @@ private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)
 private[spark] object SparkUI {
   val DEFAULT_PORT = 4040
   val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
+  val DEFAULT_POOL_NAME = "default"
+  val DEFAULT_RETAINED_STAGES = 1000
+  val DEFAULT_RETAINED_JOBS = 1000

   def getUIPort(conf: SparkConf): Int = {
     conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 5 additions & 12 deletions
@@ -25,6 +25,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.ui.SparkUI
 import org.apache.spark.ui.jobs.UIData._

 /**
@@ -38,8 +39,6 @@ import org.apache.spark.ui.jobs.UIData._
 @DeveloperApi
 class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {

-  import JobProgressListener._
-
   // Define a handful of type aliases so that data structures' types can serve as documentation.
   // These type aliases are public because they're used in the types of public fields:

@@ -82,8 +81,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   // To limit the total memory usage of JobProgressListener, we only track information for a fixed
   // number of non-active jobs and stages (there is no limit for active jobs and stages):

-  val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
-  val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
+  val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
+  val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)

   // We can test for memory leaks by ensuring that collections that track non-active jobs and
   // stages do not grow without bound and that collections for active jobs/stages eventually become
@@ -284,8 +283,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     activeStages(stage.stageId) = stage
     pendingStages.remove(stage.stageId)
     val poolName = Option(stageSubmitted.properties).map {
-      p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
-    }.getOrElse(DEFAULT_POOL_NAME)
+      p => p.getProperty("spark.scheduler.pool", SparkUI.DEFAULT_POOL_NAME)
+    }.getOrElse(SparkUI.DEFAULT_POOL_NAME)

     stageIdToInfo(stage.stageId) = stage
     val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData)
@@ -517,9 +516,3 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   }

 }
-
-private object JobProgressListener {
-  val DEFAULT_POOL_NAME = "default"
-  val DEFAULT_RETAINED_STAGES = 1000
-  val DEFAULT_RETAINED_JOBS = 1000
-}

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 6 additions & 3 deletions
@@ -36,12 +36,15 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
   private val progressListener = parent.progressListener
   private val vizListener = parent.vizListener

+  /**
+   * Return a DOM element that contains an RDD DAG visualization for this stage.
+   * If there is no visualization information for this stage, return an empty element.
+   */
   private def renderViz(stageId: Int): Seq[Node] = {
     val graph = vizListener.getVizGraph(stageId)
     if (graph.isEmpty) {
-      return Seq.empty
-    }
-    {
+      Seq.empty
+    } else {
       <div id="viz-dot-file" style="display:none">
         {VizGraph.makeDotFile(graph.get)}
       </div>

core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala

Lines changed: 23 additions & 5 deletions
@@ -20,22 +20,40 @@ package org.apache.spark.ui.viz
 import scala.collection.mutable

 import org.apache.spark.scheduler._
+import org.apache.spark.ui.SparkUI

 /**
- * A SparkListener that...
+ * A SparkListener that constructs a graph of the RDD DAG for each stage.
+ * This graph will be used for rendering visualization in the UI later.
  */
 private[ui] class VisualizationListener extends SparkListener {
-  private val graphsByStageId = new mutable.HashMap[Int, VizGraph] // stage ID -> viz graph

-  /** */
+  // A list of stage IDs to track the order in which stages are inserted
+  private val stageIds = new mutable.ArrayBuffer[Int]
+
+  // Stage ID -> graph metadata for the stage
+  private val stageIdToGraph = new mutable.HashMap[Int, VizGraph]
+
+  // How many stages to retain graph metadata for
+  private val retainedStages =
+    conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
+
+  /** Return the graph metadata for the given stage, or None if no such information exists. */
   def getVizGraph(stageId: Int): Option[VizGraph] = {
-    graphsByStageId.get(stageId)
+    stageIdToGraph.get(stageId)
   }

   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
     val stageId = stageCompleted.stageInfo.stageId
     val rddInfos = stageCompleted.stageInfo.rddInfos
     val vizGraph = VizGraph.makeVizGraph(rddInfos)
-    graphsByStageId(stageId) = vizGraph
+    stageIdToGraph(stageId) = vizGraph
+
+    // Remove metadata for old stages
+    if (stageIds.size >= retainedStages) {
+      val toRemove = math.max(retainedStages / 10, 1)
+      stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
+      stageIds.trimStart(toRemove)
+    }
   }
 }
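
The retention policy mirrors JobProgressListener: once the number of tracked stages reaches the limit, roughly the oldest 10% are dropped. A minimal standalone sketch of that bookkeeping follows (assumed shape only: the diff does not show where `conf` comes from, so the sketch takes a SparkConf explicitly, and a plain String stands in for VizGraph):

    import scala.collection.mutable
    import org.apache.spark.SparkConf

    class BoundedStageGraphs(conf: SparkConf) {
      private val stageIds = new mutable.ArrayBuffer[Int]           // insertion order
      private val stageIdToGraph = new mutable.HashMap[Int, String] // stage ID -> graph
      private val retainedStages = conf.getInt("spark.ui.retainedStages", 1000)

      def put(stageId: Int, graph: String): Unit = {
        stageIds += stageId
        stageIdToGraph(stageId) = graph
        // Evict the oldest ~10% of stages once the retention limit is hit
        if (stageIds.size >= retainedStages) {
          val toRemove = math.max(retainedStages / 10, 1)
          stageIds.take(toRemove).foreach(stageIdToGraph.remove)
          stageIds.trimStart(toRemove)
        }
      }

      def get(stageId: Int): Option[String] = stageIdToGraph.get(stageId)
    }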
