@@ -544,11 +544,12 @@ private[spark] class MemoryStore(
544
544
}
545
545
546
546
if (freedMemory >= space) {
547
- val successfulBlocks = ArrayBuffer [ BlockId ]()
547
+ var lastSuccessfulBlock = - 1
548
548
try {
549
549
logInfo(s " ${selectedBlocks.size} blocks selected for dropping " +
550
550
s " ( ${Utils .bytesToString(freedMemory)} bytes) " )
551
- for (blockId <- selectedBlocks) {
551
+ (0 until selectedBlocks.size).foreach { idx =>
552
+ val blockId = selectedBlocks(idx)
552
553
val entry = entries.synchronized {
553
554
entries.get(blockId)
554
555
}
@@ -559,23 +560,19 @@ private[spark] class MemoryStore(
559
560
dropBlock(blockId, entry)
560
561
afterDropAction(blockId)
561
562
}
562
- successfulBlocks += blockId
563
+ lastSuccessfulBlock = idx
563
564
}
564
565
logInfo(s " After dropping ${selectedBlocks.size} blocks, " +
565
566
s " free memory is ${Utils .bytesToString(maxMemory - blocksMemoryUsed)}" )
566
567
freedMemory
567
568
} finally {
568
569
// like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal
569
570
// with InterruptedException
570
- if (successfulBlocks.size != selectedBlocks.size) {
571
- val blocksToClean = selectedBlocks -- successfulBlocks
572
- blocksToClean.foreach { id =>
573
- // some of the blocks may have already been unlocked, or completely removed
574
- blockInfoManager.get(id).foreach { info =>
575
- if (info.readerCount > 0 || info.writerTask != BlockInfo .NO_WRITER ) {
576
- blockInfoManager.unlock(id)
577
- }
578
- }
571
+ if (lastSuccessfulBlock != selectedBlocks.size - 1 ) {
572
+ // the blocks we didn't process successfully are still locked, so we have to unlock them
573
+ (lastSuccessfulBlock + 1 until selectedBlocks.size).foreach { idx =>
574
+ val blockId = selectedBlocks(idx)
575
+ blockInfoManager.unlock(blockId)
579
576
}
580
577
}
581
578
}
0 commit comments