Skip to content

Commit a3da508

Browse files
CodingCatkayousterhout
authored andcommitted
SPARK-1171: when executor is removed, we should minus totalCores instead of just freeCores on that executor
https://spark-project.atlassian.net/browse/SPARK-1171 When the executor is removed, the current implementation will only minus the freeCores of that executor. Actually we should minus the totalCores... Author: CodingCat <[email protected]> Author: Nan Zhu <[email protected]> Closes #63 from CodingCat/simplify_CoarseGrainedSchedulerBackend and squashes the following commits: f6bf93f [Nan Zhu] code clean 19c2bb4 [CodingCat] use copy idiom to reconstruct the workerOffers 43c13e9 [CodingCat] keep WorkerOffer immutable af470d3 [CodingCat] style fix 0c0e409 [CodingCat] simplify the implementation of CoarseGrainedSchedulerBackend
1 parent 0283665 commit a3da508

File tree

2 files changed

+7
-3
lines changed

2 files changed

+7
-3
lines changed

core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,4 @@ package org.apache.spark.scheduler
2121
* Represents free resources available on an executor.
2222
*/
2323
private[spark]
24-
class WorkerOffer(val executorId: String, val host: String, val cores: Int)
24+
case class WorkerOffer(executorId: String, host: String, cores: Int)

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
5454
private val executorAddress = new HashMap[String, Address]
5555
private val executorHost = new HashMap[String, String]
5656
private val freeCores = new HashMap[String, Int]
57+
private val totalCores = new HashMap[String, Int]
5758
private val addressToExecutorId = new HashMap[Address, String]
5859

5960
override def preStart() {
@@ -76,6 +77,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
7677
sender ! RegisteredExecutor(sparkProperties)
7778
executorActor(executorId) = sender
7879
executorHost(executorId) = Utils.parseHostPort(hostPort)._1
80+
totalCores(executorId) = cores
7981
freeCores(executorId) = cores
8082
executorAddress(executorId) = sender.path.address
8183
addressToExecutorId(sender.path.address) = executorId
@@ -147,10 +149,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
147149
def removeExecutor(executorId: String, reason: String) {
148150
if (executorActor.contains(executorId)) {
149151
logInfo("Executor " + executorId + " disconnected, so removing it")
150-
val numCores = freeCores(executorId)
151-
addressToExecutorId -= executorAddress(executorId)
152+
val numCores = totalCores(executorId)
152153
executorActor -= executorId
153154
executorHost -= executorId
155+
addressToExecutorId -= executorAddress(executorId)
156+
executorAddress -= executorId
157+
totalCores -= executorId
154158
freeCores -= executorId
155159
totalCoreCount.addAndGet(-numCores)
156160
scheduler.executorLost(executorId, SlaveLost(reason))

0 commit comments

Comments
 (0)