@@ -424,11 +424,11 @@ class CheckpointSuite extends TestSuiteBase {
424
424
}
425
425
}
426
426
}
427
- clock.advance(batchDuration.milliseconds)
428
427
eventually(eventuallyTimeout) {
429
428
// Wait until all files have been recorded and all batches have started
430
429
assert(recordedFiles(ssc) === Seq (1 , 2 , 3 ) && batchCounter.getNumStartedBatches === 3 )
431
430
}
431
+ clock.advance(batchDuration.milliseconds)
432
432
// Wait for a checkpoint to be written
433
433
eventually(eventuallyTimeout) {
434
434
assert(Checkpoint .getCheckpointFiles(checkpointDir).size === 6 )
@@ -454,9 +454,12 @@ class CheckpointSuite extends TestSuiteBase {
454
454
// recorded before failure were saved and successfully recovered
455
455
logInfo(" *********** RESTARTING ************" )
456
456
withStreamingContext(new StreamingContext (checkpointDir)) { ssc =>
457
- // So that the restarted StreamingContext's clock has gone forward in time since failure
458
- ssc.conf.set(" spark.streaming.manualClock.jump" , (batchDuration * 3 ).milliseconds.toString)
459
- val oldClockTime = clock.getTimeMillis()
457
+ // "batchDuration.milliseconds * 3" has gone before restarting StreamingContext. And because
458
+ // the recovery time is read from the checkpoint time but the original clock doesn't align
459
+ // with the batch time, we need to add the offset "batchDuration.milliseconds / 2".
460
+ ssc.conf.set(" spark.streaming.manualClock.jump" ,
461
+ (batchDuration.milliseconds / 2 + batchDuration.milliseconds * 3 ).toString)
462
+ val oldClockTime = clock.getTimeMillis() // 15000ms
460
463
clock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
461
464
val batchCounter = new BatchCounter (ssc)
462
465
val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf [TestOutputStream [Int ]]
@@ -467,10 +470,10 @@ class CheckpointSuite extends TestSuiteBase {
467
470
ssc.start()
468
471
// Verify that the clock has traveled forward to the expected time
469
472
eventually(eventuallyTimeout) {
470
- clock.getTimeMillis() === oldClockTime
473
+ assert( clock.getTimeMillis() === oldClockTime)
471
474
}
472
- // Wait for pre-failure batch to be recomputed (3 while SSC was down plus last batch)
473
- val numBatchesAfterRestart = 4
475
+ // There are 5 batches between 6000ms and 15000ms (inclusive).
476
+ val numBatchesAfterRestart = 5
474
477
eventually(eventuallyTimeout) {
475
478
assert(batchCounter.getNumCompletedBatches === numBatchesAfterRestart)
476
479
}
@@ -483,7 +486,6 @@ class CheckpointSuite extends TestSuiteBase {
483
486
assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1 )
484
487
}
485
488
}
486
- clock.advance(batchDuration.milliseconds)
487
489
logInfo(" Output after restart = " + outputStream.output.mkString(" [" , " , " , " ]" ))
488
490
assert(outputStream.output.size > 0 , " No files processed after restart" )
489
491
ssc.stop()
0 commit comments