Skip to content

Commit 543a98d

Browse files
committed
Schedule multiple jobs
1 parent 6887e5e commit 543a98d

File tree

1 file changed

+63
-41
lines changed

1 file changed

+63
-41
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 63 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,11 @@ private[spark] case class ClusterSchedulerState(
7575

7676
private[spark] trait ClusterScheduler {
7777
def submitDriver(desc: DriverRequest): SubmitResponse
78+
7879
def killDriver(submissionId: String): KillResponse
80+
7981
def getStatus(submissionId: String): StatusResponse
82+
8083
def getState(): ClusterSchedulerState
8184
}
8285

@@ -225,65 +228,84 @@ private[spark] class MesosClusterScheduler(conf: SparkConf)
225228
)
226229
}
227230

231+
private [spark] case class ResourceOffer(val offer: Offer, var cpu: Double, var mem: Double)
232+
228233
override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
229-
// We should try to schedule all the drivers if the offers fit.
234+
var submission = queue.peek
230235

231-
// Non-blocking poll.
232-
val submissionOption = Option(queue.poll(0, TimeUnit.SECONDS))
236+
val usedOffers = new mutable.HashSet[OfferID]
233237

234-
if (submissionOption.isEmpty) {
235-
offers.foreach(o => driver.declineOffer(o.getId))
236-
return
238+
val currentOffers = offers.map {
239+
o =>
240+
ResourceOffer(
241+
o,
242+
getResource(o.getResourcesList, "cpus"),
243+
getResource(o.getResourcesList, "mem"))
237244
}
238245

239-
val submission = submissionOption.get
246+
var canSchedule = true
240247

241-
var remainingOffers = offers
248+
while (canSchedule && submission != null) {
249+
val driverCpu = submission.req.desc.cores
250+
val driverMem = submission.req.desc.mem
242251

243-
val driverCpu = submission.req.desc.cores
244-
val driverMem = submission.req.desc.mem
252+
val offerOption = currentOffers.find { o =>
253+
o.cpu >= driverCpu && o.mem >= driverMem
254+
}
245255

246-
// Should use the passed in driver cpu and memory.
247-
val offerOption = offers.find { o =>
248-
getResource(o.getResourcesList, "cpus") >= driverCpu &&
249-
getResource(o.getResourcesList, "mem") >= driverMem
250-
}
256+
if (offerOption.isDefined) {
257+
val offer = offerOption.get
251258

252-
offerOption.foreach { offer =>
253-
val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
259+
offer.cpu -= driverCpu
260+
offer.mem -= driverMem
254261

255-
val cpuResource = Resource.newBuilder()
256-
.setName("cpus").setType(Value.Type.SCALAR)
257-
.setScalar(Value.Scalar.newBuilder().setValue(driverCpu)).build()
262+
val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
258263

259-
val memResource = Resource.newBuilder()
260-
.setName("mem").setType(Value.Type.SCALAR)
261-
.setScalar(Value.Scalar.newBuilder().setValue(driverMem)).build()
264+
val cpuResource = Resource.newBuilder()
265+
.setName("cpus").setType(Value.Type.SCALAR)
266+
.setScalar(Value.Scalar.newBuilder().setValue(driverCpu)).build()
262267

263-
val commandInfo = buildCommand(submission.req)
268+
val memResource = Resource.newBuilder()
269+
.setName("mem").setType(Value.Type.SCALAR)
270+
.setScalar(Value.Scalar.newBuilder().setValue(driverMem)).build()
264271

265-
val taskInfo = TaskInfo.newBuilder()
266-
.setTaskId(taskId)
267-
.setName(s"driver for ${submission.req.desc.command.mainClass}")
268-
.setSlaveId(offer.getSlaveId)
269-
.setCommand(commandInfo)
270-
.addResources(cpuResource)
271-
.addResources(memResource)
272-
.build
272+
val commandInfo = buildCommand(submission.req)
273273

274-
//TODO: logDebug("")
275-
driver.launchTasks(Collections.singleton(offer.getId), Collections.singleton(taskInfo))
274+
val taskInfo = TaskInfo.newBuilder()
275+
.setTaskId(taskId)
276+
.setName(s"driver for ${submission.req.desc.command.mainClass}")
277+
.setSlaveId(offer.offer.getSlaveId)
278+
.setCommand(commandInfo)
279+
.addResources(cpuResource)
280+
.addResources(memResource)
281+
.build
276282

277-
stateLock.synchronized {
278-
launchedDrivers(submission.submissionId) =
279-
ClusterTaskState(submission, taskId, offer.getSlaveId,
280-
None, DriverState.SUBMITTED, new Date())
281-
}
283+
logDebug(s"Launching task ${taskInfo}, with offer: ${offer.offer}")
284+
driver.launchTasks(Collections.singleton(offer.offer.getId), Collections.singleton(taskInfo))
282285

283-
remainingOffers = offers.filter(o => o.getId.equals(offer.getId))
286+
stateLock.synchronized {
287+
launchedDrivers(submission.submissionId) =
288+
ClusterTaskState(submission, taskId, offer.offer.getSlaveId,
289+
None, DriverState.SUBMITTED, new Date())
290+
}
291+
292+
usedOffers += offer.offer.getId
293+
294+
// remove driver from queue.
295+
queue.poll(0, TimeUnit.SECONDS)
296+
297+
submission = queue.peek
298+
} else {
299+
// We can stop at very first driver that we cannot schedule on.
300+
// TODO: We should remove the top driver that cannot be scheduled
301+
// over a configurable time period.
302+
canSchedule = false
303+
}
284304
}
285305

286-
remainingOffers.foreach(o => driver.declineOffer(o.getId))
306+
offers
307+
.filter(o => !usedOffers.contains(o.getId))
308+
.foreach(o => driver.declineOffer(o.getId))
287309
}
288310

289311
def getState(): ClusterSchedulerState = {

0 commit comments

Comments
 (0)