Skip to content

Commit fa1fa80

Browse files
submit app to HA cluster in standalone cluster mode
1 parent bfd3ee9 commit fa1fa80

File tree

6 files changed

+149
-103
lines changed

6 files changed

+149
-103
lines changed

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 49 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,15 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}
3535
private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
3636
extends Actor with ActorLogReceive with Logging {
3737

38-
var masterActor: ActorSelection = _
38+
var mastersActor = driverArgs.masters.map { m =>
39+
context.actorSelection(Master.toAkkaUrl(m, AkkaUtils.protocol(context.system)))
40+
}
3941
val timeout = AkkaUtils.askTimeout(conf)
4042

4143
override def preStart() = {
42-
masterActor = context.actorSelection(
43-
Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(context.system)))
44-
4544
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
4645

47-
println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")
46+
println(s"Sending ${driverArgs.cmd} command to ${driverArgs.masters}")
4847

4948
driverArgs.cmd match {
5049
case "launch" =>
@@ -79,10 +78,12 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
7978
driverArgs.supervise,
8079
command)
8180

81+
for (masterActor <- mastersActor)
8282
masterActor ! RequestSubmitDriver(driverDescription)
8383

8484
case "kill" =>
8585
val driverId = driverArgs.driverId
86+
for (masterActor <- mastersActor)
8687
masterActor ! RequestKillDriver(driverId)
8788
}
8889
}
@@ -92,48 +93,61 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
9293
println(s"... waiting before polling master for driver state")
9394
Thread.sleep(5000)
9495
println("... polling master for driver state")
95-
val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout)
96-
.mapTo[DriverStatusResponse]
97-
val statusResponse = Await.result(statusFuture, timeout)
98-
99-
statusResponse.found match {
100-
case false =>
101-
println(s"ERROR: Cluster master did not recognize $driverId")
102-
System.exit(-1)
103-
case true =>
104-
println(s"State of $driverId is ${statusResponse.state.get}")
105-
// Worker node, if present
106-
(statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
107-
case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
108-
println(s"Driver running on $hostPort ($id)")
109-
case _ =>
110-
}
111-
// Exception, if present
112-
statusResponse.exception.map { e =>
113-
println(s"Exception from cluster was: $e")
114-
e.printStackTrace()
115-
System.exit(-1)
116-
}
117-
System.exit(0)
96+
for (masterActor <- mastersActor) {
97+
val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout)
98+
.mapTo[DriverStatusResponse]
99+
val statusResponse = Await.result(statusFuture, timeout)
100+
101+
statusResponse.found match {
102+
case false =>
103+
statusResponse.exception.getOrElse {
104+
println(s"ERROR: Cluster master did not recognize $driverId")
105+
System.exit(-1)
106+
}
107+
case true =>
108+
println(s"State of $driverId is ${statusResponse.state.get}")
109+
// Worker node, if present
110+
(statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
111+
case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
112+
println(s"Driver running on $hostPort ($id)")
113+
case _ =>
114+
}
115+
// Exception, if present
116+
statusResponse.exception.map { e =>
117+
println(s"Exception from cluster was: $e")
118+
e.printStackTrace()
119+
System.exit(-1)
120+
}
121+
System.exit(0)
122+
}
118123
}
119124
}
120125

121126
override def receiveWithLogging = {
122127

123128
case SubmitDriverResponse(success, driverId, message) =>
124129
println(message)
125-
if (success) pollAndReportStatus(driverId.get) else System.exit(-1)
130+
if (success) {
131+
pollAndReportStatus(driverId.get)
132+
} else if (!message.contains("Can only")) {
133+
System.exit(-1)
134+
}
135+
126136

127137
case KillDriverResponse(driverId, success, message) =>
128138
println(message)
129-
if (success) pollAndReportStatus(driverId) else System.exit(-1)
139+
if (success) {
140+
pollAndReportStatus(driverId)
141+
} else if (!message.contains("Can only")) {
142+
System.exit(-1)
143+
}
130144

131145
case DisassociatedEvent(_, remoteAddress, _) =>
132-
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
146+
println(s"Error connecting to master ${driverArgs.masters} ($remoteAddress), exiting.")
133147
System.exit(-1)
134148

135149
case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
136-
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
150+
println(s"Error connecting to master ${driverArgs.masters} ($remoteAddress), exiting.")
137151
println(s"Cause was: $cause")
138152
System.exit(-1)
139153
}
@@ -163,7 +177,9 @@ object Client {
163177
"driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
164178

165179
// Verify every URL in driverArgs.masters is valid so that we can use them in ClientActor safely
166-
Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(actorSystem))
180+
for (m <- driverArgs.masters) {
181+
Master.toAkkaUrl(m, AkkaUtils.protocol(actorSystem))
182+
}
167183
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
168184

