package org.apache.spark.scheduler.cluster.mesos

import java.io.File
- import java.util.{List => JList}
+ import java.util.{List => JList, Collections}
+ import java.util.concurrent.locks.ReentrantLock

import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}

+ import com.google.common.collect.HashBiMap
+ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
@@ -60,9 +63,27 @@ private[spark] class CoarseMesosSchedulerBackend(
  val slaveIdsWithExecutors = new HashSet[String]

-   val taskIdToSlaveId = new HashMap[Int, String]
-   val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
+   val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]
+   // How many times tasks on each slave failed
+   val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]
+
+   /**
+    * The total number of executors we aim to have. Undefined when not using dynamic allocation
+    * and before the ExecutorAllocationManager calls [[doRequestTotalExecutors]].
+    */
+   private var executorLimitOption: Option[Int] = None
+
+   /**
+    * Return the current executor limit, which may be [[Int.MaxValue]]
+    * before properly initialized.
+    */
+   private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue)
+
+   private val pendingRemovedSlaveIds = new HashSet[String]

+   // Private lock object protecting the mutable state above. Using the intrinsic lock
+   // may lead to deadlocks since the superclass might also try to lock.
+   private val stateLock = new ReentrantLock

  val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
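Reviewer note: the switch from `HashMap[Int, String]` to Guava's `HashBiMap` is what lets the later hunks look up the Mesos task for a given slave via `inverse()`; the dedicated `stateLock` object is simply synchronized on instead of `this`, so the superclass' own `synchronized` methods cannot contend with it. A minimal standalone sketch of the bimap behaviour (the ids below are made up and this snippet is illustrative, not part of the patch):

```scala
import com.google.common.collect.HashBiMap

// A bimap keeps both directions of the mapping in sync and enforces uniqueness
// on keys and values alike.
val taskIdToSlaveId = HashBiMap.create[Int, String]()
taskIdToSlaveId.put(0, "20150701-123456-S1")      // Mesos task 0 runs on this slave
val slaveIdToTaskId = taskIdToSlaveId.inverse()   // live inverted view: slaveId -> taskId

val taskId: Int = slaveIdToTaskId.get("20150701-123456-S1")
assert(taskId == 0)

taskIdToSlaveId.remove(0)                         // removals show up in the inverse view too
assert(!slaveIdToTaskId.containsKey("20150701-123456-S1"))
```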
@@ -86,7 +107,7 @@ private[spark] class CoarseMesosSchedulerBackend(
    startScheduler(master, CoarseMesosSchedulerBackend.this, fwInfo)
  }

-   def createCommand(offer: Offer, numCores: Int): CommandInfo = {
+   def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = {
    val executorSparkHome = conf.getOption("spark.mesos.executor.home")
      .orElse(sc.getSparkHome())
      .getOrElse {
@@ -120,10 +141,6 @@ private[spark] class CoarseMesosSchedulerBackend(
    }
    val command = CommandInfo.newBuilder()
      .setEnvironment(environment)
-     val driverUrl = sc.env.rpcEnv.uriOf(
-       SparkEnv.driverActorSystemName,
-       RpcAddress(conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt),
-       CoarseGrainedSchedulerBackend.ENDPOINT_NAME)

    val uri = conf.getOption("spark.executor.uri")
      .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
@@ -133,7 +150,7 @@ private[spark] class CoarseMesosSchedulerBackend(
      command.setValue(
        "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
          .format(prefixEnv, runScript) +
-         s" --driver-url $driverUrl" +
+         s" --driver-url $driverURL" +
        s" --executor-id ${offer.getSlaveId.getValue}" +
        s" --hostname ${offer.getHostname}" +
        s" --cores $numCores" +
@@ -142,11 +159,12 @@ private[spark] class CoarseMesosSchedulerBackend(
      // Grab everything to the first '.'. We'll use that and '*' to
      // glob the directory "correctly".
      val basename = uri.get.split('/').last.split('.').head
+       val executorId = sparkExecutorId(offer.getSlaveId.getValue, taskId.toString)
      command.setValue(
        s"cd $basename*; $prefixEnv " +
          "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
-         s" --driver-url $driverUrl" +
-         s" --executor-id ${offer.getSlaveId.getValue}" +
+         s" --driver-url $driverURL" +
+         s" --executor-id $executorId" +
        s" --hostname ${offer.getHostname}" +
        s" --cores $numCores" +
        s" --app-id $appId")
@@ -155,6 +173,17 @@ private[spark] class CoarseMesosSchedulerBackend(
    command.build()
  }

+   protected def driverURL: String = {
+     if (conf.contains("spark.testing")) {
+       "driverURL"
+     } else {
+       sc.env.rpcEnv.uriOf(
+         SparkEnv.driverActorSystemName,
+         RpcAddress(conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt),
+         CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
+     }
+   }
+
  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}

  override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
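Side note on the new `--driver-url` plumbing: `driverURL` defers building the endpoint URI until an executor command is actually constructed, and short-circuits to a dummy string under `spark.testing`. As a rough illustration only (host, port, and ids are invented, and the exact shape depends on the RpcEnv implementation in this Spark version), the value ends up looking something like:

```scala
// Illustrative only -- not taken from this patch.
val exampleDriverURL =
  "akka.tcp://sparkDriver@192.168.1.10:35000/user/CoarseGrainedScheduler"

// createCommand splices this into the executor launch command, roughly:
//   ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend
//     --driver-url akka.tcp://sparkDriver@192.168.1.10:35000/user/CoarseGrainedScheduler
//     --executor-id <mesos-slave-id>/<task-id> --hostname <slave-host> --cores 4 --app-id <framework-id>
```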
@@ -172,17 +201,18 @@ private[spark] class CoarseMesosSchedulerBackend(
   * unless we've already launched more than we wanted to.
   */
  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
-     synchronized {
+     stateLock.synchronized {
      val filters = Filters.newBuilder().setRefuseSeconds(5).build()
      for (offer <- offers) {
        val offerAttributes = toAttributeMap(offer.getAttributesList)
        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
-         val slaveId = offer.getSlaveId.toString
+         val slaveId = offer.getSlaveId.getValue
        val mem = getResource(offer.getResourcesList, "mem")
        val cpus = getResource(offer.getResourcesList, "cpus").toInt
        val id = offer.getId.getValue
-         if (meetsConstraints &&
+         if (taskIdToSlaveId.size < executorLimit &&
            totalCoresAcquired < maxCores &&
+             meetsConstraints &&
            mem >= calculateTotalMemory(sc) &&
            cpus >= 1 &&
            failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
@@ -197,7 +227,7 @@ private[spark] class CoarseMesosSchedulerBackend(
          val task = MesosTaskInfo.newBuilder()
            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
            .setSlaveId(offer.getSlaveId)
-             .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
+             .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
            .setName("Task " + taskId)
            .addResources(createResource("cpus", cpusToUse))
            .addResources(createResource("mem", calculateTotalMemory(sc)))
@@ -209,7 +239,9 @@ private[spark] class CoarseMesosSchedulerBackend(
          // accept the offer and launch the task
          logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
-           d.launchTasks(List(offer.getId), List(task.build()), filters)
+           d.launchTasks(
+             Collections.singleton(offer.getId),
+             Collections.singleton(task.build()), filters)
        } else {
          // Decline the offer
          logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
@@ -224,7 +256,7 @@ private[spark] class CoarseMesosSchedulerBackend(
    val taskId = status.getTaskId.getValue.toInt
    val state = status.getState
    logInfo("Mesos task " + taskId + " is now " + state)
-     synchronized {
+     stateLock.synchronized {
      if (TaskState.isFinished(TaskState.fromMesos(state))) {
        val slaveId = taskIdToSlaveId(taskId)
        slaveIdsWithExecutors -= slaveId
@@ -242,8 +274,9 @@ private[spark] class CoarseMesosSchedulerBackend(
              "is Spark installed on it?")
          }
        }
+         executorTerminated(d, slaveId, s"Executor finished with state $state")
        // In case we'd rejected everything before but have now lost a node
-         mesosDriver.reviveOffers()
+         d.reviveOffers()
      }
    }
  }
@@ -262,18 +295,39 @@ private[spark] class CoarseMesosSchedulerBackend(
  override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}

-   override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
-     logInfo("Mesos slave lost: " + slaveId.getValue)
-     synchronized {
-       if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
-         // Note that the slave ID corresponds to the executor ID on that slave
-         slaveIdsWithExecutors -= slaveId.getValue
-         removeExecutor(slaveId.getValue, "Mesos slave lost")
+   /**
+    * Called when a slave is lost or a Mesos task finished. Updates the local view of
+    * which tasks are running, and removes the terminated slave from the list of pending
+    * slave IDs that we might have asked to be killed. It also notifies the driver
+    * that an executor was removed.
+    */
+   private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
+     stateLock.synchronized {
+       if (slaveIdsWithExecutors.contains(slaveId)) {
+         val slaveIdToTaskId = taskIdToSlaveId.inverse()
+         if (slaveIdToTaskId.contains(slaveId)) {
+           val taskId: Int = slaveIdToTaskId.get(slaveId)
+           taskIdToSlaveId.remove(taskId)
+           removeExecutor(sparkExecutorId(slaveId, taskId.toString), reason)
+         }
+         // TODO: This assumes one Spark executor per Mesos slave,
+         // which may no longer be true after SPARK-5095
+         pendingRemovedSlaveIds -= slaveId
+         slaveIdsWithExecutors -= slaveId
      }
    }
  }

-   override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
+   private def sparkExecutorId(slaveId: String, taskId: String): String = {
+     s"$slaveId/$taskId"
+   }
+
+   override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
+     logInfo("Mesos slave lost: " + slaveId.getValue)
+     executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
+   }
+
+   override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = {
    logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
    slaveLost(d, s)
  }
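The executor id convention introduced above is `<mesos slave id>/<mesos task id>` (see `sparkExecutorId`), and `doKillExecutors` in the next hunk recovers the slave id by splitting on `/`. A small round-trip sketch with made-up ids, mirroring the code in the patch:

```scala
// Build a Spark executor id from a Mesos slave id and task id, then recover the slave id,
// the same way sparkExecutorId and the split("/") in doKillExecutors do.
def sparkExecutorId(slaveId: String, taskId: String): String = s"$slaveId/$taskId"

val executorId = sparkExecutorId("20150701-123456-S3", "7")  // "20150701-123456-S3/7"
val recoveredSlaveId = executorId.split("/")(0)
assert(recoveredSlaveId == "20150701-123456-S3")
```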
@@ -284,4 +338,34 @@ private[spark] class CoarseMesosSchedulerBackend(
    super.applicationId
  }

+   override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+     // We don't truly know if we can fulfill the full amount of executors
+     // since at coarse grain it depends on the number of slaves available.
+     logInfo("Capping the total amount of executors to " + requestedTotal)
+     executorLimitOption = Some(requestedTotal)
+     true
+   }
+
+   override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+     if (mesosDriver == null) {
+       logWarning("Asked to kill executors before the Mesos driver was started.")
+       return false
+     }
+
+     val slaveIdToTaskId = taskIdToSlaveId.inverse()
+     for (executorId <- executorIds) {
+       val slaveId = executorId.split("/")(0)
+       if (slaveIdToTaskId.contains(slaveId)) {
+         mesosDriver.killTask(
+           TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build())
+         pendingRemovedSlaveIds += slaveId
+       } else {
+         logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler")
+       }
+     }
+     // No need to adjust `executorLimitOption` since the AllocationManager already communicated
+     // the desired limit through a call to `doRequestTotalExecutors`.
+     // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]]
+     true
+   }
}
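For context on how these two overrides are exercised: with dynamic allocation enabled, the `ExecutorAllocationManager` drives any `CoarseGrainedSchedulerBackend` by first capping the target executor count and then naming specific executors to kill. A rough sketch of that call sequence as it would reach this backend; `backend` and the executor id are illustrative assumptions, not code from this patch:

```scala
// Hypothetical driver-side sequence against a running CoarseMesosSchedulerBackend.
backend.doRequestTotalExecutors(4)                     // cap: at most 4 coarse-grained executors
backend.doKillExecutors(Seq("20150701-123456-S3/7"))   // kill the Mesos task backing this executor
// Later resourceOffers() calls decline offers once taskIdToSlaveId.size >= executorLimit.
```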