Skip to content

Commit 0afd696

Browse files
author
Marcelo Vanzin
committed
Wait until master responds before returning from start().
This allows the application ID set by the master to be included in the SparkListenerApplicationStart event. This should not affect job scheduling, because tasks can only be submitted after executors register, which will happen after the client registers with the master anyway. (This is similar to what the Mesos backend does to implement the same behavior.)
1 parent abc4697 commit 0afd696

File tree

1 file changed

+23
-0
lines changed

1 file changed

+23
-0
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ private[spark] class SparkDeploySchedulerBackend(
3636
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
3737
var appId: String = _
3838

39+
// Monitor object that waitForRegistration() and notifyRegistered() synchronize on.
val registrationLock = new Object()
40+
// Starts false and is set to true by notifyRegistered(); in the code shown here it is
// only read or written inside registrationLock.synchronized blocks.
var registrationDone = false
41+
3942
val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
4043

4144
override def start() {
@@ -64,6 +67,8 @@ private[spark] class SparkDeploySchedulerBackend(
6467

6568
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
6669
client.start()
70+
71+
waitForRegistration()
6772
}
6873

6974
override def stop() {
@@ -78,15 +83,18 @@ private[spark] class SparkDeploySchedulerBackend(
7883
/**
 * Callback invoked with the application ID once the connection to the cluster is made.
 * Logs the ID, stores it in `appId`, and then releases any thread blocked in
 * waitForRegistration().
 *
 * @param appId the application ID to record
 */
override def connected(appId: String) {
7984
logInfo("Connected to Spark cluster with app ID " + appId)
8085
this.appId = appId
86+
// Notify only after appId has been stored, so the ID is already set when the waiter resumes.
notifyRegistered()
8187
}
8288

8389
/**
 * Callback invoked on disconnection from the cluster.
 * Releases any thread blocked in waitForRegistration(), then logs a warning unless the
 * backend is stopping. This method does not assign `appId`.
 */
override def disconnected() {
90+
// Wake the waiter even though no app ID was received, so start() does not block forever.
notifyRegistered()
8491
if (!stopping) {
8592
logWarning("Disconnected from Spark cluster! Waiting for reconnection...")
8693
}
8794
}
8895

8996
override def dead(reason: String) {
97+
notifyRegistered()
9098
if (!stopping) {
9199
logError("Application has been killed. Reason: " + reason)
92100
scheduler.error(reason)
@@ -113,4 +121,19 @@ private[spark] class SparkDeploySchedulerBackend(
113121

114122
/**
 * Returns the application ID stored by connected(), if any.
 *
 * `appId` is declared uninitialized and is only assigned in connected(). Because
 * disconnected() and dead() also release start(), this method can be called while
 * `appId` is still null. `Option(...)` maps that null to `None` instead of `Some(null)`.
 *
 * @return `Some(id)` once an ID has been recorded, otherwise `None`
 */
override def applicationId(): Option[String] = Option(appId)
115123

124+
/**
 * Blocks the calling thread until `registrationDone` is true.
 * Called at the end of start(); the flag is set by notifyRegistered().
 */
private def waitForRegistration() = {
125+
registrationLock.synchronized {
126+
// Re-check the flag after every wake-up; the loop only exits once the flag has been set.
while (!registrationDone) {
127+
registrationLock.wait()
128+
}
129+
}
130+
}
131+
132+
/**
 * Sets `registrationDone` to true and wakes every thread waiting on `registrationLock`.
 * Called from connected(), disconnected() and dead(), so the wait in
 * waitForRegistration() ends on any of those callbacks.
 */
private def notifyRegistered() = {
133+
registrationLock.synchronized {
134+
// Set the flag before notifying, both under the lock, so a woken waiter sees it as true.
registrationDone = true
135+
registrationLock.notifyAll()
136+
}
137+
}
138+
116139
}

0 commit comments

Comments
 (0)