@@ -20,6 +20,9 @@ package org.apache.spark.scheduler.cluster.mesos
 import java.io.File
 import java.util.{List => JList}
 import java.util.Collections
+import java.util.concurrent.locks.ReentrantLock
+
+import com.google.common.collect.HashBiMap
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet}
@@ -69,9 +72,17 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   val slaveIdsWithExecutors = new HashSet[String]
 
-  val taskIdToSlaveId = new HashMap[Int, String]
-  val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
+  val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]
+  val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int] // How many times tasks on each slave failed
+
+
+  // the total number of executors we aim to have
+  private var executorLimit: Option[Int] = None
+  private val pendingRemovedSlaveIds = new HashSet[String]
 
+  // private lock object protecting mutable state above. Using the intrinsic lock
+  // may lead to deadlocks since the superclass might also try to lock
+  private val stateLock = new ReentrantLock
 
   val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
 
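The switch from a plain HashMap to Guava's HashBiMap matters later in the patch: executorTerminated and doKillExecutors need the reverse lookup, from a slave ID back to its Mesos task ID, and a bidirectional map keeps both directions consistent in a single structure. A minimal, self-contained sketch of that API, assuming only Guava on the classpath; the object name and sample IDs below are illustrative and not part of the patch:

import com.google.common.collect.HashBiMap

object HashBiMapSketch {
  def main(args: Array[String]): Unit = {
    // Forward view: Mesos task ID -> slave ID
    val taskIdToSlaveId = HashBiMap.create[Int, String]()
    taskIdToSlaveId.put(1, "slave-a")
    taskIdToSlaveId.put(2, "slave-b")

    // inverse() is a live view (not a copy): slave ID -> task ID
    val slaveIdToTaskId = taskIdToSlaveId.inverse()
    println(slaveIdToTaskId.get("slave-b"))   // 2

    // Removing through either view updates both directions
    slaveIdToTaskId.remove("slave-a")
    println(taskIdToSlaveId.containsKey(1))   // false
  }
}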
@@ -88,17 +99,16 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def start() {
     super.start()
 
-    synchronized {
+    stateLock.synchronized {
       new Thread("CoarseMesosSchedulerBackend driver") {
         setDaemon(true)
         override def run() {
           val scheduler = CoarseMesosSchedulerBackend.this
           val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
           driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
-          try { {
+          try {
             val ret = driver.run()
             logInfo("driver.run() returned with code " + ret)
-          }
           } catch {
             case e: Exception => logError("driver.run() failed", e)
           }
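Replacing the bare synchronized blocks with stateLock.synchronized stops the backend from locking on `this`. In Scala, x.synchronized { ... } acquires the intrinsic monitor of x, so the ReentrantLock above effectively serves as a dedicated monitor object that the superclass cannot contend on, which is the deadlock concern named in the patch comment. A minimal sketch of the same pattern; the class and method names here are illustrative only:

import scala.collection.mutable.HashSet

// Illustrative only: guard mutable state with a private lock object instead of `this`,
// so code elsewhere that synchronizes on the instance cannot deadlock with these methods.
class SlaveRegistry {
  private val stateLock = new Object
  private val slaveIdsWithExecutors = new HashSet[String]

  def register(slaveId: String): Unit = stateLock.synchronized {
    slaveIdsWithExecutors += slaveId
  }

  def snapshot(): Set[String] = stateLock.synchronized {
    slaveIdsWithExecutors.toSet
  }
}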
@@ -206,7 +216,7 @@ private[spark] class CoarseMesosSchedulerBackend(
    * unless we've already launched more than we wanted to.
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
-    synchronized {
+    stateLock.synchronized {
       val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
 
       for (offer <- offers) {
@@ -274,7 +284,7 @@ private[spark] class CoarseMesosSchedulerBackend(
     val taskId = status.getTaskId.getValue.toInt
     val state = status.getState
     logInfo("Mesos task " + taskId + " is now " + state)
-    synchronized {
+    stateLock.synchronized {
       if (isFinished(state)) {
         val slaveId = taskIdToSlaveId(taskId)
         slaveIdsWithExecutors -= slaveId
@@ -292,6 +302,7 @@ private[spark] class CoarseMesosSchedulerBackend(
             "is Spark installed on it?")
         }
       }
+      executorTerminated(d, slaveId, s"Executor finished with state $state")
       driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
     }
   }
@@ -311,17 +322,33 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
 
-  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
-    logInfo("Mesos slave lost: " + slaveId.getValue)
-    synchronized {
-      if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
-        // Note that the slave ID corresponds to the executor ID on that slave
-        slaveIdsWithExecutors -= slaveId.getValue
-        removeExecutor(slaveId.getValue, "Mesos slave lost")
+  /** Called when a slave is lost or a Mesos task finished. Updates the local view of
+   *  which tasks are running, and removes the terminated slave from the list of pending
+   *  slave IDs that we might have asked to be killed. It also notifies the driver
+   *  that an executor was removed.
+   */
+  private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String) {
+    stateLock.synchronized {
+      if (slaveIdsWithExecutors.contains(slaveId)) {
+        val slaveIdToTaskId = taskIdToSlaveId.inverse()
+        if (slaveIdToTaskId.contains(slaveId)) {
+          val taskId: Int = slaveIdToTaskId.get(slaveId)
+          taskIdToSlaveId.remove(taskId)
+          removeExecutor(sparkExecutorId(slaveId, taskId.toString), reason)
+        }
+        pendingRemovedSlaveIds -= slaveId
+        slaveIdsWithExecutors -= slaveId
       }
     }
   }
 
+  private def sparkExecutorId(slaveId: String, taskId: String) = "%s/%s".format(slaveId, taskId)
+
+  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+    logInfo("Mesos slave lost: " + slaveId.getValue)
+    executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
+  }
+
   override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
     logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
     slaveLost(d, s)
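The new sparkExecutorId helper fixes the executor ID format as "<slaveId>/<taskId>", and doKillExecutors in the next hunk relies on that format when it splits on "/" to recover the slave ID. A tiny sketch of the roundtrip; the slave and task IDs are made up:

object ExecutorIdSketch {
  private def sparkExecutorId(slaveId: String, taskId: String) = "%s/%s".format(slaveId, taskId)

  def main(args: Array[String]): Unit = {
    val executorId = sparkExecutorId("20150601-000000-12345-S3", "7")
    println(executorId)                  // 20150601-000000-12345-S3/7
    println(executorId.split("/")(0))    // 20150601-000000-12345-S3
  }
}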
@@ -333,4 +360,40 @@ private[spark] class CoarseMesosSchedulerBackend(
     super.applicationId
   }
 
+  override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+    // We don't truly know if we can fulfill the requested number of executors,
+    // since at coarse grain it depends on the number of slaves available.
+    logInfo("Capping the total amount of executors to " + requestedTotal)
+    executorLimit = Option(requestedTotal)
+    true
+  }
+
+  override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+    if (driver == null) {
+      logWarning("Asked to kill executors before the executor was started.")
+      return false
+    }
+
+    val slaveIdToTaskId = taskIdToSlaveId.inverse()
+    for (executorId <- executorIds) {
+      val slaveId = executorId.split("/")(0)
+      if (slaveIdToTaskId.contains(slaveId)) {
+        driver.killTask(
+          TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build)
+        pendingRemovedSlaveIds += slaveId
+      } else {
+        logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler")
+      }
+    }
+
+    assert(pendingRemovedSlaveIds.size <= taskIdToSlaveId.size)
+
+    // We cannot simply decrement from the existing executor limit, since we may not be able
+    // to launch as many executors as the limit. But we assume that if we are notified to kill
+    // executors, the scheduler wants to set a limit that is lower than the number of executors
+    // that have been launched. Therefore, we take the number of executors currently launched
+    // and deduct the executors killed as the new limit.
+    executorLimit = Option(taskIdToSlaveId.size - pendingRemovedSlaveIds.size)
+    true
+  }
 }
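The final assignment in doKillExecutors derives the new cap from current state rather than from the previous limit. A worked sketch of that arithmetic with made-up numbers: with 10 tasks tracked in taskIdToSlaveId and 3 slaves pending removal, the new limit becomes Some(7).

object ExecutorLimitSketch {
  def main(args: Array[String]): Unit = {
    val launched = 10        // stands in for taskIdToSlaveId.size: executors currently launched
    val pendingKills = 3     // stands in for pendingRemovedSlaveIds.size: kill requests in flight
    val executorLimit: Option[Int] = Option(launched - pendingKills)
    println(executorLimit)   // Some(7)
  }
}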