diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 4ad12ab7897e8..0013ae6b39918 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -303,10 +303,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo // Otherwise, if we return an iterator, we release the memory reserved here // later when the task finishes. if (keepUnrolling) { + val taskAttemptId = currentTaskAttemptId() accountingLock.synchronized { - val amountToRelease = currentUnrollMemoryForThisTask - previousMemoryReserved - releaseUnrollMemoryForThisTask(amountToRelease) - reservePendingUnrollMemoryForThisTask(blockId, amountToRelease, droppedBlocks) + // Here, we transfer memory from unroll to pending unroll because we expect to cache this + // block in `tryToPut`. We do not release and re-acquire memory from the MemoryManager in + // order to avoid race conditions where another component steals the memory that we're + // trying to transfer. + val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved + unrollMemoryMap(taskAttemptId) -= amountToTransferToPending + pendingUnrollMemoryMap(taskAttemptId) = + pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToTransferToPending } } } @@ -362,7 +368,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo // Note: if we have previously unrolled this block successfully, then pending unroll // memory should be non-zero. This is the amount that we already reserved during the // unrolling process. In this case, we can just reuse this space to cache our block. - // This must be synchronized so the release and re-acquire can happen atomically. + // + // Note: the StaticMemoryManager counts unroll memory as storage memory. Here, the + // synchronization on `accountingLock` guarantees that the release of unroll memory and + // acquisition of storage memory happens atomically. However, if storage memory is acquired + // outside of MemoryStore or if unroll memory is counted as execution memory, then we will + // have to revisit this assumption. See SPARK-10983 for more context. releasePendingUnrollMemoryForThisTask() val numBytesAcquired = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks) val enoughMemory = numBytesAcquired == size @@ -516,27 +527,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } } - /** - * Reserve the unroll memory of current unroll successful block used by this task - * until actually put the block into memory entry. - * @return whether the request is granted. - */ - private def reservePendingUnrollMemoryForThisTask( - blockId: BlockId, - memory: Long, - droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { - val taskAttemptId = currentTaskAttemptId() - accountingLock.synchronized { - val acquired = memoryManager.acquireUnrollMemory(blockId, memory, droppedBlocks) - val success = acquired == memory - if (success) { - pendingUnrollMemoryMap(taskAttemptId) = - pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory - } - success - } - } - /** * Release pending unroll memory of current unroll successful block used by this task */