@@ -224,7 +224,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
224
224
assert(fs.exists(path))
225
225
226
226
// the checkpoint is not cleaned by default (without the configuration set)
227
- var postGCTester = new CleanerTester (sc, Seq (rddId), Nil , Nil )
227
+ var postGCTester = new CleanerTester (sc, Seq (rddId), Nil , Nil , Nil )
228
228
rdd = null // Make RDD out of scope
229
229
runGC()
230
230
postGCTester.assertCleanup()
@@ -245,7 +245,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
245
245
assert(fs.exists(RDDCheckpointData .rddCheckpointDataPath(sc, rddId).get))
246
246
247
247
// Test that GC causes checkpoint data cleanup after dereferencing the RDD
248
- postGCTester = new CleanerTester (sc, Seq (rddId), Nil , Nil )
248
+ postGCTester = new CleanerTester (sc, Seq (rddId), Nil , Nil , Seq (rddId) )
249
249
rdd = null // Make RDD out of scope
250
250
runGC()
251
251
postGCTester.assertCleanup()
@@ -406,12 +406,14 @@ class CleanerTester(
406
406
sc : SparkContext ,
407
407
rddIds : Seq [Int ] = Seq .empty,
408
408
shuffleIds : Seq [Int ] = Seq .empty,
409
- broadcastIds : Seq [Long ] = Seq .empty)
409
+ broadcastIds : Seq [Long ] = Seq .empty,
410
+ checkpointIds : Seq [Long ] = Seq .empty)
410
411
extends Logging {
411
412
412
413
val toBeCleanedRDDIds = new HashSet [Int ] with SynchronizedSet [Int ] ++= rddIds
413
414
val toBeCleanedShuffleIds = new HashSet [Int ] with SynchronizedSet [Int ] ++= shuffleIds
414
415
val toBeCleanedBroadcstIds = new HashSet [Long ] with SynchronizedSet [Long ] ++= broadcastIds
416
+ val toBeCheckpointIds = new HashSet [Long ] with SynchronizedSet [Long ] ++= checkpointIds
415
417
val isDistributed = ! sc.isLocal
416
418
417
419
val cleanerListener = new CleanerListener {
@@ -427,12 +429,17 @@ class CleanerTester(
427
429
428
430
def broadcastCleaned (broadcastId : Long ): Unit = {
429
431
toBeCleanedBroadcstIds -= broadcastId
430
- logInfo(" Broadcast" + broadcastId + " cleaned" )
432
+ logInfo(" Broadcast " + broadcastId + " cleaned" )
431
433
}
432
434
433
435
def accumCleaned (accId : Long ): Unit = {
434
436
logInfo(" Cleaned accId " + accId + " cleaned" )
435
437
}
438
+
439
+ def checkpointCleaned (rddId : Long ): Unit = {
440
+ toBeCheckpointIds -= rddId
441
+ logInfo(" checkpoint " + rddId + " cleaned" )
442
+ }
436
443
}
437
444
438
445
val MAX_VALIDATION_ATTEMPTS = 10
@@ -456,7 +463,8 @@ class CleanerTester(
456
463
457
464
/** Verify that RDDs, shuffles, etc. occupy resources */
458
465
private def preCleanupValidate () {
459
- assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty, " Nothing to cleanup" )
466
+ assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty ||
467
+ checkpointIds.nonEmpty, " Nothing to cleanup" )
460
468
461
469
// Verify the RDDs have been persisted and blocks are present
462
470
rddIds.foreach { rddId =>
@@ -547,7 +555,8 @@ class CleanerTester(
547
555
private def isAllCleanedUp =
548
556
toBeCleanedRDDIds.isEmpty &&
549
557
toBeCleanedShuffleIds.isEmpty &&
550
- toBeCleanedBroadcstIds.isEmpty
558
+ toBeCleanedBroadcstIds.isEmpty &&
559
+ toBeCheckpointIds.isEmpty
551
560
552
561
private def getRDDBlocks (rddId : Int ): Seq [BlockId ] = {
553
562
blockManager.master.getMatchingBlockIds( _ match {
0 commit comments