@@ -533,6 +533,7 @@ private[master] class Master(
 
   /**
    * Schedule executors to be launched on the workers.
+   * Returns an array containing number of cores assigned to each worker.
    *
    * There are two modes of launching executors. The first attempts to spread out an application's
    * executors on as many workers as possible, while the second does the opposite (i.e. launch them
@@ -543,10 +544,18 @@ private[master] class Master(
    * multiple executors from the same application may be launched on the same worker if the worker
    * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
    * worker by default, in which case only one executor may be launched on each worker.
+   *
+   * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
+   * at a time). Consider the following example: cluster has 4 workers with 16 cores each.
+   * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
+   * allocated at a time, 12 cores from each worker would be assigned to each executor.
+   * Since 12 < 16, no executors would launch [SPARK-8881].
    */
-
-  private[master] def scheduleExecutorsOnWorkers(app: ApplicationInfo,
-      usableWorkers: Array[WorkerInfo], spreadOutApps: Boolean): Array[Int] = {
+  private[master] def coresToAssign(
+      app: ApplicationInfo,
+      usableWorkers: Array[WorkerInfo],
+      spreadOutApps: Boolean): Array[Int] = {
+    // Default value for number of cores per executor is 1
     val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
     val memoryPerExecutor = app.desc.memoryPerExecutorMB
     val numUsable = usableWorkers.length
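To make the Scaladoc's allocation argument concrete, here is a small standalone Scala sketch. It is not part of the patch: the `assign` helper and its round-robin loop are purely illustrative, and only the worker/core counts come from the example above. It replays the 4-worker, 16-cores-each scenario and compares granting one core at a time against granting coresPerExecutor cores at a time:

```scala
// Illustrative only: a toy round-robin allocator, not the Master's real scheduler.
// `assign` hands out cores over the workers in steps of `step` until the request is
// exhausted or no worker can accept another step.
object AllocationSketch {
  def assign(coresRequested: Int, workerCores: Array[Int], step: Int): Array[Int] = {
    val assigned = Array.fill(workerCores.length)(0)
    var remaining = coresRequested
    var pos = 0
    while (remaining >= step &&
        workerCores.indices.exists(i => workerCores(i) - assigned(i) >= step)) {
      if (workerCores(pos) - assigned(pos) >= step) {
        assigned(pos) += step
        remaining -= step
      }
      pos = (pos + 1) % workerCores.length
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    val workers = Array(16, 16, 16, 16)   // 4 workers with 16 cores each
    val coresMax = 48                     // spark.cores.max
    val coresPerExecutor = 16             // spark.executor.cores

    val oneCoreSteps = assign(coresMax, workers, step = 1)
    val executorSteps = assign(coresMax, workers, step = coresPerExecutor)

    // One core at a time: Array(12, 12, 12, 12); 12 < 16, so zero executors fit.
    println(oneCoreSteps.map(_ / coresPerExecutor).sum)    // 0
    // coresPerExecutor at a time: Array(16, 16, 16, 0); three executors launch.
    println(executorSteps.map(_ / coresPerExecutor).sum)   // 3
  }
}
```

With one-core steps every worker ends up holding 12 of the 48 requested cores, which is less than spark.executor.cores, so no executor fits anywhere; with 16-core steps three workers each receive a full executor's worth of cores, which is the behavior the patch's Scaladoc describes.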
@@ -588,24 +597,25 @@ private[master] class Master(
     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
     // in the queue, then the second app, etc.
     for (app <- waitingApps if app.coresLeft > 0) {
+      // Default value for number of cores per executor is 1
       val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
       val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
         .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
           worker.coresFree >= coresPerExecutor)
         .sortBy(_.coresFree).reverse
-      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
+      val assignedCores = coresToAssign(app, usableWorkers, spreadOutApps)
 
       // Now that we've decided how many cores to allocate on each worker, let's allocate them
       var pos = 0
       for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
-        allocateWorkerResourceToExecutors(app, assignedCores(pos), coresPerExecutor,
-          usableWorkers(pos))
+        allocateWorkerResourceToExecutors(
+          app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
       }
     }
   }
 
   /**
-   * Allocate a worker's resources to one or more executors
+   * Allocate a worker's resources to one or more executors.
    * @param app the info of the application which the executors belong to
    * @param assignedCores number of cores on this worker for this application
    * @param coresPerExecutor number of cores per executor
@@ -616,8 +626,8 @@ private[master] class Master(
       assignedCores: Int,
       coresPerExecutor: Int,
       worker: WorkerInfo): Unit = {
-
-    var numExecutors = assignedCores / coresPerExecutor
+    // If cores per executor is specified, then this division should have a remainder of zero
+    val numExecutors = assignedCores / coresPerExecutor
     for (i <- 1 to numExecutors) {
      val exec = app.addExecutor(worker, coresPerExecutor)
      launchExecutor(worker, exec)
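For the arithmetic in allocateWorkerResourceToExecutors, a quick worked example with hypothetical numbers (the values are not from the patch): if 32 cores were assigned to the application on one worker and spark.executor.cores is 16, the division yields exactly two executors.

```scala
// Hypothetical values, shown only to illustrate the exact division above.
val assignedCores = 32                                  // cores granted to the app on this worker
val coresPerExecutor = 16                               // spark.executor.cores
val numExecutors = assignedCores / coresPerExecutor     // 2, with remainder 0
(1 to numExecutors).foreach { i =>
  println(s"executor $i gets $coresPerExecutor cores")  // two 16-core executors on this worker
}
```

Because coresToAssign grants cores in coresPerExecutor-sized chunks when that setting is specified, the remainder is zero by construction, which is what the new comment in the patch asserts.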