[SPARK-12165][SPARK-12189] Fix bugs in eviction of storage memory by execution #10170

Closed
JoshRosen wants to merge 19 commits into apache:master from JoshRosen:SPARK-12165

Conversation

JoshRosen
Contributor

This patch fixes a bug in the eviction of storage memory by execution.

The bug:

In general, execution should be able to evict storage memory when the total storage memory usage is greater than `maxMemory * spark.memory.storageFraction`. Due to a bug, however, Spark might wind up evicting no storage memory in certain cases where the storage memory usage was between `maxMemory * spark.memory.storageFraction` and `maxMemory`. For example, here is a regression test which illustrates the bug:

    val maxMemory = 1000L
    val taskAttemptId = 0L
    val (mm, ms) = makeThings(maxMemory)
    // Since we used the default storage fraction (0.5), we should be able to allocate 500 bytes
    // of storage memory which are immune to eviction by execution memory pressure.

    // Acquire enough storage memory to exceed the storage region size
    assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
    assertEvictBlocksToFreeSpaceNotCalled(ms)
    assert(mm.executionMemoryUsed === 0L)
    assert(mm.storageMemoryUsed === 750L)

    // At this point, storage is using 250 more bytes of memory than it is guaranteed, so execution
    // should be able to reclaim up to 250 bytes of storage memory.
    // Therefore, execution should now be able to acquire up to 500 bytes of memory:
    assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L) // <--- fails by only returning 250L
    assert(mm.storageMemoryUsed === 500L)
    assert(mm.executionMemoryUsed === 500L)
    assertEvictBlocksToFreeSpaceCalled(ms, 250L)

The problem relates to the control flow / interaction between `StorageMemoryPool.shrinkPoolToReclaimSpace()` and `MemoryStore.ensureFreeSpace()`. While trying to allocate the 500 bytes of execution memory, the `UnifiedMemoryManager` discovers that it will need to reclaim 250 bytes of memory from storage, so it calls `StorageMemoryPool.shrinkPoolToReclaimSpace(250L)`. This method, in turn, calls `MemoryStore.ensureFreeSpace(250L)`. However, `ensureFreeSpace()` first checks whether the requested space is less than `maxStorageMemory - storageMemoryUsed`, which will be true if there is any free execution memory because it turns out that `MemoryStore.maxStorageMemory = (maxMemory - onHeapExecutionMemoryPool.memoryUsed)` when the `UnifiedMemoryManager` is used.
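
To make the failure mode concrete, here is a minimal, self-contained sketch of the check described above. The object, fields, and method bodies below are simplified stand-ins modeled on this description, not the actual Spark 1.6 sources:

    // Toy model of the pre-fix behavior: maxStorageMemory shrinks as execution
    // memory grows, so the "enough free space" check can pass even when execution
    // actually needs storage to give memory back.
    object EnsureFreeSpaceBugSketch {
      val maxMemory = 1000L
      var executionMemoryUsed = 0L
      var storageMemoryUsed = 750L

      // Mirrors MemoryStore.maxStorageMemory under the UnifiedMemoryManager.
      def maxStorageMemory: Long = maxMemory - executionMemoryUsed

      // Returns the number of bytes evicted from storage.
      def ensureFreeSpace(space: Long): Long = {
        if (space <= maxStorageMemory - storageMemoryUsed) {
          0L // looks like there is already enough free space, so nothing is evicted
        } else {
          val evicted = math.min(space, storageMemoryUsed)
          storageMemoryUsed -= evicted
          evicted
        }
      }

      def main(args: Array[String]): Unit = {
        // Execution asks storage to give back 250 bytes, but nothing is evicted,
        // because 250 <= (1000 - 0) - 750 and the early-return branch is taken.
        println(ensureFreeSpace(250L)) // prints 0
      }
    }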

The control flow here is somewhat confusing (it grew messy over time as a result of merging and refactoring several components). In the pre-Spark 1.6 code, `ensureFreeSpace` was called directly by the `MemoryStore` itself, whereas in 1.6 it's involved in a confusing control flow where `MemoryStore` calls `MemoryManager.acquireStorageMemory`, which then calls back into `MemoryStore.ensureFreeSpace`, which, in turn, calls `MemoryManager.freeStorageMemory`.

The solution:

The solution implemented in this patch is to remove the confusing circular control flow between `MemoryManager` and `MemoryStore`, making the storage memory acquisition process much more linear / straightforward. The key changes:

  • Remove a layer of inheritance which made the memory manager code harder to understand (5384117).
  • Move some bounds checks earlier in the call chain (13ba7ad).
  • Refactor `ensureFreeSpace()` so that the part which evicts blocks can be called independently from the part which checks whether there is enough free space to avoid eviction (7c68ca0).
  • Realize that this lets us remove a layer of overloads from `ensureFreeSpace` (eec4f6c).
  • Realize that `ensureFreeSpace()` can simply be replaced with an `evictBlocksToFreeSpace()` method which is called [after we've already figured out](https://github.com/apache/spark/blob/2dc842aea82c8895125d46a00aa43dfb0d121de9/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala#L88) how much memory needs to be reclaimed via eviction (2dc842a); a sketch of the resulting flow follows this list.
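
To illustrate the resulting linear flow, here is a small, self-contained sketch. The names and arithmetic are simplified assumptions based on the description above, not the actual `UnifiedMemoryManager` / `StorageMemoryPool` code:

    // Simplified model of the post-fix flow: the memory manager first computes how
    // many bytes storage must give up, then asks the store to evict exactly that.
    object LinearEvictionSketch {
      val maxMemory = 1000L
      val storageRegionSize = 500L // maxMemory * spark.memory.storageFraction (0.5)
      var executionMemoryUsed = 0L
      var storageMemoryUsed = 750L

      // Stand-in for MemoryStore.evictBlocksToFreeSpace: frees up to `space` bytes.
      def evictBlocksToFreeSpace(space: Long): Long = {
        val evicted = math.min(space, storageMemoryUsed)
        storageMemoryUsed -= evicted
        evicted
      }

      def acquireExecutionMemory(numBytes: Long): Long = {
        var memoryFree = maxMemory - executionMemoryUsed - storageMemoryUsed
        if (numBytes > memoryFree) {
          // Execution may only reclaim storage memory above the storage region size.
          val reclaimable = math.max(0L, storageMemoryUsed - storageRegionSize)
          val spaceToReclaim = math.min(numBytes - memoryFree, reclaimable)
          memoryFree += evictBlocksToFreeSpace(spaceToReclaim)
        }
        val granted = math.min(numBytes, memoryFree)
        executionMemoryUsed += granted
        granted
      }

      def main(args: Array[String]): Unit = {
        // Reproduces the regression test above: 500 bytes granted, 250 evicted.
        println(acquireExecutionMemory(500L)) // prints 500
        println(storageMemoryUsed)            // prints 500
      }
    }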

Along the way, I fixed some problems with the mocks in `MemoryManagerSuite`: the old mocks would [unconditionally](https://github.com/apache/spark/blob/80a824d36eec9d9a9f092ee1741453851218ec73/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala#L84) report that a block had been evicted even if there was enough space in the storage pool such that eviction would be avoided.

I also fixed a problem where `StorageMemoryPool._memoryUsed` might become negative due to freed memory being double-counted when execution evicts storage. The problem was that `StorageMemoryPool.shrinkPoolToFreeSpace` would [decrement `_memoryUsed`](7c68ca0#diff-935c68a9803be144ed7bafdd2f756a0fL133) even though `StorageMemoryPool.freeMemory` had already decremented it as each evicted block was freed. See SPARK-12189 for details.
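
For the SPARK-12189 half, the double-counting can be shown with a toy sketch; the method bodies below are hypothetical simplifications of the behavior just described, not the actual `StorageMemoryPool` code:

    // The shrink path decrements the usage counter itself even though freeing each
    // evicted block has already decremented it, so the counter can go negative.
    object DoubleCountSketch {
      var memoryUsed = 300L

      // Called as each evicted block is freed.
      def freeMemory(size: Long): Unit = { memoryUsed -= size }

      // Pre-fix behavior: evict (which frees blocks), then decrement again.
      def shrinkPoolToFreeSpace(spaceToFree: Long): Unit = {
        freeMemory(spaceToFree)   // counter already reduced while freeing blocks ...
        memoryUsed -= spaceToFree // ... and then reduced a second time
      }

      def main(args: Array[String]): Unit = {
        shrinkPoolToFreeSpace(200L)
        println(memoryUsed) // prints -100: the counter has gone negative
      }
    }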

@SparkQA

SparkQA commented Dec 7, 2015

Test build #47251 has finished for PR 10170 at commit 0eac7da.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental): `public abstract static class PrefixComputer`

@JoshRosen
Contributor Author

I think that the test failures in `BlockManagerSuite` are related to the semantics of `maxNumBytesToFree` in `StaticMemoryManager.acquireUnrollMemory`.

Previously, it looks like `maxNumBytesToFree` was the amount of memory that we'd try to free in the `memoryStore.ensureFreeSpace()` call. Due to the bug fixed in this patch, however, I believe we'd end up in the case where `maxNumBytesToFree` is less than the amount of free storage memory, so `ensureFreeSpace()` wouldn't evict anything even though we actually need `numBytesToAcquire > maxNumBytesToFree` bytes of memory.

After the fixes implemented here, we'll first claim as much free storage memory as possible, then subtract that from our memory goal and request the remaining memory via spilling. As a result, we are more prone to evict, which might be throwing off the original test case (it's a little tricky to say due to size estimation; I'll try to see if I can decouple that via mocking in order to make the test easier to reason about).

One minor question of semantics: up until now (and still) it looks like `maxNumBytesToFree` has been "the maximum amount of memory to attempt to free up via spilling" and not "an upper bound on the amount of memory that will be spilled in response to a request". I believe the former might be the right interpretation, since the latter would mean that large cached blocks could never be evicted by small unroll requests.

@andrewor14, is the original idea behind `spark.storage.unrollFraction` that it places an upper limit on the amount of total storage memory that can be used for unrolling? If that's the case, we can simplify `StorageMemoryPool.acquireMemory` to only accept a single `numBytesToAcquire` and cap the size of requests at `min(spark.storage.unrollFraction * maxStorageMemory - currentUnrollMemory, requestedUnrollMemory)`.
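
For reference, that cap could look roughly like the following sketch (hypothetical helper and parameter names; 0.2 is the documented default for `spark.storage.unrollFraction`):

    // Trim an unroll request so that total unroll memory never exceeds
    // unrollFraction * maxStorageMemory.
    def cappedUnrollRequest(
        requestedUnrollMemory: Long,
        currentUnrollMemory: Long,
        maxStorageMemory: Long,
        unrollFraction: Double = 0.2): Long = {
      val maxUnrollMemory = (unrollFraction * maxStorageMemory).toLong
      math.min(math.max(0L, maxUnrollMemory - currentUnrollMemory), requestedUnrollMemory)
    }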

@JoshRosen JoshRosen changed the title [SPARK-12165] Fix bug in eviction of storage memory by execution [SPARK-12165][SPARK-12189] Fix bug in eviction of storage memory by execution Dec 8, 2015
@JoshRosen JoshRosen changed the title [SPARK-12165][SPARK-12189] Fix bug in eviction of storage memory by execution [SPARK-12165][SPARK-12189] Fix bugs in eviction of storage memory by execution Dec 8, 2015
    if (numBytesToAcquire > memoryFree && maxNumBytesToFree > 0) {
      val additionalMemoryRequired = numBytesToAcquire - memoryFree
      memoryStore.evictBlocksToFreeSpace(
        Some(blockId), Math.min(maxNumBytesToFree, additionalMemoryRequired), evictedBlocks)

Contributor

can we use `math.min` like we do in other places?

Contributor

actually, to improve readability a little:

    val additionalMemoryRequired = ...
    val numBytesToFree = math.min(maxNumBytesToFree, additionalMemoryRequired)
    memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, evictedBlocks)

@andrewor14
Contributor

@JoshRosen despite the number of comments I left I think this patch looks good. I did a close review to verify its correctness and that the two bugs were real issues. On the side I will investigate the test failures and hopefully get this merged soon.

@SparkQA

SparkQA commented Dec 9, 2015

Test build #47378 has finished for PR 10170 at commit d182bdc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 9, 2015

Test build #47380 has finished for PR 10170 at commit f6fb406.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Note to self RE: unroll fraction (just to be precise):

At a high level, under the `StaticMemoryManager`, a request for unroll memory will be able to obtain more memory if:

  • there is free storage memory, or
  • `currentUnrollMemory / maxStorageMemory < spark.storage.unrollFraction`.

An unroll request can only evict blocks if `finalUnrollMemory / maxStorageMemory < spark.storage.unrollFraction`, where `finalUnrollMemory` is the amount of unroll memory that would be used after the eviction was performed.
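
Restated as predicates (a sketch with hypothetical names, not the `StaticMemoryManager` API):

    // Whether an unroll request can be granted more memory at all.
    def canGrantMoreUnrollMemory(
        freeStorageMemory: Long,
        currentUnrollMemory: Long,
        maxStorageMemory: Long,
        unrollFraction: Double = 0.2): Boolean = {
      freeStorageMemory > 0 ||
        currentUnrollMemory.toDouble / maxStorageMemory < unrollFraction
    }

    // Whether the request may additionally evict cached blocks; finalUnrollMemory
    // is the unroll memory that would be in use after the proposed eviction.
    def canEvictForUnroll(
        finalUnrollMemory: Long,
        maxStorageMemory: Long,
        unrollFraction: Double = 0.2): Boolean = {
      finalUnrollMemory.toDouble / maxStorageMemory < unrollFraction
    }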

@SparkQA

SparkQA commented Dec 9, 2015

Test build #47382 has finished for PR 10170 at commit e2090d1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 9, 2015

Test build #47383 has finished for PR 10170 at commit a43ed34.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Andrew Or and others added 2 commits December 9, 2015 00:47
Before this commit, unrolling would evict too many blocks,
resulting in test failures in BlockManagerSuite. The root cause
is that we used `maxUnrollMemory` as a cap for the extra amount
of memory to evict for unrolling, which is incorrect. Instead,
we should use it as a cap for the total amount of unroll memory
and calculate the amount of memory to evict from there.
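
A sketch of the corrected calculation described here (simplified, hypothetical signature, not the actual `StaticMemoryManager.acquireUnrollMemory`):

    // maxUnrollMemory caps the *total* unroll memory; the number of bytes to evict
    // is derived from that cap rather than using the cap as the eviction bound.
    def bytesToEvictForUnroll(
        numBytesRequested: Long,
        currentUnrollMemory: Long,
        freeStorageMemory: Long,
        maxUnrollMemory: Long): Long = {
      // Eviction may only raise total unroll usage up to maxUnrollMemory ...
      val maxNumBytesToFree =
        math.max(0L, maxUnrollMemory - currentUnrollMemory - freeStorageMemory)
      // ... and we never evict more than the request still needs beyond free memory.
      math.max(0L, math.min(maxNumBytesToFree, numBytesRequested - freeStorageMemory))
    }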

The goal of this commit is to preserve the old behavior (in 1.5)
as much as possible. This can be seen from the fact that
BlockManagerSuite now passes without any modifications.
@andrewor14
Contributor

LGTM, I'll merge this once tests pass.

@JoshRosen
Contributor Author

Updated PR description to remove the following text from the end (which will be incorporated into separate PRs, most likely):

TODOs

  • Add stronger assertions or a dedicated regression test for the `_memoryUsed < 0` bug, which was uncovered while testing this patch and a related fix for SPARK-12155.
  • See whether we can now remove the confusing `MemoryStore.maxMemory` method.

/cc @andrewor14 @zsxwing @yhuai @rxin

@SparkQA

SparkQA commented Dec 9, 2015

Test build #47433 has finished for PR 10170 at commit 7e9e191.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Contributor

Merging into master and 1.6!!

@asfgit asfgit closed this in aec5ea0 Dec 9, 2015
asfgit pushed a commit that referenced this pull request Dec 9, 2015
[SPARK-12165][SPARK-12189] Fix bugs in eviction of storage memory by execution

Author: Josh Rosen <[email protected]>
Author: Andrew Or <[email protected]>

Closes #10170 from JoshRosen/SPARK-12165.

(cherry picked from commit aec5ea0)
Signed-off-by: Andrew Or <[email protected]>
@JoshRosen JoshRosen deleted the SPARK-12165 branch December 9, 2015 19:41