Skip to content

Commit 82197ed

Browse files
sarutak authored and srowen committed
[SPARK-4949] shutdownCallback in SparkDeploySchedulerBackend should be enclosed by synchronized block.
A variable `shutdownCallback` in SparkDeploySchedulerBackend can be accessed from multiple threads so it should be enclosed by synchronized block. Author: Kousuke Saruta <[email protected]> Closes #3781 from sarutak/SPARK-4949 and squashes the following commits: c146c93 [Kousuke Saruta] Removed "setShutdownCallback" method c7265dc [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-4949 42ca528 [Kousuke Saruta] Changed the declaration of the variable "shutdownCallback" as a volatile reference instead of AtomicReference 552df7c [Kousuke Saruta] Changed the declaration of the variable "shutdownCallback" as a volatile reference instead of AtomicReference f556819 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-4949 1b60fd1 [Kousuke Saruta] Improved the locking logics 5942765 [Kousuke Saruta] Enclosed shutdownCallback in SparkDeploySchedulerBackend by synchronized block
1 parent e79a7a6 commit 82197ed

File tree

1 file changed

+16
-19
lines changed

1 file changed

+16
-19
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.scheduler.cluster
1919

20+
import java.util.concurrent.Semaphore
21+
2022
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
2123
import org.apache.spark.deploy.{ApplicationDescription, Command}
2224
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
@@ -31,16 +33,16 @@ private[spark] class SparkDeploySchedulerBackend(
3133
with AppClientListener
3234
with Logging {
3335

34-
var client: AppClient = null
35-
var stopping = false
36-
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
37-
@volatile var appId: String = _
36+
private var client: AppClient = null
37+
private var stopping = false
38+
39+
@volatile var shutdownCallback: SparkDeploySchedulerBackend => Unit = _
40+
@volatile private var appId: String = _
3841

39-
val registrationLock = new Object()
40-
var registrationDone = false
42+
private val registrationBarrier = new Semaphore(0)
4143

42-
val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
43-
val totalExpectedCores = maxCores.getOrElse(0)
44+
private val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
45+
private val totalExpectedCores = maxCores.getOrElse(0)
4446

4547
override def start() {
4648
super.start()
@@ -95,8 +97,10 @@ private[spark] class SparkDeploySchedulerBackend(
9597
stopping = true
9698
super.stop()
9799
client.stop()
98-
if (shutdownCallback != null) {
99-
shutdownCallback(this)
100+
101+
val callback = shutdownCallback
102+
if (callback != null) {
103+
callback(this)
100104
}
101105
}
102106

@@ -149,18 +153,11 @@ private[spark] class SparkDeploySchedulerBackend(
149153
}
150154

151155
private def waitForRegistration() = {
152-
registrationLock.synchronized {
153-
while (!registrationDone) {
154-
registrationLock.wait()
155-
}
156-
}
156+
registrationBarrier.acquire()
157157
}
158158

159159
private def notifyContext() = {
160-
registrationLock.synchronized {
161-
registrationDone = true
162-
registrationLock.notifyAll()
163-
}
160+
registrationBarrier.release()
164161
}
165162

166163
}

0 commit comments

Comments (0)