Skip to content

[SPARK-12165][SPARK-12189] Fix bugs in eviction of storage memory by execution #10170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 19 commits into the base branch from the source branch
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit. Hold Shift + click to select a range.
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ private[spark] abstract class MemoryManager(
def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
}
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean

/**
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
Expand Down Expand Up @@ -109,12 +107,7 @@ private[spark] abstract class MemoryManager(
def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
memoryMode match {
case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
}
}
memoryMode: MemoryMode): Long

/**
* Release numBytes of execution memory belonging to the given task.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,50 @@ private[spark] class StaticMemoryManager(
}

// Max number of bytes worth of blocks to evict when unrolling
private val maxMemoryToEvictForUnroll: Long = {
private val maxUnrollMemory: Long = {
(maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}

/**
 * Acquire `numBytes` of storage memory for the given block, evicting cached blocks if needed.
 *
 * @param blockId the ID of the block we are acquiring storage memory for
 * @param numBytes the size of the block, in bytes
 * @param evictedBlocks a holder populated with any blocks evicted to make room
 * @return true if all `numBytes` were granted, false otherwise
 */
override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
  // A block larger than the whole storage region can never fit, so refuse it up front
  // instead of pointlessly evicting the entire cache.
  if (numBytes <= maxStorageMemory) {
    storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
  } else {
    logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
      s"memory limit ($maxStorageMemory bytes)")
    false
  }
}

override def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory)
val numBytesToFree = math.min(numBytes, maxNumBytesToFree)
val freeMemory = storageMemoryPool.memoryFree
// When unrolling, we will use all of the existing free memory, and, if necessary,
// some extra space freed from evicting cached blocks. We must place a cap on the
// amount of memory to be evicted by unrolling, however, otherwise unrolling one
// big block can blow away the entire cache.
val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
// Keep it within the range 0 <= X <= maxNumBytesToFree
val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
}

/**
 * Acquire up to `numBytes` of execution memory for the given task attempt.
 *
 * @param numBytes the number of bytes requested
 * @param taskAttemptId the task attempt asking for memory
 * @param memoryMode whether the request targets the on-heap or off-heap pool
 * @return the number of bytes actually granted by the selected pool
 */
private[memory]
override def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long = synchronized {
  // The static manager keeps separate pools per mode; pick the one backing the request.
  val pool = memoryMode match {
    case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool
    case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool
  }
  pool.acquireMemory(numBytes, taskAttemptId)
}
}


Expand Down
37 changes: 21 additions & 16 deletions core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,16 @@ class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
acquireMemory(blockId, numBytes, numBytes, evictedBlocks)
val numBytesToFree = math.max(0, numBytes - memoryFree)
acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
}

/**
* Acquire N bytes of storage memory for the given block, evicting existing ones if necessary.
*
* @param blockId the ID of the block we are acquiring storage memory for
* @param numBytesToAcquire the size of this block
* @param numBytesToFree the size of space to be freed through evicting blocks
* @param numBytesToFree the amount of space to be freed through evicting blocks
* @return whether all N bytes were successfully granted.
*/
def acquireMemory(
Expand All @@ -84,16 +85,18 @@ class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)
memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
// Register evicted blocks, if any, with the active task metrics
Option(TaskContext.get()).foreach { tc =>
val metrics = tc.taskMetrics()
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
if (numBytesToFree > 0) {
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, evictedBlocks)
// Register evicted blocks, if any, with the active task metrics
Option(TaskContext.get()).foreach { tc =>
val metrics = tc.taskMetrics()
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
}
}
// NOTE: If the memory store evicts blocks, then those evictions will synchronously call
// back into this StorageMemoryPool in order to free. Therefore, these variables should have
// been updated.
// back into this StorageMemoryPool in order to free memory. Therefore, these variables
// should have been updated.
val enoughMemory = numBytesToAcquire <= memoryFree
if (enoughMemory) {
_memoryUsed += numBytesToAcquire
Expand Down Expand Up @@ -121,18 +124,20 @@ class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {
*/
def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
// First, shrink the pool by reclaiming free memory:
val spaceFreedByReleasingUnusedMemory = Math.min(spaceToFree, memoryFree)
val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
decrementPoolSize(spaceFreedByReleasingUnusedMemory)
if (spaceFreedByReleasingUnusedMemory == spaceToFree) {
spaceFreedByReleasingUnusedMemory
} else {
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
memoryStore.ensureFreeSpace(spaceToFree - spaceFreedByReleasingUnusedMemory, evictedBlocks)
memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, evictedBlocks)
val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
_memoryUsed -= spaceFreedByEviction
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
decrementPoolSize(spaceFreedByEviction)
spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
} else {
spaceFreedByReleasingUnusedMemory
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
case MemoryMode.OFF_HEAP =>
// For now, we only support on-heap caching of data, so we do not need to interact with
// the storage pool when allocating off-heap memory. This will change in the future, though.
super.acquireExecutionMemory(numBytes, taskAttemptId, memoryMode)
offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to this change. It's much less brittle!

}
}

