Skip to content

Commit 03e8d0a

Browse files
committed
Make the fine-grained mode scheduler respect
spark.mesos.mesosExecutor.cores when launching Mesos executors (regression)
1 parent 391e6be commit 03e8d0a

File tree

2 files changed

+33
-3
lines changed

2 files changed

+33
-3
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import org.apache.spark.scheduler._
3232
import org.apache.spark.scheduler.cluster.ExecutorInfo
3333
import org.apache.spark.util.Utils
3434

35-
3635
/**
3736
* A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
3837
* separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
@@ -127,7 +126,7 @@ private[spark] class MesosSchedulerBackend(
127126
}
128127
val builder = MesosExecutorInfo.newBuilder()
129128
val (resourcesAfterCpu, usedCpuResources) =
130-
partitionResources(availableResources, "cpus", scheduler.CPUS_PER_TASK)
129+
partitionResources(availableResources, "cpus", mesosExecutorCores)
131130
val (resourcesAfterMem, usedMemResources) =
132131
partitionResources(resourcesAfterCpu.asJava, "mem", calculateTotalMemory(sc))
133132

core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,38 @@ import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSui
4242

4343
class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
4444

45+
test("Use configured mesosExecutor.cores for ExecutorInfo") {
46+
val mesosExecutorCores = 3
47+
val conf = new SparkConf
48+
conf.set("spark.mesos.mesosExecutor.cores", mesosExecutorCores.toString)
49+
50+
val listenerBus = mock[LiveListenerBus]
51+
listenerBus.post(
52+
SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
53+
54+
val sc = mock[SparkContext]
55+
when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
56+
57+
when(sc.conf).thenReturn(conf)
58+
when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
59+
when(sc.executorMemory).thenReturn(100)
60+
when(sc.listenerBus).thenReturn(listenerBus)
61+
val taskScheduler = mock[TaskSchedulerImpl]
62+
when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
63+
64+
val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master")
65+
66+
val resources = Arrays.asList(
67+
mesosSchedulerBackend.createResource("cpus", 4),
68+
mesosSchedulerBackend.createResource("mem", 1024))
69+
// uri is null.
70+
val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
71+
val executorResources = executorInfo.getResourcesList
72+
val cpus = executorResources.asScala.find(_.getName.equals("cpus")).get.getScalar.getValue
73+
74+
assert(cpus === mesosExecutorCores)
75+
}
76+
4577
test("check spark-class location correctly") {
4678
val conf = new SparkConf
4779
conf.set("spark.mesos.executor.home" , "/mesos-home")
@@ -263,7 +295,6 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
263295
.setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
264296
.setHostname(s"host${id.toString}").build()
265297

266-
267298
val mesosOffers = new java.util.ArrayList[Offer]
268299
mesosOffers.add(offer)
269300

0 commit comments

Comments
 (0)