Skip to content

Commit 42d8a01

Browse files
rekhajoshmsrowen
authored andcommitted
[SPARK-8593] [CORE] Sort app attempts by start time.
This makes sure attempts are listed in the order they were executed, and that the app's state matches the state of the most current attempt. Author: Joshi <[email protected]> Author: Rekha Joshi <[email protected]> Closes apache#7253 from rekhajoshm/SPARK-8593 and squashes the following commits: 874dd80 [Joshi] History Server: updated order for multiple attempts(logcleaner) 716e0b1 [Joshi] History Server: updated order for multiple attempts(descending start time works everytime) 548c753 [Joshi] History Server: updated order for multiple attempts(descending start time works everytime) 83306a8 [Joshi] History Server: updated order for multiple attempts(descending start time) b0fc922 [Joshi] History Server: updated order for multiple attempts(updated comment) cc0fda7 [Joshi] History Server: updated order for multiple attempts(updated test) 304cb0b [Joshi] History Server: updated order for multiple attempts(reverted HistoryPage) 85024e8 [Joshi] History Server: updated order for multiple attempts a41ac4b [Joshi] History Server: updated order for multiple attempts ab65fa1 [Joshi] History Server: some attempt completed to work with showIncomplete 0be142d [Rekha Joshi] Merge pull request #3 from apache/master 106fd8e [Rekha Joshi] Merge pull request #2 from apache/master e3677c9 [Rekha Joshi] Merge pull request #1 from apache/master
1 parent 8b8be1f commit 42d8a01

File tree

2 files changed

+14
-20
lines changed

2 files changed

+14
-20
lines changed

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -407,8 +407,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
407407

408408
/**
409409
* Comparison function that defines the sort order for application attempts within the same
410-
* application. Order is: running attempts before complete attempts, running attempts sorted
411-
* by start time, completed attempts sorted by end time.
410+
* application. Order is: attempts are sorted by descending start time.
411+
* Most recent attempt state matches with current state of the app.
412412
*
413413
* Normally applications should have a single running attempt; but failure to call sc.stop()
414414
* may cause multiple running attempts to show up.
@@ -418,11 +418,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
418418
private def compareAttemptInfo(
419419
a1: FsApplicationAttemptInfo,
420420
a2: FsApplicationAttemptInfo): Boolean = {
421-
if (a1.completed == a2.completed) {
422-
if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
423-
} else {
424-
!a1.completed
425-
}
421+
a1.startTime >= a2.startTime
426422
}
427423

428424
/**

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -243,13 +243,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
243243
appListAfterRename.size should be (1)
244244
}
245245

246-
test("apps with multiple attempts") {
246+
test("apps with multiple attempts with order") {
247247
val provider = new FsHistoryProvider(createTestConf())
248248

249-
val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = false)
249+
val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true)
250250
writeFile(attempt1, true, None,
251-
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
252-
SparkListenerApplicationEnd(2L)
251+
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1"))
253252
)
254253

255254
updateAndCheck(provider) { list =>
@@ -259,7 +258,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
259258

260259
val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true)
261260
writeFile(attempt2, true, None,
262-
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2"))
261+
SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", Some("attempt2"))
263262
)
264263

265264
updateAndCheck(provider) { list =>
@@ -268,30 +267,29 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
268267
list.head.attempts.head.attemptId should be (Some("attempt2"))
269268
}
270269

271-
val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false)
272-
attempt2.delete()
273-
writeFile(attempt2, true, None,
274-
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
270+
val attempt3 = newLogFile("app1", Some("attempt3"), inProgress = false)
271+
writeFile(attempt3, true, None,
272+
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt3")),
275273
SparkListenerApplicationEnd(4L)
276274
)
277275

278276
updateAndCheck(provider) { list =>
279277
list should not be (null)
280278
list.size should be (1)
281-
list.head.attempts.size should be (2)
282-
list.head.attempts.head.attemptId should be (Some("attempt2"))
279+
list.head.attempts.size should be (3)
280+
list.head.attempts.head.attemptId should be (Some("attempt3"))
283281
}
284282

285283
val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
286-
writeFile(attempt2, true, None,
284+
writeFile(attempt1, true, None,
287285
SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")),
288286
SparkListenerApplicationEnd(6L)
289287
)
290288

291289
updateAndCheck(provider) { list =>
292290
list.size should be (2)
293291
list.head.attempts.size should be (1)
294-
list.last.attempts.size should be (2)
292+
list.last.attempts.size should be (3)
295293
list.head.attempts.head.attemptId should be (Some("attempt1"))
296294

297295
list.foreach { case app =>

0 commit comments

Comments
 (0)