From 576e60bb204b1caedb7696a3365b4f4f2b2c6a81 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sat, 8 Mar 2014 13:43:16 -0800
Subject: [PATCH 1/4] SPARK-1205: Clean up callSite/origin/generator.

This patch removes the `generator` field and simplifies + documents
the tracking of callsites.

There are two places where we care about call sites: when a job is run
and when an RDD is created. This patch retains both of those features
but does a slight refactoring and renaming to make things less
confusing.

There was another feature of an RDD called the `generator`, which was
by default the user class in which the RDD was created. This is used
exclusively in the JobLogger. It has been subsumed by the ability to
name a job group. The job logger can later be refactored to read the
job group directly (this will require some work), but for now this
just preserves the default logged value of the user class. I'm not
sure any users ever used the ability to override this.
---
 .../scala/org/apache/spark/SparkContext.scala | 21 ++++++++-----------
 .../org/apache/spark/api/java/JavaRDD.scala | 6 +++++-
 .../apache/spark/api/java/JavaRDDLike.scala | 10 ++++-----
 .../main/scala/org/apache/spark/rdd/RDD.scala | 20 +++++++-----------
 .../apache/spark/scheduler/DAGScheduler.scala | 2 +-
 .../apache/spark/scheduler/JobLogger.scala | 10 +++------
 .../org/apache/spark/scheduler/Stage.scala | 2 +-
 .../scala/org/apache/spark/util/Utils.scala | 4 ++--
 8 files changed, 33 insertions(+), 42 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ce25573834829..616496515017f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -820,23 +820,20 @@ class SparkContext(
    * Support function for API backtraces.
    */
   def setCallSite(site: String) {
-    setLocalProperty("externalCallSite", site)
+    setLocalProperty("externalCallSiteString", site)
   }
 
   /**
   * Support function for API backtraces.
   */
   def clearCallSite() {
-    setLocalProperty("externalCallSite", null)
+    setLocalProperty("externalCallSiteString", null)
   }
 
-  private[spark] def getCallSite(): String = {
-    val callSite = getLocalProperty("externalCallSite")
-    if (callSite == null) {
-      Utils.formatSparkCallSite
-    } else {
-      callSite
-    }
+  /** Capture the current user callsite and return a formatted version for printing. If the user
+    * has overridden the call site, this will return the user's version. 
*/ + private[spark] def getCallSiteString(): String = { + Option(getLocalProperty("externalCallSiteString")).getOrElse(Utils.formatCallSiteInfo()) } /** @@ -851,7 +848,7 @@ class SparkContext( partitions: Seq[Int], allowLocal: Boolean, resultHandler: (Int, U) => Unit) { - val callSite = getCallSite + val callSite = getCallSiteString val cleanedFunc = clean(func) logInfo("Starting job: " + callSite) val start = System.nanoTime @@ -935,7 +932,7 @@ class SparkContext( func: (TaskContext, Iterator[T]) => U, evaluator: ApproximateEvaluator[U, R], timeout: Long): PartialResult[R] = { - val callSite = getCallSite + val callSite = getCallSiteString logInfo("Starting job: " + callSite) val start = System.nanoTime val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, @@ -955,7 +952,7 @@ class SparkContext( resultFunc: => R): SimpleFutureAction[R] = { val cleanF = clean(processPartition) - val callSite = getCallSite + val callSite = getCallSiteString val waiter = dagScheduler.submitJob( rdd, (context: TaskContext, iter: Iterator[T]) => cleanF(iter), diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index 91bf404631f49..566341748d767 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -135,7 +135,11 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] = wrapRDD(rdd.subtract(other, p)) - def generator: String = rdd.generator + /** + * @deprecated The 'generator' field was removed in Spark 1.0.0. Use sc.setJobGroup. + */ + @Deprecated + def generator: String = "" override def toString = rdd.toString diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index af0114bee3f49..bc926d22a7b6d 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -19,7 +19,6 @@ package org.apache.spark.api.java import java.util.{Comparator, List => JList} -import scala.Tuple2 import scala.collection.JavaConversions._ import scala.reflect.ClassTag @@ -500,8 +499,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def name(): String = rdd.name - /** Reset generator */ - def setGenerator(_generator: String) = { - rdd.setGenerator(_generator) - } + /** + * @deprecated The 'generator' field was removed in Spark 1.0.0. Use sc.setJobGroup. 
+   */
+  @Deprecated
+  def setGenerator(_generator: String) = { }
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3fe56963e0008..3f085836de2c3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -126,13 +126,8 @@ abstract class RDD[T: ClassTag](
     this
   }
 
-  /** User-defined generator of this RDD*/
-  @transient var generator = Utils.getCallSiteInfo.firstUserClass
-
-  /** Reset generator*/
-  def setGenerator(_generator: String) = {
-    generator = _generator
-  }
+  @deprecated("The 'generator' field has been removed, use sc.setJobGroup.", "1.0.0")
+  def setGenerator(_generator: String) = { }
 
   /**
    * Set this RDD's storage level to persist its values across operations after the first time
@@ -1031,8 +1026,10 @@ abstract class RDD[T: ClassTag](
 
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
-  /** Record user function generating this RDD. */
-  @transient private[spark] val origin = sc.getCallSite()
+  /** Info about the function call site where this was created (e.g. `textFile`, `parallelize`). */
+  @transient private[spark] val callSite = Utils.getCallSiteInfo
+
+  private[spark] def getCallSiteString = Utils.formatCallSiteInfo(callSite)
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
 
@@ -1095,10 +1092,7 @@ abstract class RDD[T: ClassTag](
   }
 
   override def toString: String = "%s%s[%d] at %s".format(
-    Option(name).map(_ + " ").getOrElse(""),
-    getClass.getSimpleName,
-    id,
-    origin)
+    Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCallSiteString)
 
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index dc5b25d845dc2..3d8c7f9b6eae3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -279,7 +279,7 @@ class DAGScheduler(
       } else {
         // Kind of ugly: need to register RDDs with the cache and map output tracker here
         // since we can't do it in the RDD constructor because # of partitions is unknown
-        logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
+        logInfo("Registering RDD " + rdd.id + " (" + rdd.getCallSiteString + ")")
         mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
       }
       stage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 80f9ec7d03007..612ea32c83326 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -213,14 +213,10 @@ class JobLogger(val user: String, val logDirName: String)
    * @param indent Indent number before info
    */
   protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
+    val cacheStr = if (rdd.getStorageLevel != StorageLevel.NONE) "CACHED" else "NONE"
     val rddInfo =
-      if (rdd.getStorageLevel != StorageLevel.NONE) {
-        "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " +
-          rdd.origin + " " + rdd.generator
-      } else {
-        "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " +
-          rdd.origin + " " + rdd.generator
-      }
+      s"RDD_ID=${rdd.id} ${getRddName(rdd)} $cacheStr " +
+        s"${rdd.getCallSiteString} ${rdd.callSite.firstUserClass}"
     jobLogInfo(jobID, indentString(indent) + rddInfo, false) 
rdd.dependencies.foreach { case shufDep: ShuffleDependency[_, _] => diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index a78b0186b9eab..92e762771b839 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -100,7 +100,7 @@ private[spark] class Stage( id } - val name = callSite.getOrElse(rdd.origin) + val name = callSite.getOrElse(rdd.getCallSiteString) override def toString = "Stage " + id diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 0eb2f78b730f6..7e504746ca2e4 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -717,8 +717,8 @@ private[spark] object Utils extends Logging { new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass) } - def formatSparkCallSite = { - val callSiteInfo = getCallSiteInfo + /** Returns a printable version of the call site info suitable for logs. */ + def formatCallSiteInfo(callSiteInfo: CallSiteInfo = Utils.getCallSiteInfo) = { "%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile, callSiteInfo.firstUserLine) } From 62e77ef649a4ed7e2bfffadb111377748f6ce096 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sun, 9 Mar 2014 22:34:48 -0700 Subject: [PATCH 2/4] Review feedback --- .../main/scala/org/apache/spark/SparkContext.scala | 14 +++++++------- .../scala/org/apache/spark/api/java/JavaRDD.scala | 6 ------ .../org/apache/spark/api/java/JavaRDDLike.scala | 5 ----- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 9 +++------ .../org/apache/spark/scheduler/DAGScheduler.scala | 2 +- .../org/apache/spark/scheduler/JobLogger.scala | 2 +- .../scala/org/apache/spark/scheduler/Stage.scala | 2 +- 7 files changed, 13 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 616496515017f..06cf721f71eb0 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -820,20 +820,20 @@ class SparkContext( * Support function for API backtraces. */ def setCallSite(site: String) { - setLocalProperty("externalCallSiteString", site) + setLocalProperty("externalCallSite", site) } /** * Support function for API backtraces. */ def clearCallSite() { - setLocalProperty("externalCallSiteString", null) + setLocalProperty("externalCallSite", null) } /** Capture the current user callsite and return a formatted version for printing. If the user * has overridden the call site, this will return the user's version. 
*/ - private[spark] def getCallSiteString(): String = { - Option(getLocalProperty("externalCallSiteString")).getOrElse(Utils.formatCallSiteInfo()) + private[spark] def getCallSite(): String = { + Option(getLocalProperty("externalCallSite")).getOrElse(Utils.formatCallSiteInfo()) } /** @@ -848,7 +848,7 @@ class SparkContext( partitions: Seq[Int], allowLocal: Boolean, resultHandler: (Int, U) => Unit) { - val callSite = getCallSiteString + val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite) val start = System.nanoTime @@ -932,7 +932,7 @@ class SparkContext( func: (TaskContext, Iterator[T]) => U, evaluator: ApproximateEvaluator[U, R], timeout: Long): PartialResult[R] = { - val callSite = getCallSiteString + val callSite = getCallSite logInfo("Starting job: " + callSite) val start = System.nanoTime val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, @@ -952,7 +952,7 @@ class SparkContext( resultFunc: => R): SimpleFutureAction[R] = { val cleanF = clean(processPartition) - val callSite = getCallSiteString + val callSite = getCallSite val waiter = dagScheduler.submitJob( rdd, (context: TaskContext, iter: Iterator[T]) => cleanF(iter), diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index 566341748d767..01d9357a2556d 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -135,12 +135,6 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] = wrapRDD(rdd.subtract(other, p)) - /** - * @deprecated The 'generator' field was removed in Spark 1.0.0. Use sc.setJobGroup. - */ - @Deprecated - def generator: String = "" - override def toString = rdd.toString /** Assign a name to this RDD */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index bc926d22a7b6d..a89419bbd10e7 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -499,9 +499,4 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def name(): String = rdd.name - /** - * @deprecated The 'generator' field was removed in Spark 1.0.0. Use sc.setJobGroup. - */ - @Deprecated - def setGenerator(_generator: String) = { } } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 3f085836de2c3..f09777ca9fd84 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -126,9 +126,6 @@ abstract class RDD[T: ClassTag]( this } - @deprecated("The 'generator' field has been removed, use sc.setJobGroup.", "1.0.0") - def setGenerator(_generator: String) = { } - /** * Set this RDD's storage level to persist its values across operations after the first time * it is computed. This can only be used to assign a new storage level if the RDD does not @@ -1027,9 +1024,9 @@ abstract class RDD[T: ClassTag]( private var storageLevel: StorageLevel = StorageLevel.NONE /** Info about the function call site where this was created (e.g. `textFile`, `parallelize`). 
*/
-  @transient private[spark] val callSite = Utils.getCallSiteInfo
+  @transient private[spark] val callSiteInfo = Utils.getCallSiteInfo
 
-  private[spark] def getCallSiteString = Utils.formatCallSiteInfo(callSite)
+  private[spark] def getCallSite = Utils.formatCallSiteInfo(callSiteInfo)
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
 
@@ -1092,7 +1089,7 @@ abstract class RDD[T: ClassTag](
   }
 
   override def toString: String = "%s%s[%d] at %s".format(
-    Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCallSiteString)
+    Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCallSite)
 
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3d8c7f9b6eae3..943c102d15422 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -279,7 +279,7 @@ class DAGScheduler(
       } else {
         // Kind of ugly: need to register RDDs with the cache and map output tracker here
         // since we can't do it in the RDD constructor because # of partitions is unknown
-        logInfo("Registering RDD " + rdd.id + " (" + rdd.getCallSiteString + ")")
+        logInfo("Registering RDD " + rdd.id + " (" + rdd.getCallSite + ")")
         mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
       }
       stage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 612ea32c83326..5d452202d1f69 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -216,7 +216,7 @@ class JobLogger(val user: String, val logDirName: String)
     val cacheStr = if (rdd.getStorageLevel != StorageLevel.NONE) "CACHED" else "NONE"
     val rddInfo =
       s"RDD_ID=${rdd.id} ${getRddName(rdd)} $cacheStr " +
-      s"${rdd.getCallSiteString} ${rdd.callSite.firstUserClass}"
+      s"${rdd.getCallSite} ${rdd.callSiteInfo.firstUserClass}"
     jobLogInfo(jobID, indentString(indent) + rddInfo, false)
     rdd.dependencies.foreach {
       case shufDep: ShuffleDependency[_, _] =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 92e762771b839..4d5af261d1a65 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -100,7 +100,7 @@ private[spark] class Stage(
     id
   }
 
-  val name = callSite.getOrElse(rdd.getCallSiteString)
+  val name = callSite.getOrElse(rdd.getCallSite)
 
   override def toString = "Stage " + id
 

From e17fb76105364bca97c653cebc4a77b4133f0e22 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Mon, 10 Mar 2014 10:33:09 -0700
Subject: [PATCH 3/4] Review feedback: callSite -> creationSite

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 6 ++++--
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 8 +++-----
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +-
 .../main/scala/org/apache/spark/scheduler/JobLogger.scala | 2 +-
 .../src/main/scala/org/apache/spark/scheduler/Stage.scala | 2 +-
 5 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 06cf721f71eb0..41b709b58d663 100644
--- 
a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -830,8 +830,10 @@ class SparkContext(
     setLocalProperty("externalCallSite", null)
   }
 
-  /** Capture the current user callsite and return a formatted version for printing. If the user
-    * has overridden the call site, this will return the user's version. */
+  /**
+   * Capture the current user callsite and return a formatted version for printing. If the user
+   * has overridden the call site, this will return the user's version.
+   */
   private[spark] def getCallSite(): String = {
     Option(getLocalProperty("externalCallSite")).getOrElse(Utils.formatCallSiteInfo())
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index f09777ca9fd84..f560ce3349514 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1023,10 +1023,8 @@ abstract class RDD[T: ClassTag](
 
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
-  /** Info about the function call site where this was created (e.g. `textFile`, `parallelize`). */
-  @transient private[spark] val callSiteInfo = Utils.getCallSiteInfo
-
-  private[spark] def getCallSite = Utils.formatCallSiteInfo(callSiteInfo)
+  /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
+  private[spark] def getCreationSite = sc.getCallSite()
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
 
@@ -1089,7 +1087,7 @@ abstract class RDD[T: ClassTag](
   }
 
   override def toString: String = "%s%s[%d] at %s".format(
-    Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCallSite)
+    Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCreationSite)
 
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 943c102d15422..d83d0341c61ab 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -279,7 +279,7 @@ class DAGScheduler(
       } else {
         // Kind of ugly: need to register RDDs with the cache and map output tracker here
         // since we can't do it in the RDD constructor because # of partitions is unknown
-        logInfo("Registering RDD " + rdd.id + " (" + rdd.getCallSite + ")")
+        logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
         mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
       }
       stage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 5d452202d1f69..01cbcc390c6cd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -216,7 +216,7 @@ class JobLogger(val user: String, val logDirName: String)
     val cacheStr = if (rdd.getStorageLevel != StorageLevel.NONE) "CACHED" else "NONE"
     val rddInfo =
       s"RDD_ID=${rdd.id} ${getRddName(rdd)} $cacheStr " +
-      s"${rdd.getCallSite} ${rdd.callSiteInfo.firstUserClass}"
+      s"${rdd.getCreationSite} ${rdd.creationSiteInfo.firstUserClass}"
     jobLogInfo(jobID, indentString(indent) + rddInfo, false)
     rdd.dependencies.foreach {
       case shufDep: ShuffleDependency[_, _] =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala 
b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 4d5af261d1a65..5c1fc30e4a557 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -100,7 +100,7 @@ private[spark] class Stage( id } - val name = callSite.getOrElse(rdd.getCallSite) + val name = callSite.getOrElse(rdd.getCreationSite) override def toString = "Stage " + id From fc1d009c4a99a2298e063eeddb3f7d089089c509 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 10 Mar 2014 13:58:29 -0700 Subject: [PATCH 4/4] Compile fix --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index f560ce3349514..4afa7523dd802 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1024,7 +1024,8 @@ abstract class RDD[T: ClassTag]( private var storageLevel: StorageLevel = StorageLevel.NONE /** User code that created this RDD (e.g. `textFile`, `parallelize`). */ - private[spark] def getCreationSite = sc.getCallSite() + @transient private[spark] val creationSiteInfo = Utils.getCallSiteInfo + private[spark] def getCreationSite = Utils.formatCallSiteInfo(creationSiteInfo) private[spark] def elementClassTag: ClassTag[T] = classTag[T]
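
--
Usage note: with `generator` gone, per-job labeling goes through the job-group
API that subsumed it. Below is a minimal sketch of the migration, not part of
the patches themselves. The group name, description, and call-site string are
made up for illustration, and it assumes the two-argument
SparkContext.setJobGroup(groupId, description) overload:

    // Previously: rdd.setGenerator("MyEtlClass") tagged the RDD itself.
    // Now: name the job group on the context; the JobLogger reads the
    // group (or, by default, still logs the first user class).
    sc.setJobGroup("nightly-etl", "Nightly ETL pipeline")
    val total = sc.parallelize(1 to 100).map(_ * 2).count()

    // Wrapper APIs keep using the retained call-site override, so logs
    // point at user code rather than at the wrapper layer:
    sc.setCallSite("count at UserCode.java:42")
    try {
      sc.parallelize(1 to 100).count()
    } finally {
      sc.clearCallSite()
    }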