@@ -303,10 +303,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
303
303
// Otherwise, if we return an iterator, we release the memory reserved here
304
304
// later when the task finishes.
305
305
if (keepUnrolling) {
306
+ val taskAttemptId = currentTaskAttemptId()
306
307
accountingLock.synchronized {
307
- val amountToRelease = currentUnrollMemoryForThisTask - previousMemoryReserved
308
- releaseUnrollMemoryForThisTask(amountToRelease)
309
- reservePendingUnrollMemoryForThisTask(blockId, amountToRelease, droppedBlocks)
308
+ // Here, we transfer memory from unroll to pending unroll because we expect to cache this
309
+ // block in `tryToPut`. We do not release and re-acquire memory from the MemoryManager in
310
+ // order to avoid race conditions where another component steals the memory that we're
311
+ // trying to transfer.
312
+ val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved
313
+ unrollMemoryMap(taskAttemptId) -= amountToTransferToPending
314
+ pendingUnrollMemoryMap(taskAttemptId) =
315
+ pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L ) + amountToTransferToPending
310
316
}
311
317
}
312
318
}
@@ -362,7 +368,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
362
368
// Note: if we have previously unrolled this block successfully, then pending unroll
363
369
// memory should be non-zero. This is the amount that we already reserved during the
364
370
// unrolling process. In this case, we can just reuse this space to cache our block.
365
- // This must be synchronized so the release and re-acquire can happen atomically.
371
+ //
372
+ // Note: the StaticMemoryManager counts unroll memory as storage memory. Here, the
373
+ // synchronization on `accountingLock` guarantees that the release of unroll memory and
374
+ // acquisition of storage memory happen atomically. However, if storage memory is acquired
375
+ // outside of MemoryStore or if unroll memory is counted as execution memory, then we will
376
+ // have to revisit this assumption. See SPARK-10983 for more context.
366
377
releasePendingUnrollMemoryForThisTask()
367
378
val numBytesAcquired = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks)
368
379
val enoughMemory = numBytesAcquired == size
@@ -516,27 +527,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
516
527
}
517
528
}
518
529
519
- /**
520
- * Reserve the unroll memory of the current successfully unrolled block used by this task
521
- * until the block is actually put into a memory entry.
522
- * @return whether the request is granted.
523
- */
524
- private def reservePendingUnrollMemoryForThisTask (
525
- blockId : BlockId ,
526
- memory : Long ,
527
- droppedBlocks : mutable.Buffer [(BlockId , BlockStatus )]): Boolean = {
528
- val taskAttemptId = currentTaskAttemptId()
529
- accountingLock.synchronized {
530
- val acquired = memoryManager.acquireUnrollMemory(blockId, memory, droppedBlocks)
531
- val success = acquired == memory
532
- if (success) {
533
- pendingUnrollMemoryMap(taskAttemptId) =
534
- pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L ) + memory
535
- }
536
- success
537
- }
538
- }
539
-
540
530
/**
541
531
* Release pending unroll memory of the current successfully unrolled block used by this task
542
532
*/
0 commit comments