Skip to content

Commit bd21292

Browse files
authored
Merge pull request apache#66 from jcuquemelle/SPARK-22683-criteo2.2
[SPARK-22683][CORE] Allow tuning the number of dynamically allocated executors
2 parents 7c94543 + e480d69 commit bd21292

File tree

3 files changed

+52
-1
lines changed

3 files changed

+52
-1
lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,13 @@ private[spark] class ExecutorAllocationManager(
114114
// TODO: The default value of 1 for spark.executor.cores works right now because dynamic
115115
// allocation is only supported for YARN and the default number of cores per executor in YARN is
116116
// 1, but it might need to be attained differently for different cluster managers
117-
private val tasksPerExecutor =
117+
private val taskSlotPerExecutor =
118118
conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
119119

120+
private val tasksPerExecutorSlot = conf.getInt("spark.dynamicAllocation.tasksPerExecutorSlot", 1)
121+
122+
private val tasksPerExecutor = tasksPerExecutorSlot * taskSlotPerExecutor
123+
120124
validateSettings()
121125

122126
// Number of executors to add in the next round

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,34 @@ class ExecutorAllocationManagerSuite
137137
assert(numExecutorsToAdd(manager) === 1)
138138
}
139139

140+
test("tasksPerExecutorSlot is correctly handled") {
141+
val conf = new SparkConf()
142+
.setMaster("myDummyLocalExternalClusterManager")
143+
.setAppName("test-executor-allocation-manager")
144+
.set("spark.dynamicAllocation.enabled", "true")
145+
.set("spark.dynamicAllocation.testing", "true")
146+
147+
val sc0 = new SparkContext(conf)
148+
contexts += sc0
149+
var manager = sc0.executorAllocationManager.get
150+
assert(tasksPerExecutor(manager) === 1)
151+
sc0.stop()
152+
153+
val conf1 = conf.clone.set("spark.dynamicAllocation.tasksPerExecutorSlot", "2")
154+
val sc1 = new SparkContext(conf1)
155+
contexts += sc1
156+
manager = sc1.executorAllocationManager.get
157+
assert(tasksPerExecutor(manager) === 2)
158+
sc1.stop()
159+
160+
val conf2 = conf1.clone.set("spark.executor.cores", "2")
161+
val sc2 = new SparkContext(conf2)
162+
contexts += sc2
163+
manager = sc2.executorAllocationManager.get
164+
assert(tasksPerExecutor(manager) === 4)
165+
sc2.stop()
166+
}
167+
140168
test("add executors capped by num pending tasks") {
141169
sc = createSparkContext(0, 10, 0)
142170
val manager = sc.executorAllocationManager.get
@@ -1061,6 +1089,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
10611089
private val _onExecutorBusy = PrivateMethod[Unit]('onExecutorBusy)
10621090
private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks)
10631091
private val _hostToLocalTaskCount = PrivateMethod[Map[String, Int]]('hostToLocalTaskCount)
1092+
private val _tasksPerExecutor = PrivateMethod[Int]('tasksPerExecutor)
10641093

10651094
private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = {
10661095
manager invokePrivate _numExecutorsToAdd()
@@ -1143,6 +1172,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
11431172
private def hostToLocalTaskCount(manager: ExecutorAllocationManager): Map[String, Int] = {
11441173
manager invokePrivate _hostToLocalTaskCount()
11451174
}
1175+
1176+
private def tasksPerExecutor(manager: ExecutorAllocationManager): Int = {
1177+
manager invokePrivate _tasksPerExecutor()
1178+
}
11461179
}
11471180

11481181
/**

docs/configuration.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1584,6 +1584,7 @@ Apart from these, the following properties are also available, and may be useful
15841584
<code>spark.dynamicAllocation.minExecutors</code>,
15851585
<code>spark.dynamicAllocation.maxExecutors</code>, and
15861586
<code>spark.dynamicAllocation.initialExecutors</code>
1587+
<code>spark.dynamicAllocation.tasksPerExecutorSlot</code>
15871588
</td>
15881589
</tr>
15891590
<tr>
@@ -1628,6 +1629,19 @@ Apart from these, the following properties are also available, and may be useful
16281629
Lower bound for the number of executors if dynamic allocation is enabled.
16291630
</td>
16301631
</tr>
1632+
<tr>
1633+
<td><code>spark.dynamicAllocation.tasksPerExecutorSlot</code></td>
1634+
<td>1</td>
1635+
<td>
1636+
Each executor can process a certain number of tasks in parallel (task slots).
1637+
The number of task slots per executor is: executor.cores / task.cpus.
1638+
The ExecutorAllocationManager will set a target number of running executors equal to:
1639+
nbCurrentTask / (taskSlots * tasksPerExecutorSlot), with nbCurrentTask being the total number
1640+
of running and backlogged tasks. With the default value of 1, each available task slot
1641+
will compute a single task on average, which gives the best latency. With small tasks,
1642+
however, this setting wastes a lot of resources due to executor allocation overhead;
increasing it requests fewer executors for the same workload, trading latency for efficiency.
1643+
</td>
1644+
</tr>
16311645
<tr>
16321646
<td><code>spark.dynamicAllocation.schedulerBacklogTimeout</code></td>
16331647
<td>1s</td>

0 commit comments

Comments
 (0)