Commit 2f452cb

CodingCat authored and aarondav committed
SPARK-1686: keep schedule() calling in the main thread
https://issues.apache.org/jira/browse/SPARK-1686

Moved from original JIRA (by @markhamstra):

In deploy.master.Master, the completeRecovery method is the last thing to be called when a standalone Master is recovering from failure. It is responsible for resetting some state, relaunching drivers, and eventually resuming its scheduling duties.

There are currently four places in Master.scala where completeRecovery is called. Three of them are from within the actor's receive method, and aren't problems. The last starts from within receive when the ElectedLeader message is received, but the actual completeRecovery() call is made from the Akka scheduler. That means that it will execute on a different scheduler thread, and Master itself will end up running (i.e., schedule()) from that Akka scheduler thread.

In this PR, I added a new master message TriggerSchedule to trigger the "local" call of schedule() in the scheduler thread.

Author: CodingCat <[email protected]>

Closes #639 from CodingCat/SPARK-1686 and squashes the following commits:

81bb4ca [CodingCat] rename variable
69e0a2a [CodingCat] style fix
36a2ac0 [CodingCat] address Aaron's comments
ec9b7bb [CodingCat] address the comments
02b37ca [CodingCat] keep schedule() calling in the main thread
1 parent 59577df commit 2f452cb
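
For readers unfamiliar with the Akka pattern involved, below is a minimal, self-contained sketch (plain Akka classic actors, outside Spark) of the approach this commit takes: instead of scheduling a closure that runs on a scheduler/dispatcher thread, schedule a message to self so the deferred work is handled inside receive, on the actor's own message loop. RecoveringMaster, the 60-second delay, and the empty completeRecovery() stub are illustrative stand-ins, not code from Master.scala.

import akka.actor.{Actor, Cancellable}
import scala.concurrent.duration._

// Illustrative message, mirroring the CompleteRecovery message added by this commit.
case object CompleteRecovery

class RecoveringMaster extends Actor {
  import context.dispatcher // ExecutionContext required by the scheduler

  private var recoveryCompletionTask: Cancellable = _

  override def preStart(): Unit = {
    // Problematic pattern: the closure runs on a scheduler/dispatcher thread, so
    // completeRecovery() -- and any schedule() it triggers -- would race with the
    // actor's own message processing:
    //   context.system.scheduler.scheduleOnce(60.seconds) { completeRecovery() }

    // Pattern adopted here: send a message to self instead; the work is then done
    // inside receive, on the actor's single-threaded message loop.
    recoveryCompletionTask =
      context.system.scheduler.scheduleOnce(60.seconds, self, CompleteRecovery)
  }

  override def postStop(): Unit = {
    // Cancel the pending message so a restarted actor instance never receives it.
    if (recoveryCompletionTask != null) {
      recoveryCompletionTask.cancel()
    }
  }

  def receive: Receive = {
    case CompleteRecovery => completeRecovery()
  }

  private def completeRecovery(): Unit = {
    // ... reset state, relaunch drivers, then resume scheduling ...
  }
}

Cancelling the task in postStop matches the diff below: without it, a Master restarted by its supervisor could still receive the stale CompleteRecovery message.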

File tree

1 file changed: +12 -3 lines changed

  • core/src/main/scala/org/apache/spark/deploy/master/Master.scala


core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 12 additions & 3 deletions
@@ -104,6 +104,8 @@ private[spark] class Master(
 
   var leaderElectionAgent: ActorRef = _
 
+  private var recoveryCompletionTask: Cancellable = _
+
   // As a temporary workaround before better ways of configuring memory, we allow users to set
   // a flag that will perform round-robin scheduling across the nodes (spreading out each app
   // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
@@ -152,6 +154,10 @@ private[spark] class Master(
   }
 
   override def postStop() {
+    // prevent the CompleteRecovery message sending to restarted master
+    if (recoveryCompletionTask != null) {
+      recoveryCompletionTask.cancel()
+    }
     webUi.stop()
     fileSystemsUsed.foreach(_.close())
     masterMetricsSystem.stop()
@@ -171,10 +177,13 @@ private[spark] class Master(
       logInfo("I have been elected leader! New state: " + state)
       if (state == RecoveryState.RECOVERING) {
         beginRecovery(storedApps, storedDrivers, storedWorkers)
-        context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
+        recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
+          CompleteRecovery)
       }
     }
 
+    case CompleteRecovery => completeRecovery()
+
     case RevokedLeadership => {
       logError("Leadership has been revoked -- master shutting down.")
       System.exit(0)
@@ -465,7 +474,7 @@ private[spark] class Master(
    * Schedule the currently available resources among waiting apps. This method will be called
    * every time a new app joins or resource availability changes.
    */
-  def schedule() {
+  private def schedule() {
     if (state != RecoveryState.ALIVE) { return }
 
     // First schedule drivers, they take strict precedence over applications
@@ -485,7 +494,7 @@ private[spark] class Master(
       // Try to spread out each app among all the nodes, until it has all its cores
       for (app <- waitingApps if app.coresLeft > 0) {
         val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
-        .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
         val numUsable = usableWorkers.length
         val assigned = new Array[Int](numUsable) // Number of cores to give on each node
         var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

0 commit comments
