From 02b37ca7e9256628e31cd56ba21d7e579c1233d6 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 4 May 2014 21:43:03 -0400 Subject: [PATCH 1/5] keep schedule() calling in the main thread --- .../main/scala/org/apache/spark/deploy/master/Master.scala | 4 +++- .../scala/org/apache/spark/deploy/master/MasterMessages.scala | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index fdb633bd33608..cf73d6d9fbec2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -175,6 +175,8 @@ private[spark] class Master( } } + case TriggerSchedule => schedule() + case RevokedLeadership => { logError("Leadership has been revoked -- master shutting down.") System.exit(0) @@ -448,7 +450,7 @@ private[spark] class Master( } state = RecoveryState.ALIVE - schedule() + self ! TriggerSchedule logInfo("Recovery complete - resuming operations!") } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala index db72d8ae9bdaf..ad848bb9862bd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala @@ -39,4 +39,6 @@ private[master] object MasterMessages { case object RequestWebUIPort case class WebUIPortResponse(webUIBoundPort: Int) + + case object TriggerSchedule } From ec9b7bb29a56a2d2fde35698f4544c4d101d637b Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 5 May 2014 08:55:59 -0400 Subject: [PATCH 2/5] address the comments --- .../org/apache/spark/deploy/master/Master.scala | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index cf73d6d9fbec2..d8381ae431f09 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -104,6 +104,8 @@ private[spark] class Master( var leaderElectionAgent: ActorRef = _ + private var recoverCallable: Cancellable = _ + // As a temporary workaround before better ways of configuring memory, we allow users to set // a flag that will perform round-robin scheduling across the nodes (spreading out each app // among all the nodes) instead of trying to consolidate each app onto a small # of nodes. @@ -152,6 +154,10 @@ private[spark] class Master( } override def postStop() { + // prevent the CompleteRecovery message sending to restarted master + if (recoverCallable != null) { + recoverCallable.cancel() + } webUi.stop() fileSystemsUsed.foreach(_.close()) masterMetricsSystem.stop() @@ -171,11 +177,12 @@ private[spark] class Master( logInfo("I have been elected leader! New state: " + state) if (state == RecoveryState.RECOVERING) { beginRecovery(storedApps, storedDrivers, storedWorkers) - context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() } + recoverCallable = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) + { self ! CompleteRecovery } } } - case TriggerSchedule => schedule() + case CompleteRecovery => completeRecovery() case RevokedLeadership => { logError("Leadership has been revoked -- master shutting down.") @@ -450,7 +457,7 @@ private[spark] class Master( } state = RecoveryState.ALIVE - self ! TriggerSchedule + schedule() logInfo("Recovery complete - resuming operations!") } @@ -467,7 +474,7 @@ private[spark] class Master( * Schedule the currently available resources among waiting apps. This method will be called * every time a new app joins or resource availability changes. */ - def schedule() { + private def schedule() { if (state != RecoveryState.ALIVE) { return } // First schedule drivers, they take strict precedence over applications @@ -487,7 +494,7 @@ private[spark] class Master( // Try to spread out each app among all the nodes, until it has all its cores for (app <- waitingApps if app.coresLeft > 0) { val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) - .filter(canUse(app, _)).sortBy(_.coresFree).reverse + .filter(canUse(app, _)).sortBy(_.coresFree).reverse val numUsable = usableWorkers.length val assigned = new Array[Int](numUsable) // Number of cores to give on each node var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) From 36a2ac0cfde233468970c80a212e1299cd7097d7 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 9 May 2014 16:00:19 -0400 Subject: [PATCH 3/5] address Aaron's comments --- .../src/main/scala/org/apache/spark/deploy/master/Master.scala | 3 +-- .../scala/org/apache/spark/deploy/master/MasterMessages.scala | 2 -- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index d8381ae431f09..e28b94e6419f0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -177,8 +177,7 @@ private[spark] class Master( logInfo("I have been elected leader! New state: " + state) if (state == RecoveryState.RECOVERING) { beginRecovery(storedApps, storedDrivers, storedWorkers) - recoverCallable = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) - { self ! CompleteRecovery } + recoverCallable = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self, CompleteRecovery) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala index ad848bb9862bd..db72d8ae9bdaf 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala @@ -39,6 +39,4 @@ private[master] object MasterMessages { case object RequestWebUIPort case class WebUIPortResponse(webUIBoundPort: Int) - - case object TriggerSchedule } From 69e0a2a4e1ff81908b3d6a888ca30416f8136e24 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 9 May 2014 16:07:54 -0400 Subject: [PATCH 4/5] style fix --- .../src/main/scala/org/apache/spark/deploy/master/Master.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index e28b94e6419f0..58cada751a2a3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -177,7 +177,8 @@ private[spark] class Master( logInfo("I have been elected leader! New state: " + state) if (state == RecoveryState.RECOVERING) { beginRecovery(storedApps, storedDrivers, storedWorkers) - recoverCallable = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self, CompleteRecovery) + recoverCallable = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self, + CompleteRecovery) } } From 81bb4cad05a3d8600c1930fc0ed727c4bd0119c8 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 9 May 2014 20:48:57 -0400 Subject: [PATCH 5/5] rename variable --- .../scala/org/apache/spark/deploy/master/Master.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 58cada751a2a3..f254f5585ba25 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -104,7 +104,7 @@ private[spark] class Master( var leaderElectionAgent: ActorRef = _ - private var recoverCallable: Cancellable = _ + private var recoveryCompletionTask: Cancellable = _ // As a temporary workaround before better ways of configuring memory, we allow users to set // a flag that will perform round-robin scheduling across the nodes (spreading out each app @@ -155,8 +155,8 @@ private[spark] class Master( override def postStop() { // prevent the CompleteRecovery message sending to restarted master - if (recoverCallable != null) { - recoverCallable.cancel() + if (recoveryCompletionTask != null) { + recoveryCompletionTask.cancel() } webUi.stop() fileSystemsUsed.foreach(_.close()) @@ -177,7 +177,7 @@ private[spark] class Master( logInfo("I have been elected leader! New state: " + state) if (state == RecoveryState.RECOVERING) { beginRecovery(storedApps, storedDrivers, storedWorkers) - recoverCallable = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self, + recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self, CompleteRecovery) } }