@@ -205,6 +205,15 @@ private[spark] class MesosSchedulerBackend(
205
205
offer.getHostname,
206
206
getResource(offer.getResourcesList, " cpus" ).toInt)
207
207
208
+ private def inClassLoader ()(fun : => Unit ) = {
209
+ val oldClassLoader = setClassLoader()
210
+ try {
211
+ fun
212
+ } finally {
213
+ restoreClassLoader(oldClassLoader)
214
+ }
215
+ }
216
+
208
217
override def disconnected (d : SchedulerDriver ) {}
209
218
210
219
override def reregistered (d : SchedulerDriver , masterInfo : MasterInfo ) {}
@@ -215,8 +224,7 @@ private[spark] class MesosSchedulerBackend(
215
224
* tasks are balanced across the cluster.
216
225
*/
217
226
override def resourceOffers (d : SchedulerDriver , offers : JList [Offer ]) {
218
- val oldClassLoader = setClassLoader()
219
- try {
227
+ inClassLoader() {
220
228
val (acceptedOffers, declinedOffers) = offers.partition(o => {
221
229
val mem = getResource(o.getResourcesList, " mem" )
222
230
val slaveId = o.getSlaveId.getValue
@@ -233,11 +241,11 @@ private[spark] class MesosSchedulerBackend(
233
241
scheduler.resourceOffers(offerableWorkers)
234
242
.filter(! _.isEmpty)
235
243
.foreach(_.foreach(taskDesc => {
236
- val slaveId = taskDesc.executorId
237
- slaveIdsWithExecutors += slaveId
238
- taskIdToSlaveId(taskDesc.taskId) = slaveId
239
- mesosTasks.getOrElseUpdate(slaveId, new JArrayList [MesosTaskInfo ])
240
- .add(createMesosTask(taskDesc, slaveId))
244
+ val slaveId = taskDesc.executorId
245
+ slaveIdsWithExecutors += slaveId
246
+ taskIdToSlaveId(taskDesc.taskId) = slaveId
247
+ mesosTasks.getOrElseUpdate(slaveId, new JArrayList [MesosTaskInfo ])
248
+ .add(createMesosTask(taskDesc, slaveId))
241
249
}))
242
250
243
251
// Reply to the offers
@@ -250,8 +258,6 @@ private[spark] class MesosSchedulerBackend(
250
258
}
251
259
252
260
declinedOffers.foreach(o => d.declineOffer(o.getId))
253
- } finally {
254
- restoreClassLoader(oldClassLoader)
255
261
}
256
262
}
257
263
@@ -290,8 +296,7 @@ private[spark] class MesosSchedulerBackend(
290
296
}
291
297
292
298
override def statusUpdate (d : SchedulerDriver , status : TaskStatus ) {
293
- val oldClassLoader = setClassLoader()
294
- try {
299
+ inClassLoader() {
295
300
val tid = status.getTaskId.getValue.toLong
296
301
val state = TaskState .fromMesos(status.getState)
297
302
synchronized {
@@ -304,18 +309,13 @@ private[spark] class MesosSchedulerBackend(
304
309
}
305
310
}
306
311
scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
307
- } finally {
308
- restoreClassLoader(oldClassLoader)
309
312
}
310
313
}
311
314
312
315
override def error (d : SchedulerDriver , message : String ) {
313
- val oldClassLoader = setClassLoader()
314
- try {
316
+ inClassLoader() {
315
317
logError(" Mesos error: " + message)
316
318
scheduler.error(message)
317
- } finally {
318
- restoreClassLoader(oldClassLoader)
319
319
}
320
320
}
321
321
@@ -332,15 +332,12 @@ private[spark] class MesosSchedulerBackend(
332
332
override def frameworkMessage (d : SchedulerDriver , e : ExecutorID , s : SlaveID , b : Array [Byte ]) {}
333
333
334
334
private def recordSlaveLost (d : SchedulerDriver , slaveId : SlaveID , reason : ExecutorLossReason ) {
335
- val oldClassLoader = setClassLoader()
336
- try {
335
+ inClassLoader() {
337
336
logInfo(" Mesos slave lost: " + slaveId.getValue)
338
337
synchronized {
339
338
slaveIdsWithExecutors -= slaveId.getValue
340
339
}
341
340
scheduler.executorLost(slaveId.getValue, reason)
342
- } finally {
343
- restoreClassLoader(oldClassLoader)
344
341
}
345
342
}
346
343
0 commit comments