Commit b9e1c2e

aarondav authored and rxin committed
[SPARK-4370] [Core] Limit number of Netty cores based on executor size
Author: Aaron Davidson <[email protected]>

Closes apache#3155 from aarondav/conf and squashes the following commits:

7045e77 [Aaron Davidson] Add mesos comment
4770f6e [Aaron Davidson] [SPARK-4370] [Core] Limit number of Netty cores based on executor size
1 parent 23f5bdf commit b9e1c2e
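
The cap introduced here is only a default: as the SparkTransportConf change below documents, the core-based limit applies only when the thread-pool properties are not already set. A minimal sketch of overriding it explicitly, assuming spark-core from this commit on the classpath (the values are illustrative):

    import org.apache.spark.SparkConf

    // Explicit settings win over the core-based default, because fromSparkConf
    // only fills these keys in when they are not already set.
    val conf = new SparkConf()
      .set("spark.shuffle.io.serverThreads", "8")
      .set("spark.shuffle.io.clientThreads", "8")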

17 files changed, +104 −64 lines

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 8 additions & 4 deletions

@@ -168,9 +168,11 @@ object SparkEnv extends Logging {
       executorId: String,
       hostname: String,
       port: Int,
+      numCores: Int,
       isLocal: Boolean,
       actorSystem: ActorSystem = null): SparkEnv = {
-    create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem)
+    create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem,
+      numUsableCores = numCores)
   }

   /**
@@ -184,7 +186,8 @@
       isDriver: Boolean,
       isLocal: Boolean,
       listenerBus: LiveListenerBus = null,
-      defaultActorSystem: ActorSystem = null): SparkEnv = {
+      defaultActorSystem: ActorSystem = null,
+      numUsableCores: Int = 0): SparkEnv = {

     // Listener bus is only used on the driver
     if (isDriver) {
@@ -276,7 +279,7 @@
     val blockTransferService =
       conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
         case "netty" =>
-          new NettyBlockTransferService(conf, securityManager)
+          new NettyBlockTransferService(conf, securityManager, numUsableCores)
         case "nio" =>
           new NioBlockTransferService(conf, securityManager)
       }
@@ -287,7 +290,8 @@

     // NB: blockManager is not valid until initialize() is called later.
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
-      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
+      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
+      numUsableCores)

     val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala

Lines changed: 1 addition & 1 deletion

@@ -39,7 +39,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
   private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
   private val useSasl: Boolean = securityManager.isAuthenticationEnabled()

-  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
+  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
   private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
   private val transportContext: TransportContext = {
     val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 2 additions & 2 deletions

@@ -57,9 +57,9 @@ private[spark] class CoarseGrainedExecutorBackend(
   override def receiveWithLogging = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
-      // Make this host instead of hostPort ?
       val (hostname, _) = Utils.parseHostPort(hostPort)
-      executor = new Executor(executorId, hostname, sparkProperties, isLocal = false, actorSystem)
+      executor = new Executor(executorId, hostname, sparkProperties, cores, isLocal = false,
+        actorSystem)

     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 2 additions & 1 deletion

@@ -43,6 +43,7 @@ private[spark] class Executor(
     executorId: String,
     slaveHostname: String,
     properties: Seq[(String, String)],
+    numCores: Int,
     isLocal: Boolean = false,
     actorSystem: ActorSystem = null)
   extends Logging
@@ -83,7 +84,7 @@
   if (!isLocal) {
     val port = conf.getInt("spark.executor.port", 0)
     val _env = SparkEnv.createExecutorEnv(
-      conf, executorId, slaveHostname, port, isLocal, actorSystem)
+      conf, executorId, slaveHostname, port, numCores, isLocal, actorSystem)
     SparkEnv.set(_env)
     _env.metricsSystem.registerSource(executorSource)
     _env.blockManager.initialize(conf.getAppId)

core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala

Lines changed: 14 additions & 3 deletions

@@ -19,6 +19,8 @@ package org.apache.spark.executor

 import java.nio.ByteBuffer

+import scala.collection.JavaConversions._
+
 import org.apache.mesos.protobuf.ByteString
 import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
 import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
@@ -50,14 +52,23 @@
       executorInfo: ExecutorInfo,
       frameworkInfo: FrameworkInfo,
       slaveInfo: SlaveInfo) {
-    logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
+
+    // Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend.
+    val cpusPerTask = executorInfo.getResourcesList
+      .find(_.getName == "cpus")
+      .map(_.getScalar.getValue.toInt)
+      .getOrElse(0)
+    val executorId = executorInfo.getExecutorId.getValue
+
+    logInfo(s"Registered with Mesos as executor ID $executorId with $cpusPerTask cpus")
     this.driver = driver
     val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
       Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
     executor = new Executor(
-      executorInfo.getExecutorId.getValue,
+      executorId,
       slaveInfo.getHostname,
-      properties)
+      properties,
+      cpusPerTask)
   }

   override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
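
The new scala.collection.JavaConversions._ import is what lets the Java list returned by getResourcesList be traversed with Scala's find and map. A standalone sketch of the same lookup pattern, using a plain Java list of (name, value) pairs as a hypothetical stand-in for the Mesos ExecutorInfo resources:

    import scala.collection.JavaConversions._

    // Illustrative stand-in for executorInfo.getResourcesList: a java.util.List
    // that the implicit conversion lets us traverse with Scala collection methods.
    val resources: java.util.List[(String, Double)] =
      java.util.Arrays.asList(("cpus", 4.0), ("mem", 2048.0))
    val cpusPerTask = resources
      .find(_._1 == "cpus")   // locate the "cpus" resource entry
      .map(_._2.toInt)        // take its value as a whole core count
      .getOrElse(0)           // default to 0 if the resource is absent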

core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala

Lines changed: 2 additions & 2 deletions

@@ -35,13 +35,13 @@ import org.apache.spark.util.Utils
 /**
  * A BlockTransferService that uses Netty to fetch a set of blocks at at time.
  */
-class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager)
+class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager, numCores: Int)
   extends BlockTransferService {

   // TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
   private val serializer = new JavaSerializer(conf)
   private val authEnabled = securityManager.isAuthenticationEnabled()
-  private val transportConf = SparkTransportConf.fromSparkConf(conf)
+  private val transportConf = SparkTransportConf.fromSparkConf(conf, numCores)

   private[this] var transportContext: TransportContext = _
   private[this] var server: TransportServer = _

core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala

Lines changed: 15 additions & 4 deletions

@@ -20,11 +20,22 @@ package org.apache.spark.network.netty
 import org.apache.spark.SparkConf
 import org.apache.spark.network.util.{TransportConf, ConfigProvider}

-/**
- * Utility for creating a [[TransportConf]] from a [[SparkConf]].
- */
 object SparkTransportConf {
-  def fromSparkConf(conf: SparkConf): TransportConf = {
+  /**
+   * Utility for creating a [[TransportConf]] from a [[SparkConf]].
+   * @param numUsableCores if nonzero, this will restrict the server and client threads to only
+   *                       use the given number of cores, rather than all of the machine's cores.
+   *                       This restriction will only occur if these properties are not already set.
+   */
+  def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
+    val conf = _conf.clone
+    if (numUsableCores > 0) {
+      // Only set if serverThreads/clientThreads not already set.
+      conf.set("spark.shuffle.io.serverThreads",
+        conf.get("spark.shuffle.io.serverThreads", numUsableCores.toString))
+      conf.set("spark.shuffle.io.clientThreads",
+        conf.get("spark.shuffle.io.clientThreads", numUsableCores.toString))
+    }
     new TransportConf(new ConfigProvider {
       override def get(name: String): String = conf.get(name)
     })
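
A short sketch of how this defaulting behaves, assuming the fromSparkConf signature above (the core counts are illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.network.netty.SparkTransportConf

    // With nothing configured, the executor's core count becomes both pool sizes.
    val perExecutor = SparkTransportConf.fromSparkConf(new SparkConf(), numUsableCores = 4)

    // numUsableCores = 0 (e.g. the standalone worker shuffle service) leaves the keys
    // unset, so the transport layer falls back to sizing pools off all machine cores.
    val wholeMachine = SparkTransportConf.fromSparkConf(new SparkConf(), numUsableCores = 0)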

core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala

Lines changed: 1 addition & 1 deletion

@@ -51,7 +51,7 @@ private[spark] class LocalActor(
   private val localExecutorHostname = "localhost"

   val executor = new Executor(
-    localExecutorId, localExecutorHostname, scheduler.conf.getAll, isLocal = true)
+    localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true)

   override def receiveWithLogging = {
     case ReviveOffers =>

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 7 additions & 5 deletions

@@ -73,7 +73,8 @@ private[spark] class BlockManager(
     mapOutputTracker: MapOutputTracker,
     shuffleManager: ShuffleManager,
     blockTransferService: BlockTransferService,
-    securityManager: SecurityManager)
+    securityManager: SecurityManager,
+    numUsableCores: Int)
   extends BlockDataManager with Logging {

   val diskBlockManager = new DiskBlockManager(this, conf)
@@ -121,8 +122,8 @@
   // Client to read other executors' shuffle files. This is either an external service, or just the
   // standard BlockTranserService to directly connect to other Executors.
   private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
-    new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf), securityManager,
-      securityManager.isAuthenticationEnabled())
+    val transConf = SparkTransportConf.fromSparkConf(conf, numUsableCores)
+    new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled())
   } else {
     blockTransferService
   }
@@ -174,9 +175,10 @@
       mapOutputTracker: MapOutputTracker,
       shuffleManager: ShuffleManager,
       blockTransferService: BlockTransferService,
-      securityManager: SecurityManager) = {
+      securityManager: SecurityManager,
+      numUsableCores: Int) = {
     this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
-      conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
+      conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores)
   }

   /**

core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -38,7 +38,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
   var rpcHandler: ExternalShuffleBlockHandler = _

   override def beforeAll() {
-    val transportConf = SparkTransportConf.fromSparkConf(conf)
+    val transportConf = SparkTransportConf.fromSparkConf(conf, numUsableCores = 2)
     rpcHandler = new ExternalShuffleBlockHandler(transportConf)
     val transportContext = new TransportContext(transportConf, rpcHandler)
     server = transportContext.createServer()
