Skip to content

Commit 67fbecb

Browse files
Andrew OrJoshRosen
authored andcommitted
[SPARK-10956] Common MemoryManager interface for storage and execution
This patch introduces a `MemoryManager` that is the central arbiter of how much memory to grant to storage and execution. This patch is primarily concerned only with refactoring while preserving the existing behavior as much as possible. This is the first step away from the existing rigid separation of storage and execution memory, which has several major drawbacks discussed on the [issue](https://issues.apache.org/jira/browse/SPARK-10956). It is the precursor of a series of patches that will attempt to address those drawbacks. Author: Andrew Or <[email protected]> Author: Josh Rosen <[email protected]> Author: andrewor14 <[email protected]> Closes #9000 from andrewor14/memory-manager.
1 parent 0984129 commit 67fbecb

File tree

11 files changed

+752
-228
lines changed

11 files changed

+752
-228
lines changed

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.apache.spark.annotation.DeveloperApi
3030
import org.apache.spark.api.python.PythonWorkerFactory
3131
import org.apache.spark.broadcast.BroadcastManager
3232
import org.apache.spark.metrics.MetricsSystem
33+
import org.apache.spark.memory.{MemoryManager, StaticMemoryManager}
3334
import org.apache.spark.network.BlockTransferService
3435
import org.apache.spark.network.netty.NettyBlockTransferService
3536
import org.apache.spark.rpc.{RpcEndpointRef, RpcEndpoint, RpcEnv}
@@ -69,6 +70,8 @@ class SparkEnv (
6970
val httpFileServer: HttpFileServer,
7071
val sparkFilesDir: String,
7172
val metricsSystem: MetricsSystem,
73+
// TODO: unify these *MemoryManager classes (SPARK-10984)
74+
val memoryManager: MemoryManager,
7275
val shuffleMemoryManager: ShuffleMemoryManager,
7376
val executorMemoryManager: ExecutorMemoryManager,
7477
val outputCommitCoordinator: OutputCommitCoordinator,
@@ -332,7 +335,8 @@ object SparkEnv extends Logging {
332335
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
333336
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
334337

335-
val shuffleMemoryManager = ShuffleMemoryManager.create(conf, numUsableCores)
338+
val memoryManager = new StaticMemoryManager(conf)
339+
val shuffleMemoryManager = ShuffleMemoryManager.create(conf, memoryManager, numUsableCores)
336340

337341
val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
338342

@@ -343,8 +347,8 @@ object SparkEnv extends Logging {
343347

344348
// NB: blockManager is not valid until initialize() is called later.
345349
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
346-
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
347-
numUsableCores)
350+
serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
351+
blockTransferService, securityManager, numUsableCores)
348352

349353
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
350354

@@ -417,6 +421,7 @@ object SparkEnv extends Logging {
417421
httpFileServer,
418422
sparkFilesDir,
419423
metricsSystem,
424+
memoryManager,
420425
shuffleMemoryManager,
421426
executorMemoryManager,
422427
outputCommitCoordinator,
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.memory
19+
20+
import scala.collection.mutable
21+
22+
import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
23+
24+
25+
/**
26+
* An abstract memory manager that enforces how memory is shared between execution and storage.
27+
*
28+
* In this context, execution memory refers to that used for computation in shuffles, joins,
29+
* sorts and aggregations, while storage memory refers to that used for caching and propagating
30+
* internal data across the cluster. There exists one of these per JVM.
31+
*/
32+
private[spark] abstract class MemoryManager {
33+
34+
// The memory store used to evict cached blocks
35+
private var _memoryStore: MemoryStore = _
36+
protected def memoryStore: MemoryStore = {
37+
if (_memoryStore == null) {
38+
throw new IllegalArgumentException("memory store not initialized yet")
39+
}
40+
_memoryStore
41+
}
42+
43+
/**
44+
* Set the [[MemoryStore]] used by this manager to evict cached blocks.
45+
* This must be set after construction due to initialization ordering constraints.
46+
*/
47+
def setMemoryStore(store: MemoryStore): Unit = {
48+
_memoryStore = store
49+
}
50+
51+
/**
52+
* Acquire N bytes of memory for execution.
53+
* @return number of bytes successfully granted (<= N).
54+
*/
55+
def acquireExecutionMemory(numBytes: Long): Long
56+
57+
/**
58+
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
59+
* Blocks evicted in the process, if any, are added to `evictedBlocks`.
60+
* @return whether all N bytes were successfully granted.
61+
*/
62+
def acquireStorageMemory(
63+
blockId: BlockId,
64+
numBytes: Long,
65+
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
66+
67+
/**
68+
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
69+
* Blocks evicted in the process, if any, are added to `evictedBlocks`.
70+
* @return whether all N bytes were successfully granted.
71+
*/
72+
def acquireUnrollMemory(
73+
blockId: BlockId,
74+
numBytes: Long,
75+
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
76+
77+
/**
78+
* Release N bytes of execution memory.
79+
*/
80+
def releaseExecutionMemory(numBytes: Long): Unit
81+
82+
/**
83+
* Release N bytes of storage memory.
84+
*/
85+
def releaseStorageMemory(numBytes: Long): Unit
86+
87+
/**
88+
* Release all storage memory acquired.
89+
*/
90+
def releaseStorageMemory(): Unit
91+
92+
/**
93+
* Release N bytes of unroll memory.
94+
*/
95+
def releaseUnrollMemory(numBytes: Long): Unit
96+
97+
/**
98+
* Total available memory for execution, in bytes.
99+
*/
100+
def maxExecutionMemory: Long
101+
102+
/**
103+
* Total available memory for storage, in bytes.
104+
*/
105+
def maxStorageMemory: Long
106+
107+
/**
108+
* Execution memory currently in use, in bytes.
109+
*/
110+
def executionMemoryUsed: Long
111+
112+
/**
113+
* Storage memory currently in use, in bytes.
114+
*/
115+
def storageMemoryUsed: Long
116+
117+
}
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.memory
19+
20+
import scala.collection.mutable
21+
22+
import org.apache.spark.{Logging, SparkConf}
23+
import org.apache.spark.storage.{BlockId, BlockStatus}
24+
25+
26+
/**
27+
* A [[MemoryManager]] that statically partitions the heap space into disjoint regions.
28+
*
29+
* The sizes of the execution and storage regions are determined through
30+
* `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction` respectively. The two
31+
* regions are cleanly separated such that neither usage can borrow memory from the other.
32+
*/
33+
private[spark] class StaticMemoryManager(
34+
conf: SparkConf,
35+
override val maxExecutionMemory: Long,
36+
override val maxStorageMemory: Long)
37+
extends MemoryManager with Logging {
38+
39+
// Max number of bytes worth of blocks to evict when unrolling
40+
private val maxMemoryToEvictForUnroll: Long = {
41+
(maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
42+
}
43+
44+
// Amount of execution / storage memory in use
45+
// Accesses must be synchronized on `this`
46+
private var _executionMemoryUsed: Long = 0
47+
private var _storageMemoryUsed: Long = 0
48+
49+
def this(conf: SparkConf) {
50+
this(
51+
conf,
52+
StaticMemoryManager.getMaxExecutionMemory(conf),
53+
StaticMemoryManager.getMaxStorageMemory(conf))
54+
}
55+
56+
/**
57+
* Acquire N bytes of memory for execution.
58+
* @return number of bytes successfully granted (<= N).
59+
*/
60+
override def acquireExecutionMemory(numBytes: Long): Long = synchronized {
61+
assert(_executionMemoryUsed <= maxExecutionMemory)
62+
val bytesToGrant = math.min(numBytes, maxExecutionMemory - _executionMemoryUsed)
63+
_executionMemoryUsed += bytesToGrant
64+
bytesToGrant
65+
}
66+
67+
/**
68+
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
69+
* Blocks evicted in the process, if any, are added to `evictedBlocks`.
70+
* @return whether all N bytes were successfully granted.
71+
*/
72+
override def acquireStorageMemory(
73+
blockId: BlockId,
74+
numBytes: Long,
75+
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
76+
acquireStorageMemory(blockId, numBytes, numBytes, evictedBlocks)
77+
}
78+
79+
/**
80+
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
81+
*
82+
* This evicts at most M bytes worth of existing blocks, where M is a fraction of the storage
83+
* space specified by `spark.storage.unrollFraction`. Blocks evicted in the process, if any,
84+
* are added to `evictedBlocks`.
85+
*
86+
* @return whether all N bytes were successfully granted.
87+
*/
88+
override def acquireUnrollMemory(
89+
blockId: BlockId,
90+
numBytes: Long,
91+
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
92+
val currentUnrollMemory = memoryStore.currentUnrollMemory
93+
val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory)
94+
val numBytesToFree = math.min(numBytes, maxNumBytesToFree)
95+
acquireStorageMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
96+
}
97+
98+
/**
99+
* Acquire N bytes of storage memory for the given block, evicting existing ones if necessary.
100+
*
101+
* @param blockId the ID of the block we are acquiring storage memory for
102+
* @param numBytesToAcquire the size of this block
103+
* @param numBytesToFree the size of space to be freed through evicting blocks
104+
* @param evictedBlocks a holder for blocks evicted in the process
105+
* @return whether all N bytes were successfully granted.
106+
*/
107+
private def acquireStorageMemory(
108+
blockId: BlockId,
109+
numBytesToAcquire: Long,
110+
numBytesToFree: Long,
111+
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
112+
// Note: Keep this outside synchronized block to avoid potential deadlocks!
113+
memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
114+
synchronized {
115+
assert(_storageMemoryUsed <= maxStorageMemory)
116+
val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory
117+
if (enoughMemory) {
118+
_storageMemoryUsed += numBytesToAcquire
119+
}
120+
enoughMemory
121+
}
122+
}
123+
124+
/**
125+
* Release N bytes of execution memory.
126+
*/
127+
override def releaseExecutionMemory(numBytes: Long): Unit = synchronized {
128+
if (numBytes > _executionMemoryUsed) {
129+
logWarning(s"Attempted to release $numBytes bytes of execution " +
130+
s"memory when we only have ${_executionMemoryUsed} bytes")
131+
_executionMemoryUsed = 0
132+
} else {
133+
_executionMemoryUsed -= numBytes
134+
}
135+
}
136+
137+
/**
138+
* Release N bytes of storage memory.
139+
*/
140+
override def releaseStorageMemory(numBytes: Long): Unit = synchronized {
141+
if (numBytes > _storageMemoryUsed) {
142+
logWarning(s"Attempted to release $numBytes bytes of storage " +
143+
s"memory when we only have ${_storageMemoryUsed} bytes")
144+
_storageMemoryUsed = 0
145+
} else {
146+
_storageMemoryUsed -= numBytes
147+
}
148+
}
149+
150+
/**
151+
* Release all storage memory acquired.
152+
*/
153+
override def releaseStorageMemory(): Unit = synchronized {
154+
_storageMemoryUsed = 0
155+
}
156+
157+
/**
158+
* Release N bytes of unroll memory.
159+
*/
160+
override def releaseUnrollMemory(numBytes: Long): Unit = {
161+
releaseStorageMemory(numBytes)
162+
}
163+
164+
/**
165+
* Amount of execution memory currently in use, in bytes.
166+
*/
167+
override def executionMemoryUsed: Long = synchronized {
168+
_executionMemoryUsed
169+
}
170+
171+
/**
172+
* Amount of storage memory currently in use, in bytes.
173+
*/
174+
override def storageMemoryUsed: Long = synchronized {
175+
_storageMemoryUsed
176+
}
177+
178+
}
179+
180+
181+
private[spark] object StaticMemoryManager {
182+
183+
/**
184+
* Return the total amount of memory available for the storage region, in bytes.
185+
*/
186+
private def getMaxStorageMemory(conf: SparkConf): Long = {
187+
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
188+
val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
189+
(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
190+
}
191+
192+
193+
/**
194+
* Return the total amount of memory available for the execution region, in bytes.
195+
*/
196+
private def getMaxExecutionMemory(conf: SparkConf): Long = {
197+
val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
198+
val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
199+
(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
200+
}
201+
202+
}

0 commit comments

Comments
 (0)