Skip to content

Commit d852faf

Browse files
author
Andrew Or
committed
Add tests + fix scheduling with memory limits
Previously, if cores per executor is not set, because we assign 1 core at a time we end up requiring the worker to have enough memory for N executors, where N is the number of cores assigned. This is incorrect because in this mode we should have at most 1 executor per worker. This is fixed in this commit with a regression test.
1 parent 41a7cdf commit d852faf

File tree

2 files changed

+202
-5
lines changed

2 files changed

+202
-5
lines changed

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,7 @@ private[master] class Master(
559559
* allocated at a time, 12 cores from each worker would be assigned to each executor.
560560
* Since 12 < 16, no executors would launch [SPARK-8881].
561561
*/
562-
private[master] def scheduleExecutorsOnWorkers(
562+
private def scheduleExecutorsOnWorkers(
563563
app: ApplicationInfo,
564564
usableWorkers: Array[WorkerInfo],
565565
spreadOutApps: Boolean): Array[Int] = {
@@ -585,7 +585,11 @@ private[master] class Master(
585585
while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) {
586586
coresToAssign -= coresPerExecutor
587587
assignedCores(pos) += coresPerExecutor
588-
assignedMemory(pos) += memoryPerExecutor
588+
// If cores per executor is not set, we are assigning 1 core at a time
589+
// without actually meaning to launch 1 executor for each core assigned
590+
if (app.desc.coresPerExecutor.isDefined) {
591+
assignedMemory(pos) += memoryPerExecutor
592+
}
589593

590594
// Spreading out an application means spreading out its executors across as
591595
// many workers as possible. If we are not spreading out, then we should keep

core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala

Lines changed: 196 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,15 @@ import scala.language.postfixOps
2525

2626
import org.json4s._
2727
import org.json4s.jackson.JsonMethods._
28-
import org.scalatest.Matchers
28+
import org.scalatest.{Matchers, PrivateMethodTester}
2929
import org.scalatest.concurrent.Eventually
3030
import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
3131

32-
import org.apache.spark.{SparkConf, SparkFunSuite}
32+
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
3333
import org.apache.spark.deploy._
34+
import org.apache.spark.rpc.RpcEnv
3435

35-
class MasterSuite extends SparkFunSuite with Matchers with Eventually {
36+
class MasterSuite extends SparkFunSuite with Matchers with Eventually with PrivateMethodTester {
3637

3738
test("can use a custom recovery mode factory") {
3839
val conf = new SparkConf(loadDefaults = false)
@@ -142,4 +143,196 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually {
142143
}
143144
}
144145

146+
test("basic scheduling - spread out") {
147+
testBasicScheduling(spreadOut = true)
148+
}
149+
150+
test("basic scheduling - no spread out") {
151+
testBasicScheduling(spreadOut = false)
152+
}
153+
154+
test("scheduling with max cores - spread out") {
155+
testSchedulingWithMaxCores(spreadOut = true)
156+
}
157+
158+
test("scheduling with max cores - no spread out") {
159+
testSchedulingWithMaxCores(spreadOut = false)
160+
}
161+
162+
test("scheduling with cores per executor - spread out") {
163+
testSchedulingWithCoresPerExecutor(spreadOut = true)
164+
}
165+
166+
test("scheduling with cores per executor - no spread out") {
167+
testSchedulingWithCoresPerExecutor(spreadOut = false)
168+
}
169+
170+
test("scheduling with cores per executor AND max cores - spread out") {
171+
testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = true)
172+
}
173+
174+
test("scheduling with cores per executor AND max cores - no spread out") {
175+
testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = false)
176+
}
177+
178+
private def testBasicScheduling(spreadOut: Boolean): Unit = {
179+
val master = makeMaster()
180+
val appInfo = makeAppInfo(1024)
181+
val workerInfo = makeWorkerInfo(4096, 10)
182+
val workerInfos = Array(workerInfo, workerInfo, workerInfo)
183+
val scheduledCores = master.invokePrivate(
184+
_scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut))
185+
assert(scheduledCores.length === 3)
186+
assert(scheduledCores(0) === 10)
187+
assert(scheduledCores(1) === 10)
188+
assert(scheduledCores(2) === 10)
189+
}
190+
191+
private def testSchedulingWithMaxCores(spreadOut: Boolean): Unit = {
192+
val master = makeMaster()
193+
val appInfo1 = makeAppInfo(1024, maxCores = Some(8))
194+
val appInfo2 = makeAppInfo(1024, maxCores = Some(16))
195+
val workerInfo = makeWorkerInfo(4096, 10)
196+
val workerInfos = Array(workerInfo, workerInfo, workerInfo)
197+
var scheduledCores = master.invokePrivate(
198+
_scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
199+
assert(scheduledCores.length === 3)
200+
// With spreading out, each worker should be assigned a few cores
201+
if (spreadOut) {
202+
assert(scheduledCores(0) === 3)
203+
assert(scheduledCores(1) === 3)
204+
assert(scheduledCores(2) === 2)
205+
} else {
206+
// Without spreading out, the cores should be concentrated on the first worker
207+
assert(scheduledCores(0) === 8)
208+
assert(scheduledCores(1) === 0)
209+
assert(scheduledCores(2) === 0)
210+
}
211+
// Now test the same thing with max cores > cores per worker
212+
scheduledCores = master.invokePrivate(
213+
_scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
214+
assert(scheduledCores.length === 3)
215+
if (spreadOut) {
216+
assert(scheduledCores(0) === 6)
217+
assert(scheduledCores(1) === 5)
218+
assert(scheduledCores(2) === 5)
219+
} else {
220+
// Without spreading out, the first worker should be fully booked,
221+
// and the leftover cores should spill over to the second worker only.
222+
assert(scheduledCores(0) === 10)
223+
assert(scheduledCores(1) === 6)
224+
assert(scheduledCores(2) === 0)
225+
}
226+
}
227+
228+
private def testSchedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = {
229+
val master = makeMaster()
230+
val appInfo1 = makeAppInfo(1024, coresPerExecutor = Some(2))
231+
val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2))
232+
val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3))
233+
val workerInfo = makeWorkerInfo(4096, 10)
234+
val workerInfos = Array(workerInfo, workerInfo, workerInfo)
235+
// Each worker should end up with 4 executors with 2 cores each
236+
// This should be 4 because of the memory restriction on each worker
237+
var scheduledCores = master.invokePrivate(
238+
_scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
239+
assert(scheduledCores.length === 3)
240+
assert(scheduledCores(0) === 8)
241+
assert(scheduledCores(1) === 8)
242+
assert(scheduledCores(2) === 8)
243+
// Now test the same thing without running into the worker memory limit
244+
// Each worker should now end up with 5 executors with 2 cores each
245+
scheduledCores = master.invokePrivate(
246+
_scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
247+
assert(scheduledCores.length === 3)
248+
assert(scheduledCores(0) === 10)
249+
assert(scheduledCores(1) === 10)
250+
assert(scheduledCores(2) === 10)
251+
// Now test the same thing with a cores per executor that 10 is not divisible by
252+
scheduledCores = master.invokePrivate(
253+
_scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
254+
assert(scheduledCores.length === 3)
255+
assert(scheduledCores(0) === 9)
256+
assert(scheduledCores(1) === 9)
257+
assert(scheduledCores(2) === 9)
258+
}
259+
260+
// Sorry for the long method name!
261+
private def testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = {
262+
val master = makeMaster()
263+
val appInfo1 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(4))
264+
val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(20))
265+
val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3), maxCores = Some(20))
266+
val workerInfo = makeWorkerInfo(4096, 10)
267+
val workerInfos = Array(workerInfo, workerInfo, workerInfo)
268+
// We should only launch two executors, each with exactly 2 cores
269+
var scheduledCores = master.invokePrivate(
270+
_scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
271+
assert(scheduledCores.length === 3)
272+
if (spreadOut) {
273+
assert(scheduledCores(0) === 2)
274+
assert(scheduledCores(1) === 2)
275+
assert(scheduledCores(2) === 0)
276+
} else {
277+
assert(scheduledCores(0) === 4)
278+
assert(scheduledCores(1) === 0)
279+
assert(scheduledCores(2) === 0)
280+
}
281+
// Test max cores > number of cores per worker
282+
scheduledCores = master.invokePrivate(
283+
_scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
284+
assert(scheduledCores.length === 3)
285+
if (spreadOut) {
286+
assert(scheduledCores(0) === 8)
287+
assert(scheduledCores(1) === 6)
288+
assert(scheduledCores(2) === 6)
289+
} else {
290+
assert(scheduledCores(0) === 10)
291+
assert(scheduledCores(1) === 10)
292+
assert(scheduledCores(2) === 0)
293+
}
294+
// Test max cores > number of cores per worker AND
295+
// a cores per executor that is 10 is not divisible by
296+
scheduledCores = master.invokePrivate(
297+
_scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
298+
assert(scheduledCores.length === 3)
299+
if (spreadOut) {
300+
assert(scheduledCores(0) === 6)
301+
assert(scheduledCores(1) === 6)
302+
assert(scheduledCores(2) === 6)
303+
} else {
304+
assert(scheduledCores(0) === 9)
305+
assert(scheduledCores(1) === 9)
306+
assert(scheduledCores(2) === 0)
307+
}
308+
}
309+
310+
// ===============================
311+
// | Utility methods for testing |
312+
// ===============================
313+
314+
private val _scheduleExecutorsOnWorkers = PrivateMethod[Array[Int]]('scheduleExecutorsOnWorkers)
315+
316+
private def makeMaster(conf: SparkConf = new SparkConf): Master = {
317+
val securityMgr = new SecurityManager(conf)
318+
val rpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 7077, conf, securityMgr)
319+
val master = new Master(rpcEnv, rpcEnv.address, 8080, securityMgr, conf)
320+
master
321+
}
322+
323+
private def makeAppInfo(
324+
memoryPerExecutorMb: Int,
325+
coresPerExecutor: Option[Int] = None,
326+
maxCores: Option[Int] = None): ApplicationInfo = {
327+
val desc = new ApplicationDescription(
328+
"test", maxCores, memoryPerExecutorMb, null, "", None, None, coresPerExecutor)
329+
val appId = System.currentTimeMillis.toString
330+
new ApplicationInfo(0, appId, desc, new Date, null, Int.MaxValue)
331+
}
332+
333+
private def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo = {
334+
val workerId = System.currentTimeMillis.toString
335+
new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, 101, "address")
336+
}
337+
145338
}

0 commit comments

Comments
 (0)