Commit 4770f6e
[SPARK-4370] [Core] Limit number of Netty cores based on executor size
Right now, the NettyBlockTransferService uses the total number of cores on the system as the number of threads and buffer arenas to create. The latter is more troubling: it can lead to significant allocation of extra heap and direct memory when executors are small relative to the whole machine. For instance, on a machine with 32 cores, we will allocate (32 cores * 16MB per arena = 512MB) * 2 for client and server = 1GB of direct and heap memory. That is a huge overhead if the executor is only using, say, 8 of those cores.
1 parent: 84324fb
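To make the memory arithmetic in the message concrete, here is a minimal Scala sketch of the estimate. The 16MB-per-arena figure is the pooled-allocator chunk size cited in the message, and treating total overhead as cores * 16MB * 2 (client plus server pools) is the commit's own approximation, not an exact measurement:

object ArenaOverheadEstimate {
  val arenaSizeMb = 16 // per-arena chunk size assumed in the commit message

  // Estimated heap + direct overhead (MB) when one arena is created per core,
  // doubled because the client and server each keep their own pools.
  def overheadMb(cores: Int): Int = cores * arenaSizeMb * 2

  def main(args: Array[String]): Unit = {
    println(overheadMb(32)) // 1024 MB if arenas track the machine's 32 cores
    println(overheadMb(8))  // 256 MB once arenas are capped at the executor's 8 cores
  }
}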

File tree: 17 files changed, +103 -64 lines


core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 8 additions & 4 deletions
@@ -168,9 +168,11 @@ object SparkEnv extends Logging {
       executorId: String,
       hostname: String,
       port: Int,
+      numCores: Int,
       isLocal: Boolean,
       actorSystem: ActorSystem = null): SparkEnv = {
-    create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem)
+    create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem,
+      numUsableCores = numCores)
   }
 
   /**
@@ -184,7 +186,8 @@
       isDriver: Boolean,
       isLocal: Boolean,
       listenerBus: LiveListenerBus = null,
-      defaultActorSystem: ActorSystem = null): SparkEnv = {
+      defaultActorSystem: ActorSystem = null,
+      numUsableCores: Int = 0): SparkEnv = {
 
     // Listener bus is only used on the driver
     if (isDriver) {
@@ -276,7 +279,7 @@
     val blockTransferService =
       conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
         case "netty" =>
-          new NettyBlockTransferService(conf, securityManager)
+          new NettyBlockTransferService(conf, securityManager, numUsableCores)
         case "nio" =>
           new NioBlockTransferService(conf, securityManager)
       }
@@ -287,7 +290,8 @@
 
     // NB: blockManager is not valid until initialize() is called later.
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
-      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
+      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
+      numUsableCores)
 
     val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
   private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
   private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
 
-  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
+  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
   private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
   private val transportContext: TransportContext = {
     val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 2 additions & 2 deletions
@@ -57,9 +57,9 @@ private[spark] class CoarseGrainedExecutorBackend(
   override def receiveWithLogging = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
-      // Make this host instead of hostPort ?
       val (hostname, _) = Utils.parseHostPort(hostPort)
-      executor = new Executor(executorId, hostname, sparkProperties, isLocal = false, actorSystem)
+      executor = new Executor(executorId, hostname, sparkProperties, cores, isLocal = false,
+        actorSystem)
 
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 2 additions & 1 deletion
@@ -43,6 +43,7 @@ private[spark] class Executor(
     executorId: String,
     slaveHostname: String,
     properties: Seq[(String, String)],
+    numCores: Int,
     isLocal: Boolean = false,
     actorSystem: ActorSystem = null)
   extends Logging
@@ -83,7 +84,7 @@
   if (!isLocal) {
     val port = conf.getInt("spark.executor.port", 0)
     val _env = SparkEnv.createExecutorEnv(
-      conf, executorId, slaveHostname, port, isLocal, actorSystem)
+      conf, executorId, slaveHostname, port, numCores, isLocal, actorSystem)
     SparkEnv.set(_env)
     _env.metricsSystem.registerSource(executorSource)
     _env.blockManager.initialize(conf.getAppId)

core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala

Lines changed: 13 additions & 3 deletions
@@ -19,6 +19,8 @@ package org.apache.spark.executor
 
 import java.nio.ByteBuffer
 
+import scala.collection.JavaConversions._
+
 import org.apache.mesos.protobuf.ByteString
 import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
 import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
@@ -50,14 +52,22 @@ private[spark] class MesosExecutorBackend
       executorInfo: ExecutorInfo,
       frameworkInfo: FrameworkInfo,
       slaveInfo: SlaveInfo) {
-    logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
+
+    val cpusPerTask = executorInfo.getResourcesList
+      .find(_.getName == "cpus")
+      .map(_.getScalar.getValue.toInt)
+      .getOrElse(0)
+    val executorId = executorInfo.getExecutorId.getValue
+
+    logInfo(s"Registered with Mesos as executor ID $executorId with $cpusPerTask cpus")
     this.driver = driver
     val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
       Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
     executor = new Executor(
-      executorInfo.getExecutorId.getValue,
+      executorId,
       slaveInfo.getHostname,
-      properties)
+      properties,
+      cpusPerTask)
   }
 
   override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
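A side note on the new scala.collection.JavaConversions._ import above: executorInfo.getResourcesList returns a java.util.List, and the import's implicit conversion is what lets the Scala .find/.map/.getOrElse chain compile. A self-contained sketch of the same pattern, using a hypothetical Resource case class in place of the Mesos protobuf type:

import java.util.{Arrays => JArrays, List => JList}
import scala.collection.JavaConversions._

// Hypothetical stand-in for the Mesos Resource protobuf message.
case class Resource(name: String, scalarValue: Double)

object JavaListLookup {
  def main(args: Array[String]): Unit = {
    // A Java list, like the one a Mesos getResourcesList call would return.
    val resources: JList[Resource] =
      JArrays.asList(Resource("mem", 2048.0), Resource("cpus", 8.0))

    // The implicit conversion lets Scala's find/map run directly on the Java list.
    val cpus = resources
      .find(_.name == "cpus")
      .map(_.scalarValue.toInt)
      .getOrElse(0)

    println(cpus) // 8
  }
}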

core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala

Lines changed: 2 additions & 2 deletions
@@ -35,13 +35,13 @@ import org.apache.spark.util.Utils
 /**
  * A BlockTransferService that uses Netty to fetch a set of blocks at a time.
  */
-class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager)
+class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager, numCores: Int)
   extends BlockTransferService {
 
   // TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
   private val serializer = new JavaSerializer(conf)
   private val authEnabled = securityManager.isAuthenticationEnabled()
-  private val transportConf = SparkTransportConf.fromSparkConf(conf)
+  private val transportConf = SparkTransportConf.fromSparkConf(conf, numCores)
 
   private[this] var transportContext: TransportContext = _
   private[this] var server: TransportServer = _

core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala

Lines changed: 15 additions & 4 deletions
@@ -20,11 +20,22 @@ package org.apache.spark.network.netty
 import org.apache.spark.SparkConf
 import org.apache.spark.network.util.{TransportConf, ConfigProvider}
 
-/**
- * Utility for creating a [[TransportConf]] from a [[SparkConf]].
- */
 object SparkTransportConf {
-  def fromSparkConf(conf: SparkConf): TransportConf = {
+  /**
+   * Utility for creating a [[TransportConf]] from a [[SparkConf]].
+   * @param numUsableCores if nonzero, this will restrict the server and client threads to only
+   *                       use the given number of cores, rather than all of the machine's cores.
+   *                       This restriction will only occur if these properties are not already set.
+   */
+  def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
+    val conf = _conf.clone
+    if (numUsableCores > 0) {
+      // Only set if serverThreads/clientThreads not already set.
+      conf.set("spark.shuffle.io.serverThreads",
+        conf.get("spark.shuffle.io.serverThreads", numUsableCores.toString))
+      conf.set("spark.shuffle.io.clientThreads",
+        conf.get("spark.shuffle.io.clientThreads", numUsableCores.toString))
+    }
     new TransportConf(new ConfigProvider {
       override def get(name: String): String = conf.get(name)
     })
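The conf.set(key, conf.get(key, default)) pattern above is what keeps explicit user settings authoritative: numUsableCores only becomes the thread count when spark.shuffle.io.serverThreads / spark.shuffle.io.clientThreads are unset. A minimal usage sketch of that behavior (the resulting TransportConf is deliberately not inspected here):

import org.apache.spark.SparkConf
import org.apache.spark.network.netty.SparkTransportConf

object TransportConfSketch {
  def main(args: Array[String]): Unit = {
    // No explicit thread settings: the 8-core cap becomes both thread counts.
    val plain = new SparkConf(false)
    val capped = SparkTransportConf.fromSparkConf(plain, numUsableCores = 8)

    // An explicit setting survives: serverThreads stays "4";
    // only clientThreads falls back to the 8-core cap.
    val tuned = new SparkConf(false).set("spark.shuffle.io.serverThreads", "4")
    val mixed = SparkTransportConf.fromSparkConf(tuned, numUsableCores = 8)

    // Note: fromSparkConf clones the conf, so `plain` and `tuned` are unmodified.
  }
}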

core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ private[spark] class LocalActor(
   private val localExecutorHostname = "localhost"
 
   val executor = new Executor(
-    localExecutorId, localExecutorHostname, scheduler.conf.getAll, isLocal = true)
+    localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true)
 
   override def receiveWithLogging = {
     case ReviveOffers =>

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 7 additions & 5 deletions
@@ -73,7 +73,8 @@ private[spark] class BlockManager(
     mapOutputTracker: MapOutputTracker,
     shuffleManager: ShuffleManager,
     blockTransferService: BlockTransferService,
-    securityManager: SecurityManager)
+    securityManager: SecurityManager,
+    numUsableCores: Int)
   extends BlockDataManager with Logging {
 
   val diskBlockManager = new DiskBlockManager(this, conf)
@@ -121,8 +122,8 @@
   // Client to read other executors' shuffle files. This is either an external service, or just the
   // standard BlockTransferService to directly connect to other Executors.
   private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
-    new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf), securityManager,
-      securityManager.isAuthenticationEnabled())
+    val transConf = SparkTransportConf.fromSparkConf(conf, numUsableCores)
+    new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled())
   } else {
     blockTransferService
   }
@@ -174,9 +175,10 @@
       mapOutputTracker: MapOutputTracker,
       shuffleManager: ShuffleManager,
       blockTransferService: BlockTransferService,
-      securityManager: SecurityManager) = {
+      securityManager: SecurityManager,
+      numUsableCores: Int) = {
     this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
-      conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
+      conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores)
   }
 
   /**

core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
   var rpcHandler: ExternalShuffleBlockHandler = _
 
   override def beforeAll() {
-    val transportConf = SparkTransportConf.fromSparkConf(conf)
+    val transportConf = SparkTransportConf.fromSparkConf(conf, numUsableCores = 2)
     rpcHandler = new ExternalShuffleBlockHandler(transportConf)
     val transportContext = new TransportContext(transportConf, rpcHandler)
     server = transportContext.createServer()