169185
actorSystem.awaitTermination()

core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ private[deploy] class ClientArguments(args: Array[String]) {
3535
var logLevel = Level.WARN
3636

3737
// launch parameters
38-
var master: String = ""
38+
var masters: Array[String] = null
3939
var jarUrl: String = ""
4040
var mainClass: String = ""
4141
var supervise: Boolean = DEFAULT_SUPERVISE
@@ -80,13 +80,13 @@ private[deploy] class ClientArguments(args: Array[String]) {
8080
}
8181

8282
jarUrl = _jarUrl
83-
master = _master
83+
masters = _master.stripPrefix("spark://").split(",").map("spark://" + _)
8484
mainClass = _mainClass
8585
_driverOptions ++= tail
8686

8787
case "kill" :: _master :: _driverId :: tail =>
8888
cmd = "kill"
89-
master = _master
89+
masters = _master.stripPrefix("spark://").split(",").map("spark://" + _)
9090
driverId = _driverId
9191

9292
case _ =>

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,17 +115,17 @@ object SparkSubmit {
115115

116116
/** Kill an existing submission using the REST protocol. Standalone cluster mode only. */
117117
private def kill(args: SparkSubmitArguments): Unit = {
118-
new StandaloneRestClient()
119-
.killSubmission(args.master, args.submissionToKill)
118+
new StandaloneRestClient(args.master)
119+
.killSubmission(args.submissionToKill)
120120
}
121121

122122
/**
123123
* Request the status of an existing submission using the REST protocol.
124124
* Standalone cluster mode only.
125125
*/
126126
private def requestStatus(args: SparkSubmitArguments): Unit = {
127-
new StandaloneRestClient()
128-
.requestSubmissionStatus(args.master, args.submissionToRequestStatusFor)
127+
new StandaloneRestClient(args.master)
128+
.requestSubmissionStatus(args.submissionToRequestStatusFor)
129129
}
130130

131131
/**

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -305,12 +305,17 @@ private[master] class Master(
305305
}
306306

307307
case RequestDriverStatus(driverId) => {
308-
(drivers ++ completedDrivers).find(_.id == driverId) match {
309-
case Some(driver) =>
310-
sender ! DriverStatusResponse(found = true, Some(driver.state),
311-
driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
312-
case None =>
313-
sender ! DriverStatusResponse(found = false, None, None, None, None)
308+
if (state != RecoveryState.ALIVE) {
309+
val msg = s"Can only request driver status in ALIVE state. Current state: $state."
310+
sender ! DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg)))
311+
} else {
312+
(drivers ++ completedDrivers).find(_.id == driverId) match {
313+
case Some(driver) =>
314+
sender ! DriverStatusResponse(found = true, Some(driver.state),
315+
driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
316+
case None =>
317+
sender ! DriverStatusResponse(found = false, None, None, None, None)
318+
}
314319
}
315320
}
316321

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala

Lines changed: 60 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -52,55 +52,82 @@ import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
5252
* implementation of this client can use that information to retry using the version specified
5353
* by the server.
5454
*/
55-
private[deploy] class StandaloneRestClient extends Logging {
55+
private[deploy] class StandaloneRestClient(master: String) extends Logging {
5656
import StandaloneRestClient._
5757

58+
val masters: Array[String] = master.stripPrefix("spark://").split(",").map("spark://" + _)
59+
5860
/**
5961
* Submit an application specified by the parameters in the provided request.
6062
*
6163
* If the submission was successful, poll the status of the submission and report
6264
* it to the user. Otherwise, report the error message provided by the server.
6365
*/
6466
private[rest] def createSubmission(
65-
master: String,
6667
request: CreateSubmissionRequest): SubmitRestProtocolResponse = {
6768
logInfo(s"Submitting a request to launch an application in $master.")
68-
validateMaster(master)
69-
val url = getSubmitUrl(master)
70-
val response = postJson(url, request.toJson)
71-
response match {
72-
case s: CreateSubmissionResponse =>
73-
reportSubmissionStatus(master, s)
74-
handleRestResponse(s)
75-
case unexpected =>
76-
handleUnexpectedRestResponse(unexpected)
69+
var suc: Boolean = false
70+
var response: SubmitRestProtocolResponse = null
71+
for (m <- masters if !suc) {
72+
validateMaster(m)
73+
val url = getSubmitUrl(m)
74+
response = postJson(url, request.toJson)
75+
response match {
76+
case s: CreateSubmissionResponse =>
77+
if (s.success) {
78+
reportSubmissionStatus(s)
79+
handleRestResponse(s)
80+
suc = true
81+
}
82+
case unexpected =>
83+
handleUnexpectedRestResponse(unexpected)
84+
}
7785
}
7886
response
7987
}
8088

8189
/** Request that the server kill the specified submission. */
82-
def killSubmission(master: String, submissionId: String): SubmitRestProtocolResponse = {
90+
def killSubmission(submissionId: String): SubmitRestProtocolResponse = {
8391
logInfo(s"Submitting a request to kill submission $submissionId in $master.")
84-
validateMaster(master)
85-
val response = post(getKillUrl(master, submissionId))
86-
response match {
87-
case k: KillSubmissionResponse => handleRestResponse(k)
88-
case unexpected => handleUnexpectedRestResponse(unexpected)
92+
var suc: Boolean = false
93+
var response: SubmitRestProtocolResponse = null
94+
for (m <- masters if !suc) {
95+
validateMaster(m)
96+
response = post(getKillUrl(m, submissionId))
97+
response match {
98+
case k: KillSubmissionResponse =>
99+
if (!k.message.contains("Can only")) {
100+
handleRestResponse(k)
101+
suc = true
102+
}
103+
case unexpected =>
104+
handleUnexpectedRestResponse(unexpected)
105+
}
89106
}
90107
response
91108
}
92109

93110
/** Request the status of a submission from the server. */
94111
def requestSubmissionStatus(
95-
master: String,
96112
submissionId: String,
97113
quiet: Boolean = false): SubmitRestProtocolResponse = {
98114
logInfo(s"Submitting a request for the status of submission $submissionId in $master.")
99-
validateMaster(master)
100-
val response = get(getStatusUrl(master, submissionId))
101-
response match {
102-
case s: SubmissionStatusResponse => if (!quiet) { handleRestResponse(s) }
103-
case unexpected => handleUnexpectedRestResponse(unexpected)
115+
var suc: Boolean = false
116+
var response: SubmitRestProtocolResponse = null
117+
for (m <- masters) {
118+
validateMaster(m)
119+
response = get(getStatusUrl(m, submissionId))
120+
response match {
121+
case s: SubmissionStatusResponse =>
122+
if (!s.message.contains("Can only")) {
123+
if (!quiet) {
124+
handleRestResponse(s)
125+
}
126+
suc = true
127+
}
128+
case unexpected =>
129+
handleUnexpectedRestResponse(unexpected)
130+
}
104131
}
105132
response
106133
}
@@ -228,30 +255,24 @@ private[deploy] class StandaloneRestClient extends Logging {
228255

229256
/** Report the status of a newly created submission. */
230257
private def reportSubmissionStatus(
231-
master: String,
232258
submitResponse: CreateSubmissionResponse): Unit = {
233-
if (submitResponse.success) {
234-
val submissionId = submitResponse.submissionId
235-
if (submissionId != null) {
236-
logInfo(s"Submission successfully created as $submissionId. Polling submission state...")
237-
pollSubmissionStatus(master, submissionId)
238-
} else {
239-
// should never happen
240-
logError("Application successfully submitted, but submission ID was not provided!")
241-
}
259+
val submissionId = submitResponse.submissionId
260+
if (submissionId != null) {
261+
logInfo(s"Submission successfully created as $submissionId. Polling submission state...")
262+
pollSubmissionStatus(submissionId)
242263
} else {
243-
val failMessage = Option(submitResponse.message).map { ": " + _ }.getOrElse("")
244-
logError("Application submission failed" + failMessage)
264+
// should never happen
265+
logError("Application successfully submitted, but submission ID was not provided!")
245266
}
246267
}
247268

248269
/**
249270
* Poll the status of the specified submission and log it.
250271
* This retries up to a fixed number of times before giving up.
251272
*/
252-
private def pollSubmissionStatus(master: String, submissionId: String): Unit = {
273+
private def pollSubmissionStatus(submissionId: String): Unit = {
253274
(1 to REPORT_DRIVER_STATUS_MAX_TRIES).foreach { _ =>
254-
val response = requestSubmissionStatus(master, submissionId, quiet = true)
275+
val response = requestSubmissionStatus(submissionId, quiet = true)
255276
val statusResponse = response match {
256277
case s: SubmissionStatusResponse => s
257278
case _ => return // unexpected type, let upstream caller handle it
@@ -311,10 +332,10 @@ private[rest] object StandaloneRestClient {
311332
}
312333
val sparkProperties = conf.getAll.toMap
313334
val environmentVariables = env.filter { case (k, _) => k.startsWith("SPARK_") }
314-
val client = new StandaloneRestClient
335+
val client = new StandaloneRestClient(master)
315336
val submitRequest = client.constructSubmitRequest(
316337
appResource, mainClass, appArgs, sparkProperties, environmentVariables)
317-
client.createSubmission(master, submitRequest)
338+
client.createSubmission(submitRequest)
318339
}
319340

320341
def main(args: Array[String]): Unit = {

0 commit comments

Comments
 (0)