@@ -395,6 +395,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
395
395
}
396
396
397
397
// Take into account the amount of memory currently occupied by unrolling blocks
398
+ // and minus the pending unroll memory for that block on current thread.
399
+ val threadId = Thread .currentThread().getId
398
400
val actualFreeMemory = freeMemory - currentUnrollMemory
399
401
400
402
if (actualFreeMemory < space) {
@@ -482,13 +484,20 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
482
484
}
483
485
}
484
486
487
+ /**
488
+ * Reserve the unroll memory of current unroll successful block used by this thread
489
+ * until actually put the block into memory entry.
490
+ */
485
491
def reservePendingUnrollMemoryForThisThread (memory : Long ): Unit = {
486
492
val threadId = Thread .currentThread().getId
487
493
accountingLock.synchronized {
488
494
pendingUnrollMemoryMap(threadId) = pendingUnrollMemoryMap.getOrElse(threadId, 0L ) + memory
489
495
}
490
496
}
491
497
498
+ /**
499
+ * Release pending unroll memory of current unroll successful block used by this thread
500
+ */
492
501
def releasePendingUnrollMemoryForThisThread (): Unit = {
493
502
val threadId = Thread .currentThread().getId
494
503
accountingLock.synchronized {
@@ -500,7 +509,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
500
509
* Return the amount of memory currently occupied for unrolling blocks across all threads.
501
510
*/
502
511
def currentUnrollMemory : Long = accountingLock.synchronized {
503
- unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
512
+ unrollMemoryMap.values.sum + pendingUnrollMemoryMap.
513
+ filter(_._1 != Thread .currentThread().getId).values.sum
504
514
}
505
515
506
516
/**
0 commit comments