
Commit 43de96e

Author: Andrew Or

Add parent IDs to StageInfo

1 parent 6e2cfea · commit 43de96e

File tree: 7 files changed, +43 -19 lines


core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala (2 additions, 0 deletions)

@@ -33,6 +33,7 @@ class StageInfo(
     val name: String,
     val numTasks: Int,
     val rddInfos: Seq[RDDInfo],
+    val parentIds: Seq[Int],
     val details: String) {
   /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
   var submissionTime: Option[Long] = None
@@ -66,6 +67,7 @@ private[spark] object StageInfo {
       stage.name,
       numTasks.getOrElse(stage.numTasks),
       rddInfos,
+      stage.parents.map(_.id),
       stage.details)
   }
 }
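The idea behind the change: the scheduler-side Stage object holds references to its parent stages, while the listener-facing StageInfo snapshot now records only their IDs, so the DAG shape survives serialization without dragging scheduler internals along. A minimal, self-contained sketch of that pattern — the simplified Stage and StageInfo below are illustrative stand-ins, not Spark's actual definitions:

// Simplified stand-ins for Spark's scheduler types (hypothetical).
class Stage(val id: Int, val name: String, val parents: List[Stage])

class StageInfo(
    val stageId: Int,
    val name: String,
    val parentIds: Seq[Int])  // new field: parent IDs, not object references

object StageInfo {
  // Mirrors the fromStage change above: flatten parent Stages into their IDs.
  def fromStage(stage: Stage): StageInfo =
    new StageInfo(stage.id, stage.name, stage.parents.map(_.id))
}

object ParentIdsDemo extends App {
  val shuffleMap = new Stage(0, "map", Nil)
  val result = new Stage(1, "reduce", List(shuffleMap))
  println(StageInfo.fromStage(result).parentIds)  // List(0)
}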

core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala (1 addition, 1 deletion)

@@ -51,7 +51,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
       // This could be empty if the JobProgressListener hasn't received information about the
       // stage or if the stage information has been garbage collected
       listener.stageIdToInfo.getOrElse(stageId,
-        new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, "Unknown"))
+        new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown"))
     }

     val activeStages = mutable.Buffer[StageInfo]()

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala (8 additions, 2 deletions)

@@ -227,6 +227,7 @@ private[spark] object JsonProtocol {

   def stageInfoToJson(stageInfo: StageInfo): JValue = {
     val rddInfo = JArray(stageInfo.rddInfos.map(rddInfoToJson).toList)
+    val parentIds = JArray(stageInfo.parentIds.map(JInt(_)).toList)
     val submissionTime = stageInfo.submissionTime.map(JInt(_)).getOrElse(JNothing)
     val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
     val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
@@ -235,6 +236,7 @@ private[spark] object JsonProtocol {
     ("Stage Name" -> stageInfo.name) ~
     ("Number of Tasks" -> stageInfo.numTasks) ~
     ("RDD Info" -> rddInfo) ~
+    ("Parent IDs" -> parentIds) ~
     ("Details" -> stageInfo.details) ~
     ("Submission Time" -> submissionTime) ~
     ("Completion Time" -> completionTime) ~
@@ -521,7 +523,7 @@ private[spark] object JsonProtocol {
     // The "Stage Infos" field was added in Spark 1.2.0
     val stageInfos = Utils.jsonOption(json \ "Stage Infos")
       .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
-        stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
+        stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown"))
       }
     SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
   }
@@ -601,6 +603,9 @@ private[spark] object JsonProtocol {
     val stageName = (json \ "Stage Name").extract[String]
     val numTasks = (json \ "Number of Tasks").extract[Int]
     val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
+    val parentIds = Utils.jsonOption(json \ "Parent IDs")
+      .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
+      .getOrElse(Seq.empty)
     val details = (json \ "Details").extractOpt[String].getOrElse("")
     val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
     val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
@@ -610,7 +615,8 @@ private[spark] object JsonProtocol {
       case None => Seq[AccumulableInfo]()
     }

-    val stageInfo = new StageInfo(stageId, attemptId, stageName, numTasks, rddInfos, details)
+    val stageInfo = new StageInfo(
+      stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.failureReason = failureReason
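On the read side this follows JsonProtocol's usual backward-compatibility pattern: look the new field up optionally and default to Seq.empty when parsing event logs written before the field existed. A self-contained json4s sketch of that pattern; the local jsonOption helper stands in for Spark's Utils.jsonOption (an assumption about its behavior, not the actual Spark code), and the one-step extract[List[Int]] is a simplification of the commit's two-step extraction:

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object ParentIdsCompat extends App {
  implicit val formats: Formats = DefaultFormats

  // Stand-in for Spark's Utils.jsonOption: json4s yields JNothing for a
  // missing field, which we turn into None so getOrElse can supply a default.
  def jsonOption(json: JValue): Option[JValue] = json match {
    case JNothing => None
    case value    => Some(value)
  }

  def parentIds(json: JValue): Seq[Int] =
    jsonOption(json \ "Parent IDs")
      .map(_.extract[List[Int]])
      .getOrElse(Seq.empty)

  val newEvent = parse("""{"Stage ID": 1, "Parent IDs": [100, 200, 300]}""")
  val oldEvent = parse("""{"Stage ID": 1}""")  // log from an older Spark

  println(parentIds(newEvent))  // List(100, 200, 300)
  println(parentIds(oldEvent))  // List() -- old logs still parse
}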

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala (1 addition, 1 deletion)

@@ -701,7 +701,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
   private val executorIdleTimeout = 3L

   private def createStageInfo(stageId: Int, numTasks: Int): StageInfo = {
-    new StageInfo(stageId, 0, "name", numTasks, Seq.empty, "no details")
+    new StageInfo(stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details")
   }

   private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {

core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala (3 additions, 3 deletions)

@@ -34,12 +34,12 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
   val jobCompletionTime = 1421191296660L

   private def createStageStartEvent(stageId: Int) = {
-    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, null, "")
     SparkListenerStageSubmitted(stageInfo)
   }

   private def createStageEndEvent(stageId: Int, failed: Boolean = false) = {
-    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, null, "")
     if (failed) {
       stageInfo.failureReason = Some("Failed!")
     }
@@ -51,7 +51,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       stageIds: Seq[Int],
       jobGroup: Option[String] = None): SparkListenerJobStart = {
     val stageInfos = stageIds.map { stageId =>
-      new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+      new StageInfo(stageId, 0, stageId.toString, 0, null, null, "")
     }
     val properties: Option[Properties] = jobGroup.map { groupId =>
       val props = new Properties()

core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala (9 additions, 7 deletions)

@@ -54,7 +54,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     assert(storageListener.rddInfoList.isEmpty)

     // 2 RDDs are known, but none are cached
-    val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0, rddInfo1), "details")
+    val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0, rddInfo1), Seq.empty, "details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener._rddInfoMap.size === 2)
     assert(storageListener.rddInfoList.isEmpty)
@@ -64,15 +64,16 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     val rddInfo3Cached = rddInfo3
     rddInfo2Cached.numCachedPartitions = 1
     rddInfo3Cached.numCachedPartitions = 1
-    val stageInfo1 = new StageInfo(1, 0, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), "details")
+    val stageInfo1 = new StageInfo(
+      1, 0, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), Seq.empty, "details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
     assert(storageListener._rddInfoMap.size === 4)
     assert(storageListener.rddInfoList.size === 2)

     // Submitting RDDInfos with duplicate IDs does nothing
     val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY, "scoop", Seq(10))
     rddInfo0Cached.numCachedPartitions = 1
-    val stageInfo0Cached = new StageInfo(0, 0, "0", 100, Seq(rddInfo0), "details")
+    val stageInfo0Cached = new StageInfo(0, 0, "0", 100, Seq(rddInfo0), Seq.empty, "details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0Cached))
     assert(storageListener._rddInfoMap.size === 4)
     assert(storageListener.rddInfoList.size === 2)
@@ -88,7 +89,8 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     val rddInfo1Cached = rddInfo1
     rddInfo0Cached.numCachedPartitions = 1
     rddInfo1Cached.numCachedPartitions = 1
-    val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), "details")
+    val stageInfo0 = new StageInfo(
+      0, 0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), Seq.empty, "details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener._rddInfoMap.size === 2)
     assert(storageListener.rddInfoList.size === 2)
@@ -108,7 +110,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     val myRddInfo1 = rddInfo1
     val myRddInfo2 = rddInfo2
     val stageInfo0 = new StageInfo(
-      0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
+      0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), Seq.empty, "details")
     bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener._rddInfoMap.size === 3)
@@ -168,8 +170,8 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {

     val rddInfo0 = new RDDInfo(0, "rdd0", 1, memOnly, "scoop", Seq(4))
     val rddInfo1 = new RDDInfo(1, "rdd1", 1, memOnly, "scoop", Seq(4))
-    val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), "details")
-    val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), "details")
+    val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), Seq.empty, "details")
+    val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), Seq.empty, "details")
     val taskMetrics0 = new TaskMetrics
     val taskMetrics1 = new TaskMetrics
     val block0 = (RDDBlockId(0, 1), BlockStatus(memOnly, 100L, 0L, 0L))

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala (19 additions, 5 deletions)

@@ -161,7 +161,7 @@ class JsonProtocolSuite extends FunSuite {
     assertEquals(exceptionFailure, JsonProtocol.taskEndReasonFromJson(oldEvent))
   }

-  test("StageInfo backward compatibility") {
+  test("StageInfo backward compatibility (details, accumulables)") {
     val info = makeStageInfo(1, 2, 3, 4L, 5L)
     val newJson = JsonProtocol.stageInfoToJson(info)

@@ -294,7 +294,7 @@ class JsonProtocolSuite extends FunSuite {
     val stageIds = Seq[Int](1, 2, 3, 4)
     val stageInfos = stageIds.map(x => makeStageInfo(x, x * 200, x * 300, x * 400, x * 500))
     val dummyStageInfos =
-      stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
+      stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown"))
     val jobStart = SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
     val oldEvent = JsonProtocol.jobStartToJson(jobStart).removeField({_._1 == "Stage Infos"})
     val expectedJobStart =
@@ -320,8 +320,8 @@ class JsonProtocolSuite extends FunSuite {
     assertEquals(expectedJobEnd, JsonProtocol.jobEndFromJson(oldEndEvent))
   }

-  test("RDDInfo backward compatibility") {
-    // Prior to Spark 1.4.0, RDDInfo did not have a "Scope" and "Parent IDs" properties
+  test("RDDInfo backward compatibility (scope, parent IDs)") {
+    // Prior to Spark 1.4.0, RDDInfo did not have the "Scope" and "Parent IDs" properties
     val rddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, "fable", Seq(1, 6, 8))
     val oldRddInfoJson = JsonProtocol.rddInfoToJson(rddInfo)
       .removeField({ _._1 == "Scope"})
@@ -330,6 +330,14 @@ class JsonProtocolSuite extends FunSuite {
     assertEquals(expectedRddInfo, JsonProtocol.rddInfoFromJson(oldRddInfoJson))
   }

+  test("StageInfo backward compatibility (parent IDs)") {
+    // Prior to Spark 1.4.0, StageInfo did not have the "Parent IDs" property
+    val stageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq(1, 2, 3), "details")
+    val oldStageInfo = JsonProtocol.stageInfoToJson(stageInfo).removeField({ _._1 == "Parent IDs"})
+    val expectedStageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq.empty, "details")
+    assertEquals(expectedStageInfo, JsonProtocol.stageInfoFromJson(oldStageInfo))
+  }
+
   /** -------------------------- *
    | Helper test running methods |
    * --------------------------- */
@@ -661,7 +669,7 @@ class JsonProtocolSuite extends FunSuite {

   private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
     val rddInfos = (0 until a % 5).map { i => makeRddInfo(a + i, b + i, c + i, d + i, e + i) }
-    val stageInfo = new StageInfo(a, 0, "greetings", b, rddInfos, "details")
+    val stageInfo = new StageInfo(a, 0, "greetings", b, rddInfos, Seq(100, 200, 300), "details")
     val (acc1, acc2) = (makeAccumulableInfo(1), makeAccumulableInfo(2))
     stageInfo.accumulables(acc1.id) = acc1
     stageInfo.accumulables(acc2.id) = acc2
@@ -754,6 +762,7 @@ class JsonProtocolSuite extends FunSuite {
       |    "Stage Name": "greetings",
       |    "Number of Tasks": 200,
       |    "RDD Info": [],
+      |    "Parent IDs": [100, 200, 300],
       |    "Details": "details",
       |    "Accumulables": [
       |      {
@@ -808,6 +817,7 @@ class JsonProtocolSuite extends FunSuite {
       |        "Disk Size": 501
       |      }
       |    ],
+      |    "Parent IDs": [100, 200, 300],
       |    "Details": "details",
       |    "Accumulables": [
       |      {
@@ -1193,6 +1203,7 @@ class JsonProtocolSuite extends FunSuite {
       |        "Disk Size": 500
       |      }
       |    ],
+      |    "Parent IDs": [100, 200, 300],
       |    "Details": "details",
       |    "Accumulables": [
       |      {
@@ -1252,6 +1263,7 @@ class JsonProtocolSuite extends FunSuite {
       |        "Disk Size": 1001
       |      }
       |    ],
+      |    "Parent IDs": [100, 200, 300],
       |    "Details": "details",
       |    "Accumulables": [
       |      {
@@ -1329,6 +1341,7 @@ class JsonProtocolSuite extends FunSuite {
       |        "Disk Size": 1502
       |      }
       |    ],
+      |    "Parent IDs": [100, 200, 300],
       |    "Details": "details",
       |    "Accumulables": [
       |      {
@@ -1424,6 +1437,7 @@ class JsonProtocolSuite extends FunSuite {
       |        "Disk Size": 2003
       |      }
       |    ],
+      |    "Parent IDs": [100, 200, 300],
       |    "Details": "details",
       |    "Accumulables": [
       |      {
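Every backward-compatibility test in this suite follows the same recipe, and the new one is no exception: serialize with the current writer, strip the new field to simulate an event log written by an older Spark, then assert the reader falls back to the default. A self-contained json4s illustration of that recipe (the "Parent IDs" field name comes from this commit; the surrounding object is simplified for the example):

import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

object CompatTestRecipe extends App {
  // 1. Serialize with the current writer, including the new field.
  val newJson: JValue =
    ("Stage ID" -> 1) ~ ("Parent IDs" -> List(100, 200, 300))

  // 2. Simulate a pre-1.4.0 event log by dropping the field, exactly
  //    as the suite does with removeField({ _._1 == "Parent IDs" }).
  val oldJson: JValue = newJson.removeField { case (name, _) => name == "Parent IDs" }

  // 3. A reader that defaults the field now sees an empty list for old logs.
  println(compact(render(newJson)))  // {"Stage ID":1,"Parent IDs":[100,200,300]}
  println(compact(render(oldJson)))  // {"Stage ID":1}
}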
