Skip to content

Commit e3a88d1

Browse files
suyanNoneAndrew Or
authored andcommitted
[SPARK-4777][CORE] Some block memory after unrollSafely not count into used memory(memoryStore.entrys or unrollMemory)
Some memory not count into memory used by memoryStore or unrollMemory. Thread A after unrollsafely memory, it will release 40MB unrollMemory(40MB will used by other threads). then ThreadA wait get accountingLock to tryToPut blockA(30MB). before Thread A get accountingLock, blockA memory size is not counting into unrollMemory or memoryStore.currentMemory. IIUC, freeMemory should minus that block memory So, put this release memory into pending, and release it in tryToPut before ensureSpace Author: hushan[胡珊] <[email protected]> Closes apache#3629 from suyanNone/unroll-memory and squashes the following commits: 809cc41 [hushan[胡珊]] Refine 407b2c9 [hushan[胡珊]] Refine according comments 39960d0 [hushan[胡珊]] Refine comments 0fd0213 [hushan[胡珊]] add comments 0fc2bec [hushan[胡珊]] Release pending unroll memory after put block in memoryStore 3a3f2c8 [hushan[胡珊]] Refine blockManagerSuite unroll test 3323c45 [hushan[胡珊]] Refine getOrElse f664317 [hushan[胡珊]] Make sure not add pending in every releaseUnrollMemory call 08b32ba [hushan[胡珊]] Pending unroll memory for this block untill tryToPut
1 parent 258d154 commit e3a88d1

File tree

2 files changed

+47
-7
lines changed

2 files changed

+47
-7
lines changed

core/src/main/scala/org/apache/spark/storage/MemoryStore.scala

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
4646
// A mapping from thread ID to amount of memory used for unrolling a block (in bytes)
4747
// All accesses of this map are assumed to have manually synchronized on `accountingLock`
4848
private val unrollMemoryMap = mutable.HashMap[Long, Long]()
49+
// Same as `unrollMemoryMap`, but for pending unroll memory as defined below.
50+
// Pending unroll memory refers to the intermediate memory occupied by a thread
51+
// after the unroll but before the actual putting of the block in the cache.
52+
// This chunk of memory is expected to be released *as soon as* we finish
53+
// caching the corresponding block as opposed to until after the task finishes.
54+
// This is only used if a block is successfully unrolled in its entirety in
55+
// memory (SPARK-4777).
56+
private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]()
4957

5058
/**
5159
* The amount of space ensured for unrolling values in memory, shared across all cores.
@@ -283,12 +291,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
283291
}
284292

285293
} finally {
286-
// If we return an array, the values returned do not depend on the underlying vector and
287-
// we can immediately free up space for other threads. Otherwise, if we return an iterator,
288-
// we release the memory claimed by this thread later on when the task finishes.
294+
// If we return an array, the values returned will later be cached in `tryToPut`.
295+
// In this case, we should release the memory after we cache the block there.
296+
// Otherwise, if we return an iterator, we release the memory reserved here
297+
// later when the task finishes.
289298
if (keepUnrolling) {
290-
val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
291-
releaseUnrollMemoryForThisThread(amountToRelease)
299+
accountingLock.synchronized {
300+
val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
301+
releaseUnrollMemoryForThisThread(amountToRelease)
302+
reservePendingUnrollMemoryForThisThread(amountToRelease)
303+
}
292304
}
293305
}
294306
}
@@ -353,6 +365,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
353365
val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
354366
droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
355367
}
368+
// Release the unroll memory used because we no longer need the underlying Array
369+
releasePendingUnrollMemoryForThisThread()
356370
}
357371
ResultWithDroppedBlocks(putSuccess, droppedBlocks)
358372
}
@@ -381,7 +395,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
381395
}
382396

383397
// Take into account the amount of memory currently occupied by unrolling blocks
384-
val actualFreeMemory = freeMemory - currentUnrollMemory
398+
// and minus the pending unroll memory for that block on current thread.
399+
val threadId = Thread.currentThread().getId
400+
val actualFreeMemory = freeMemory - currentUnrollMemory +
401+
pendingUnrollMemoryMap.getOrElse(threadId, 0L)
385402

386403
if (actualFreeMemory < space) {
387404
val rddToAdd = getRddId(blockIdToAdd)
@@ -468,11 +485,32 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
468485
}
469486
}
470487

488+
/**
489+
* Reserve the unroll memory of current unroll successful block used by this thread
490+
* until actually put the block into memory entry.
491+
*/
492+
def reservePendingUnrollMemoryForThisThread(memory: Long): Unit = {
493+
val threadId = Thread.currentThread().getId
494+
accountingLock.synchronized {
495+
pendingUnrollMemoryMap(threadId) = pendingUnrollMemoryMap.getOrElse(threadId, 0L) + memory
496+
}
497+
}
498+
499+
/**
500+
* Release pending unroll memory of current unroll successful block used by this thread
501+
*/
502+
def releasePendingUnrollMemoryForThisThread(): Unit = {
503+
val threadId = Thread.currentThread().getId
504+
accountingLock.synchronized {
505+
pendingUnrollMemoryMap.remove(threadId)
506+
}
507+
}
508+
471509
/**
472510
* Return the amount of memory currently occupied for unrolling blocks across all threads.
473511
*/
474512
def currentUnrollMemory: Long = accountingLock.synchronized {
475-
unrollMemoryMap.values.sum
513+
unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
476514
}
477515

478516
/**

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,6 +1064,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
10641064
var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
10651065
verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
10661066
assert(memoryStore.currentUnrollMemoryForThisThread === 0)
1067+
memoryStore.releasePendingUnrollMemoryForThisThread()
10671068

10681069
// Unroll with not enough space. This should succeed after kicking out someBlock1.
10691070
store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
@@ -1074,6 +1075,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
10741075
assert(droppedBlocks.size === 1)
10751076
assert(droppedBlocks.head._1 === TestBlockId("someBlock1"))
10761077
droppedBlocks.clear()
1078+
memoryStore.releasePendingUnrollMemoryForThisThread()
10771079

10781080
// Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
10791081
// 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.

0 commit comments

Comments
 (0)