Expand All @@ -110,6 +110,12 @@ private[spark] class UnifiedMemoryManager private[memory] (
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
assert(numBytes >= 0)
if (numBytes > maxStorageMemory) {
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
s"memory limit ($maxStorageMemory bytes)")
return false
}
if (numBytes > storageMemoryPool.memoryFree) {
// There is not enough free memory in the storage pool, so try to borrow free memory from
// the execution pool.
Expand Down
76 changes: 16 additions & 60 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -406,85 +406,41 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}

/**
* Try to free up a given amount of space by evicting existing blocks.
*
* @param space the amount of memory to free, in bytes
* @param droppedBlocks a holder for blocks evicted in the process
* @return whether the requested free space is freed.
*/
private[spark] def ensureFreeSpace(
space: Long,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
ensureFreeSpace(None, space, droppedBlocks)
}

/**
* Try to free up a given amount of space to store a block by evicting existing ones.
*
* @param space the amount of memory to free, in bytes
* @param droppedBlocks a holder for blocks evicted in the process
* @return whether the requested free space is freed.
*/
private[spark] def ensureFreeSpace(
blockId: BlockId,
space: Long,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
ensureFreeSpace(Some(blockId), space, droppedBlocks)
}

/**
* Try to free up a given amount of space to store a particular block, but can fail if
* either the block is bigger than our memory or it would require replacing another block
* from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
* don't fit into memory that we want to avoid).
*
* @param blockId the ID of the block we are freeing space for, if any
* @param space the size of this block
* @param droppedBlocks a holder for blocks evicted in the process
* @return whether the requested free space is freed.
*/
private def ensureFreeSpace(
* Try to evict blocks to free up a given amount of space to store a particular block.
* Can fail if either the block is bigger than our memory or it would require replacing
* another block from the same RDD (which leads to a wasteful cyclic replacement pattern for
* RDDs that don't fit into memory that we want to avoid).
*
* @param blockId the ID of the block we are freeing space for, if any
* @param space the size of this block
* @param droppedBlocks a holder for blocks evicted in the process
* @return whether the requested free space is freed.
*/
private[spark] def evictBlocksToFreeSpace(
blockId: Option[BlockId],
space: Long,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
assert(space > 0)
memoryManager.synchronized {
val freeMemory = maxMemory - memoryUsed
var freedMemory = 0L
val rddToAdd = blockId.flatMap(getRddId)
val selectedBlocks = new ArrayBuffer[BlockId]
var selectedMemory = 0L

logInfo(s"Ensuring $space bytes of free space " +
blockId.map { id => s"for block $id" }.getOrElse("") +
s"(free: $freeMemory, max: $maxMemory)")

// Fail fast if the block simply won't fit
if (space > maxMemory) {
logInfo("Will not " + blockId.map { id => s"store $id" }.getOrElse("free memory") +
s" as the required space ($space bytes) exceeds our memory limit ($maxMemory bytes)")
return false
}

// No need to evict anything if there is already enough free space
if (freeMemory >= space) {
return true
}

// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as that
// can lead to exceptions.
entries.synchronized {
val iterator = entries.entrySet().iterator()
while (freeMemory + selectedMemory < space && iterator.hasNext) {
while (freedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
selectedBlocks += blockId
selectedMemory += pair.getValue.size
freedMemory += pair.getValue.size
}
}
}

if (freeMemory + selectedMemory >= space) {
if (freedMemory >= space) {
logInfo(s"${selectedBlocks.size} blocks selected for dropping")
for (blockId <- selectedBlocks) {
val entry = entries.synchronized { entries.get(blockId) }
Expand Down
Loading