Commit e8422c5

sarutak authored and Andrew Or committed
[SPARK-5231][WebUI] History Server shows wrong job submission time.
History Server doesn't show the correct job submission time. This is because `JobProgressListener` updates the job submission time every time `onJobStart` is invoked from `ReplayListenerBus`, so replaying an event log records the replay time rather than the original submission time.

Author: Kousuke Saruta <[email protected]>

Closes #4029 from sarutak/SPARK-5231 and squashes the following commits:

0af9e22 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5231
da8bd14 [Kousuke Saruta] Made submissionTime in SparkListenerJobStart and completionTime in SparkListenerJobEnd regular Longs
0412a6a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5231
26b9b99 [Kousuke Saruta] Fixed the test cases
2d47bd3 [Kousuke Saruta] Fixed to record job submission time and completion time correctly
1 parent f6b852a commit e8422c5
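Why the History Server was wrong: as the JobProgressListener diff below shows, the listener used to stamp its own System.currentTimeMillis when handling a job-start event, so replaying an event log recorded the replay time, not the original submission time. The fix carries the timestamp inside the events themselves. A minimal, self-contained Scala sketch of the before/after behavior (stand-in types, not Spark's actual classes):

case class JobStartEvent(jobId: Int, time: Long) // stand-in for SparkListenerJobStart

object SubmissionTimeDemo {
  def main(args: Array[String]): Unit = {
    // The scheduler records the time when the job is actually submitted.
    val event = JobStartEvent(jobId = 0, time = 1421191042750L)

    // Before the fix: the listener stamped its own clock. During a History
    // Server replay, "now" is the replay time, not the submission time.
    val buggy = Some(System.currentTimeMillis())

    // After the fix: the listener trusts the time carried by the event and
    // treats negative values (pre-1.3.0 logs) as unknown.
    val fixed = Some(event.time).filter(_ >= 0)

    println(s"buggy: $buggy, fixed: $fixed")
  }
}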

File tree: 9 files changed (+77, -35 lines)

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 10 additions & 6 deletions
@@ -661,7 +661,7 @@ class DAGScheduler(
       // completion events or stage abort
       stageIdToStage -= s.id
       jobIdToStageIds -= job.jobId
-      listenerBus.post(SparkListenerJobEnd(job.jobId, jobResult))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), jobResult))
     }
   }

@@ -710,7 +710,7 @@
         stage.latestInfo.stageFailed(stageFailedMessage)
         listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
       }
-      listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
     }
   }

@@ -749,17 +749,20 @@
     logInfo("Missing parents: " + getMissingParentStages(finalStage))
     val shouldRunLocally =
       localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
+    val jobSubmissionTime = clock.getTime()
     if (shouldRunLocally) {
       // Compute very short actions like first() or take() with no parent stages locally.
-      listenerBus.post(SparkListenerJobStart(job.jobId, Seq.empty, properties))
+      listenerBus.post(
+        SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
       runLocally(job)
     } else {
       jobIdToActiveJob(jobId) = job
       activeJobs += job
       finalStage.resultOfJob = Some(job)
       val stageIds = jobIdToStageIds(jobId).toArray
       val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
-      listenerBus.post(SparkListenerJobStart(job.jobId, stageInfos, properties))
+      listenerBus.post(
+        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
       submitStage(finalStage)
     }
   }

@@ -965,7 +968,8 @@ class DAGScheduler(
           if (job.numFinished == job.numPartitions) {
             markStageAsFinished(stage)
             cleanupStateForJobAndIndependentStages(job)
-            listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
+            listenerBus.post(
+              SparkListenerJobEnd(job.jobId, clock.getTime(), JobSucceeded))
           }

           // taskSucceeded runs some user code that might throw an exception. Make sure

@@ -1234,7 +1238,7 @@
     if (ableToCancelStages) {
       job.listener.jobFailed(error)
       cleanupStateForJobAndIndependentStages(job)
-      listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
     }
   }
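Note that handleJobSubmitted reads clock.getTime() once into jobSubmissionTime before branching, so the run-locally and submit-to-cluster paths post SparkListenerJobStart with the same timestamp, and an injectable clock keeps that value deterministic in tests. A small illustrative sketch (Clock here is a local stand-in, not Spark's internal clock abstraction):

trait Clock { def getTime(): Long }
object SystemClock extends Clock { override def getTime(): Long = System.currentTimeMillis() }
class ManualClock(var now: Long) extends Clock { override def getTime(): Long = now }

object ClockDemo {
  def main(args: Array[String]): Unit = {
    val clock: Clock = new ManualClock(1421191042750L)
    val jobSubmissionTime = clock.getTime() // captured once, reused by every event
    println(jobSubmissionTime)              // deterministic under a manual clock
  }
}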

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 6 additions & 1 deletion
@@ -59,6 +59,7 @@ case class SparkListenerTaskEnd(
 @DeveloperApi
 case class SparkListenerJobStart(
     jobId: Int,
+    time: Long,
     stageInfos: Seq[StageInfo],
     properties: Properties = null)
   extends SparkListenerEvent {

@@ -68,7 +69,11 @@ case class SparkListenerJobStart(
 }

 @DeveloperApi
-case class SparkListenerJobEnd(jobId: Int, jobResult: JobResult) extends SparkListenerEvent
+case class SparkListenerJobEnd(
+    jobId: Int,
+    time: Long,
+    jobResult: JobResult)
+  extends SparkListenerEvent

 @DeveloperApi
 case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]])
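With the widened signatures, both events now carry an epoch-millisecond timestamp. A short usage sketch, assuming a Spark build that includes this change on the classpath (field names as in the diff above):

import org.apache.spark.scheduler.{JobSucceeded, SparkListenerJobEnd, SparkListenerJobStart}

// Constructing the events by hand, e.g. for a test:
val start = SparkListenerJobStart(jobId = 10, time = 1421191042750L, stageInfos = Seq.empty)
val end = SparkListenerJobEnd(jobId = 10, time = 1421191296660L, jobResult = JobSucceeded)

Note that inserting a field into these @DeveloperApi case classes is a source-incompatible change for code that constructs or pattern-matches them positionally, which is why the test suites below all needed updating.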

core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala

Lines changed: 7 additions & 8 deletions
@@ -21,7 +21,6 @@ import scala.xml.{Node, NodeSeq}

 import javax.servlet.http.HttpServletRequest

-import org.apache.spark.JobExecutionStatus
 import org.apache.spark.ui.{WebUIPage, UIUtils}
 import org.apache.spark.ui.jobs.UIData.JobUIData

@@ -51,13 +50,13 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
       val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
       val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
       val duration: Option[Long] = {
-        job.startTime.map { start =>
-          val end = job.endTime.getOrElse(System.currentTimeMillis())
+        job.submissionTime.map { start =>
+          val end = job.completionTime.getOrElse(System.currentTimeMillis())
           end - start
         }
       }
       val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
-      val formattedSubmissionTime = job.startTime.map(UIUtils.formatDate).getOrElse("Unknown")
+      val formattedSubmissionTime = job.submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
       val detailUrl =
         "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId)
       <tr>

@@ -68,7 +67,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
         <div><em>{lastStageDescription}</em></div>
         <a href={detailUrl}>{lastStageName}</a>
       </td>
-      <td sorttable_customkey={job.startTime.getOrElse(-1).toString}>
+      <td sorttable_customkey={job.submissionTime.getOrElse(-1).toString}>
         {formattedSubmissionTime}
       </td>
       <td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>

@@ -101,11 +100,11 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
     val now = System.currentTimeMillis

     val activeJobsTable =
-      jobsTable(activeJobs.sortBy(_.startTime.getOrElse(-1L)).reverse)
+      jobsTable(activeJobs.sortBy(_.submissionTime.getOrElse(-1L)).reverse)
     val completedJobsTable =
-      jobsTable(completedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
+      jobsTable(completedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
     val failedJobsTable =
-      jobsTable(failedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
+      jobsTable(failedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)

     val shouldShowActiveJobs = activeJobs.nonEmpty
     val shouldShowCompletedJobs = completedJobs.nonEmpty
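The duration logic is unchanged apart from the rename: duration is completion time (or the current time, for a still-running job) minus submission time, and it is undefined when the submission time is unknown. A standalone sketch of the same computation (JobTimes is a stand-in for the relevant JobUIData fields, not Spark's class):

object DurationDemo {
  case class JobTimes(submissionTime: Option[Long], completionTime: Option[Long])

  def duration(job: JobTimes, now: Long = System.currentTimeMillis()): Option[Long] =
    job.submissionTime.map { start =>
      job.completionTime.getOrElse(now) - start // running jobs count up to "now"
    }

  def main(args: Array[String]): Unit = {
    // A job replayed from a pre-1.3.0 log has no submission time, so duration
    // is None and the page renders "Unknown" rather than a bogus value.
    assert(duration(JobTimes(None, None)).isEmpty)
    println(duration(JobTimes(Some(1421191042750L), Some(1421191296660L)))) // Some(253910)
  }
}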

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 5 additions & 5 deletions
@@ -153,14 +153,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     val jobData: JobUIData =
       new JobUIData(
         jobId = jobStart.jobId,
-        startTime = Some(System.currentTimeMillis),
-        endTime = None,
+        submissionTime = Option(jobStart.time).filter(_ >= 0),
         stageIds = jobStart.stageIds,
         jobGroup = jobGroup,
         status = JobExecutionStatus.RUNNING)
     // Compute (a potential underestimate of) the number of tasks that will be run by this job.
     // This may be an underestimate because the job start event references all of the result
-    // stages's transitive stage dependencies, but some of these stages might be skipped if their
+    // stages' transitive stage dependencies, but some of these stages might be skipped if their
     // output is available from earlier runs.
     // See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
     jobData.numTasks = {

@@ -186,7 +185,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       logWarning(s"Job completed for unknown job ${jobEnd.jobId}")
       new JobUIData(jobId = jobEnd.jobId)
     }
-    jobData.endTime = Some(System.currentTimeMillis())
+    jobData.completionTime = Option(jobEnd.time).filter(_ >= 0)
+
     jobEnd.jobResult match {
       case JobSucceeded =>
         completedJobs += jobData

@@ -309,7 +309,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
     val info = taskEnd.taskInfo
     // If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task
-    // compeletion event is for. Let's just drop it here. This means we might have some speculation
+    // completion event is for. Let's just drop it here. This means we might have some speculation
     // tasks on the web ui that's never marked as complete.
     if (info != null && taskEnd.stageAttemptId != -1) {
       val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, taskEnd.stageAttemptId), {
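The Option(jobStart.time).filter(_ >= 0) guard is what turns the -1 sentinel written by the JSON reader for pre-1.3.0 logs (see JsonProtocol.scala below) into None, which the UI renders as "Unknown" instead of a nonsense date. A minimal sketch of that normalization:

object SentinelDemo {
  // Keep a real epoch-millis value; collapse the "field was missing" sentinel.
  def normalize(time: Long): Option[Long] = Option(time).filter(_ >= 0)

  def main(args: Array[String]): Unit = {
    println(normalize(1421191042750L)) // Some(1421191042750)
    println(normalize(-1L))            // None
  }
}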

core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala

Lines changed: 3 additions & 3 deletions
@@ -40,15 +40,15 @@ private[jobs] object UIData {

   class JobUIData(
       var jobId: Int = -1,
-      var startTime: Option[Long] = None,
-      var endTime: Option[Long] = None,
+      var submissionTime: Option[Long] = None,
+      var completionTime: Option[Long] = None,
       var stageIds: Seq[Int] = Seq.empty,
       var jobGroup: Option[String] = None,
       var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN,
       /* Tasks */
       // `numTasks` is a potential underestimate of the true number of tasks that this job will run.
       // This may be an underestimate because the job start event references all of the result
-      // stages's transitive stage dependencies, but some of these stages might be skipped if their
+      // stages' transitive stage dependencies, but some of these stages might be skipped if their
       // output is available from earlier runs.
       // See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
       var numTasks: Int = 0,

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 9 additions & 2 deletions
@@ -32,6 +32,7 @@ import org.apache.spark.executor._
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
 import org.apache.spark._
+import org.apache.hadoop.hdfs.web.JsonUtil

 /**
  * Serializes SparkListener events to/from JSON. This protocol provides strong backwards-

@@ -141,6 +142,7 @@ private[spark] object JsonProtocol {
     val properties = propertiesToJson(jobStart.properties)
     ("Event" -> Utils.getFormattedClassName(jobStart)) ~
     ("Job ID" -> jobStart.jobId) ~
+    ("Submission Time" -> jobStart.time) ~
     ("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~ // Added in Spark 1.2.0
     ("Stage IDs" -> jobStart.stageIds) ~
     ("Properties" -> properties)

@@ -150,6 +152,7 @@ private[spark] object JsonProtocol {
     val jobResult = jobResultToJson(jobEnd.jobResult)
     ("Event" -> Utils.getFormattedClassName(jobEnd)) ~
     ("Job ID" -> jobEnd.jobId) ~
+    ("Completion Time" -> jobEnd.time) ~
     ("Job Result" -> jobResult)
   }

@@ -492,20 +495,24 @@ private[spark] object JsonProtocol {

   def jobStartFromJson(json: JValue): SparkListenerJobStart = {
     val jobId = (json \ "Job ID").extract[Int]
+    val submissionTime =
+      Utils.jsonOption(json \ "Submission Time").map(_.extract[Long]).getOrElse(-1L)
     val stageIds = (json \ "Stage IDs").extract[List[JValue]].map(_.extract[Int])
     val properties = propertiesFromJson(json \ "Properties")
     // The "Stage Infos" field was added in Spark 1.2.0
     val stageInfos = Utils.jsonOption(json \ "Stage Infos")
       .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
         stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
       }
-    SparkListenerJobStart(jobId, stageInfos, properties)
+    SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
   }

   def jobEndFromJson(json: JValue): SparkListenerJobEnd = {
     val jobId = (json \ "Job ID").extract[Int]
+    val completionTime =
+      Utils.jsonOption(json \ "Completion Time").map(_.extract[Long]).getOrElse(-1L)
     val jobResult = jobResultFromJson(json \ "Job Result")
-    SparkListenerJobEnd(jobId, jobResult)
+    SparkListenerJobEnd(jobId, completionTime, jobResult)
   }

   def environmentUpdateFromJson(json: JValue): SparkListenerEnvironmentUpdate = {
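The reader side is deliberately lenient: if "Submission Time" or "Completion Time" is absent (an event log written before this change), extraction falls back to -1 instead of failing. A hedged sketch of that pattern, assuming json4s-jackson on the classpath (the JSON library this protocol is built on); jsonOption below is a local stand-in for Spark's private Utils.jsonOption:

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object LenientReadDemo {
  implicit val formats: Formats = DefaultFormats

  // Missing fields come back as JNothing; map them to None.
  def jsonOption(json: JValue): Option[JValue] =
    json match {
      case JNothing => None
      case value => Some(value)
    }

  def main(args: Array[String]): Unit = {
    // A pre-1.3.0 job-end event: no "Completion Time" field.
    val oldEvent = parse(
      """{"Event": "SparkListenerJobEnd", "Job ID": 20,
        | "Job Result": {"Result": "JobSucceeded"}}""".stripMargin)
    val completionTime =
      jsonOption(oldEvent \ "Completion Time").map(_.extract[Long]).getOrElse(-1L)
    println(completionTime) // -1, later collapsed to None by the listener
  }
}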

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 6 additions & 4 deletions
@@ -34,6 +34,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000

+  val jobCompletionTime = 1421191296660L
+
   before {
     sc = new SparkContext("local", "SparkListenerSuite")
   }

@@ -44,7 +46,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     bus.addListener(counter)

     // Listener bus hasn't started yet, so posting events should not increment counter
-    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
     assert(counter.count === 0)

     // Starting listener bus should flush all buffered events

@@ -54,7 +56,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers

     // After listener bus has stopped, posting events should not increment counter
     bus.stop()
-    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
     assert(counter.count === 5)

     // Listener bus must not be started twice

@@ -99,7 +101,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers

     bus.addListener(blockingListener)
     bus.start()
-    bus.post(SparkListenerJobEnd(0, JobSucceeded))
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))

     listenerStarted.acquire()
     // Listener should be blocked after start

@@ -345,7 +347,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     bus.start()

     // Post events to all listeners, and wait until the queue is drained
-    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
     assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))

     // The exception should be caught, and the event should be propagated to other listeners

core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala

Lines changed: 4 additions & 2 deletions
@@ -28,6 +28,8 @@ import org.apache.spark.util.Utils

 class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {

+  val jobSubmissionTime = 1421191042750L
+  val jobCompletionTime = 1421191296660L

   private def createStageStartEvent(stageId: Int) = {
     val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")

@@ -46,12 +48,12 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
     val stageInfos = stageIds.map { stageId =>
       new StageInfo(stageId, 0, stageId.toString, 0, null, "")
     }
-    SparkListenerJobStart(jobId, stageInfos)
+    SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos)
   }

   private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
     val result = if (failed) JobFailed(new Exception("dummy failure")) else JobSucceeded
-    SparkListenerJobEnd(jobId, result)
+    SparkListenerJobEnd(jobId, jobCompletionTime, result)
   }

   private def runJob(listener: SparkListener, jobId: Int, shouldFail: Boolean = false) {

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 27 additions & 4 deletions
@@ -34,6 +34,9 @@ import org.apache.spark.storage._

 class JsonProtocolSuite extends FunSuite {

+  val jobSubmissionTime = 1421191042750L
+  val jobCompletionTime = 1421191296660L
+
   test("SparkListenerEvent") {
     val stageSubmitted =
       SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)

@@ -54,9 +57,9 @@ class JsonProtocolSuite extends FunSuite {
       val stageIds = Seq[Int](1, 2, 3, 4)
       val stageInfos = stageIds.map(x =>
         makeStageInfo(x, x * 200, x * 300, x * 400L, x * 500L))
-      SparkListenerJobStart(10, stageInfos, properties)
+      SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
     }
-    val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
+    val jobEnd = SparkListenerJobEnd(20, jobCompletionTime, JobSucceeded)
     val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
       "JVM Information" -> Seq(("GC speed", "9999 objects/s"), ("Java home", "Land of coffee")),
       "Spark Properties" -> Seq(("Job throughput", "80000 jobs/s, regardless of job type")),

@@ -247,13 +250,31 @@ class JsonProtocolSuite extends FunSuite {
     val stageInfos = stageIds.map(x => makeStageInfo(x, x * 200, x * 300, x * 400, x * 500))
     val dummyStageInfos =
       stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
-    val jobStart = SparkListenerJobStart(10, stageInfos, properties)
+    val jobStart = SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
     val oldEvent = JsonProtocol.jobStartToJson(jobStart).removeField({_._1 == "Stage Infos"})
     val expectedJobStart =
-      SparkListenerJobStart(10, dummyStageInfos, properties)
+      SparkListenerJobStart(10, jobSubmissionTime, dummyStageInfos, properties)
     assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldEvent))
   }

+  test("SparkListenerJobStart and SparkListenerJobEnd backward compatibility") {
+    // Prior to Spark 1.3.0, SparkListenerJobStart did not have a "Submission Time" property.
+    // Also, SparkListenerJobEnd did not have a "Completion Time" property.
+    val stageIds = Seq[Int](1, 2, 3, 4)
+    val stageInfos = stageIds.map(x => makeStageInfo(x * 10, x * 20, x * 30, x * 40, x * 50))
+    val jobStart = SparkListenerJobStart(11, jobSubmissionTime, stageInfos, properties)
+    val oldStartEvent = JsonProtocol.jobStartToJson(jobStart)
+      .removeField({ _._1 == "Submission Time"})
+    val expectedJobStart = SparkListenerJobStart(11, -1, stageInfos, properties)
+    assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldStartEvent))
+
+    val jobEnd = SparkListenerJobEnd(11, jobCompletionTime, JobSucceeded)
+    val oldEndEvent = JsonProtocol.jobEndToJson(jobEnd)
+      .removeField({ _._1 == "Completion Time"})
+    val expectedJobEnd = SparkListenerJobEnd(11, -1, JobSucceeded)
+    assertEquals(expectedJobEnd, JsonProtocol.jobEndFromJson(oldEndEvent))
+  }
+
   /** -------------------------- *
    | Helper test running methods |
    * --------------------------- */

@@ -1075,6 +1096,7 @@ class JsonProtocolSuite extends FunSuite {
     |{
     |  "Event": "SparkListenerJobStart",
     |  "Job ID": 10,
+    |  "Submission Time": 1421191042750,
     |  "Stage Infos": [
     |    {
     |      "Stage ID": 1,

@@ -1349,6 +1371,7 @@ class JsonProtocolSuite extends FunSuite {
     |{
     |  "Event": "SparkListenerJobEnd",
     |  "Job ID": 20,
+    |  "Completion Time": 1421191296660,
     |  "Job Result": {
     |    "Result": "JobSucceeded"
     |  }
