Skip to content

Commit 9d636be

Browse files
per Andrew's comments
1 parent 979760c commit 9d636be

File tree

7 files changed

+77
-73
lines changed

7 files changed

+77
-73
lines changed

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}
3333
/**
3434
* Proxy that relays messages to the driver.
3535
*
36-
* Now we don't support retry in case submission failed. In HA mode, client will submit request to
36+
* We currently don't support retry if submission fails. In HA mode, client will submit request to
3737
* all masters and see which one could handle it.
3838
*/
3939
private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
@@ -107,10 +107,8 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
107107
val statusResponse = Await.result(statusFuture, timeout)
108108
statusResponse.found match {
109109
case false =>
110-
statusResponse.exception.getOrElse {
111-
println(s"ERROR: Cluster master did not recognize $driverId")
112-
System.exit(-1)
113-
}
110+
println(s"ERROR: Cluster master did not recognize $driverId")
111+
System.exit(-1)
114112
case true =>
115113
println(s"State of $driverId is ${statusResponse.state.get}")
116114
// Worker node, if present
@@ -136,7 +134,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
136134
if (success) {
137135
activeMasterActor = context.actorSelection(sender.path)
138136
pollAndReportStatus(driverId.get)
139-
} else if (!message.startsWith(Utils.MASTER_NOT_ALIVE_STRING)) {
137+
} else if (!Utils.responseFromBackup(message)) {
140138
System.exit(-1)
141139
}
142140

@@ -146,14 +144,17 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
146144
if (success) {
147145
activeMasterActor = context.actorSelection(sender.path)
148146
pollAndReportStatus(driverId)
149-
} else if (!message.startsWith(Utils.MASTER_NOT_ALIVE_STRING)) {
147+
} else if (!Utils.responseFromBackup(message)) {
150148
System.exit(-1)
151149
}
152150

153151
case DisassociatedEvent(_, remoteAddress, _) =>
154152
if (!lostMasters.contains(remoteAddress)) {
155153
println(s"Error connecting to master $remoteAddress.")
156154
lostMasters += remoteAddress
155+
// Note that this heuristic does not account for the fact that a Master can recover within
156+
// the lifetime of this client. Thus, once a Master is lost it is lost to us forever. This
157+
// is not currently a concern, however, because this client does not retry submissions.
157158
if (lostMasters.size >= masterActors.size) {
158159
println("No master is available, exiting.")
159160
System.exit(-1)

core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,13 @@ private[deploy] class ClientArguments(args: Array[String]) {
7979
}
8080

8181
jarUrl = _jarUrl
82-
masters = Utils.splitMasterAdress(_master)
82+
masters = Utils.parseStandaloneMasterUrls(_master)
8383
mainClass = _mainClass
8484
_driverOptions ++= tail
8585

8686
case "kill" :: _master :: _driverId :: tail =>
8787
cmd = "kill"
88-
masters = Utils.splitMasterAdress(_master)
88+
masters = Utils.parseStandaloneMasterUrls(_master)
8989
driverId = _driverId
9090

9191
case _ =>

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ private[master] class Master(
254254

255255
case RequestSubmitDriver(description) => {
256256
if (state != RecoveryState.ALIVE) {
257-
val msg = s"${Utils.MASTER_NOT_ALIVE_STRING}$state. " +
257+
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
258258
"Can only accept driver submissions in ALIVE state."
259259
sender ! SubmitDriverResponse(false, None, msg)
260260
} else {
@@ -275,7 +275,7 @@ private[master] class Master(
275275

276276
case RequestKillDriver(driverId) => {
277277
if (state != RecoveryState.ALIVE) {
278-
val msg = s"${Utils.MASTER_NOT_ALIVE_STRING}$state. Can only kill drivers in ALIVE state."
278+
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. Can only kill drivers in ALIVE state."
279279
sender ! KillDriverResponse(driverId, success = false, msg)
280280
} else {
281281
logInfo("Asked to kill driver " + driverId)
@@ -307,7 +307,7 @@ private[master] class Master(
307307

308308
case RequestDriverStatus(driverId) => {
309309
if (state != RecoveryState.ALIVE) {
310-
val msg = s"${Utils.MASTER_NOT_ALIVE_STRING}$state. " +
310+
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
311311
"Can only request driver status in ALIVE state."
312312
sender ! DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg)))
313313
} else {

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala

Lines changed: 49 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,14 @@ import org.apache.spark.util.Utils
5353
* is a mismatch, the server will respond with the highest protocol version it supports. A future
5454
* implementation of this client can use that information to retry using the version specified
5555
* by the server.
56-
*
57-
* Now we don't support retry in case submission failed. In HA mode, client will submit request to
58-
* all masters and see which one could handle it.
5956
*/
6057
private[deploy] class StandaloneRestClient(master: String) extends Logging {
6158
import StandaloneRestClient._
6259

63-
private val masters: Array[String] = Utils.splitMasterAdress(master)
60+
private val masters: Array[String] = Utils.parseStandaloneMasterUrls(master)
6461

62+
// Set of masters that lost contact with us, used to keep track of
63+
// whether there are masters still alive for us to communicate with
6564
private val lostMasters = new mutable.HashSet[String]
6665

6766
/**
@@ -73,9 +72,9 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging {
7372
private[rest] def createSubmission(
7473
request: CreateSubmissionRequest): SubmitRestProtocolResponse = {
7574
logInfo(s"Submitting a request to launch an application in $master.")
76-
var suc: Boolean = false
75+
var handled: Boolean = false
7776
var response: SubmitRestProtocolResponse = null
78-
for (m <- masters if !suc) {
77+
for (m <- masters if !handled) {
7978
validateMaster(m)
8079
val url = getSubmitUrl(m)
8180
try {
@@ -85,22 +84,17 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging {
8584
if (s.success) {
8685
reportSubmissionStatus(s)
8786
handleRestResponse(s)
88-
suc = true
87+
handled = true
8988
}
9089
case unexpected =>
9190
handleUnexpectedRestResponse(unexpected)
9291
}
9392
} catch {
9493
case unreachable @ (_: FileNotFoundException | _: SocketException | _: ConnectException) =>
95-
if(handleConnectionException(m)) {
94+
if (handleConnectionException(m)) {
9695
throw new SubmitRestConnectionException(
9796
s"Unable to connect to server", unreachable)
9897
}
99-
case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
100-
if(handleConnectionException(m)) {
101-
throw new SubmitRestProtocolException(
102-
"Malformed response received from server", malformed)
103-
}
10498
}
10599
}
106100
response
@@ -109,33 +103,28 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging {
109103
/** Request that the server kill the specified submission. */
110104
def killSubmission(submissionId: String): SubmitRestProtocolResponse = {
111105
logInfo(s"Submitting a request to kill submission $submissionId in $master.")
112-
var suc: Boolean = false
106+
var handled: Boolean = false
113107
var response: SubmitRestProtocolResponse = null
114-
for (m <- masters if !suc) {
108+
for (m <- masters if !handled) {
115109
validateMaster(m)
116110
val url = getKillUrl(m, submissionId)
117111
try {
118112
response = post(url)
119113
response match {
120114
case k: KillSubmissionResponse =>
121-
if (!k.message.startsWith(Utils.MASTER_NOT_ALIVE_STRING)) {
115+
if (!Utils.responseFromBackup(k.message)) {
122116
handleRestResponse(k)
123-
suc = true
117+
handled = true
124118
}
125119
case unexpected =>
126120
handleUnexpectedRestResponse(unexpected)
127121
}
128122
} catch {
129123
case unreachable @ (_: FileNotFoundException | _: SocketException | _: ConnectException) =>
130-
if(handleConnectionException(m)) {
124+
if (handleConnectionException(m)) {
131125
throw new SubmitRestConnectionException(
132126
s"Unable to connect to server", unreachable)
133127
}
134-
case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
135-
if(handleConnectionException(m)) {
136-
throw new SubmitRestProtocolException(
137-
"Malformed response received from server", malformed)
138-
}
139128
}
140129
}
141130
response
@@ -146,9 +135,9 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging {
146135
submissionId: String,
147136
quiet: Boolean = false): SubmitRestProtocolResponse = {
148137
logInfo(s"Submitting a request for the status of submission $submissionId in $master.")
149-
var suc: Boolean = false
138+
var handled: Boolean = false
150139
var response: SubmitRestProtocolResponse = null
151-
for (m <- masters) {
140+
for (m <- masters if !handled) {
152141
validateMaster(m)
153142
val url = getStatusUrl(m, submissionId)
154143
try {
@@ -158,21 +147,16 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging {
158147
if (!quiet) {
159148
handleRestResponse(s)
160149
}
161-
suc = true
150+
handled = true
162151
case unexpected =>
163152
handleUnexpectedRestResponse(unexpected)
164153
}
165154
} catch {
166155
case unreachable @ (_: FileNotFoundException | _: SocketException | _: ConnectException) =>
167-
if(handleConnectionException(m)) {
156+
if (handleConnectionException(m)) {
168157
throw new SubmitRestConnectionException(
169158
s"Unable to connect to server", unreachable)
170159
}
171-
case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
172-
if(handleConnectionException(m)) {
173-
throw new SubmitRestProtocolException(
174-
"Malformed response received from server", malformed)
175-
}
176160
}
177161
}
178162
response
@@ -235,30 +219,35 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging {
235219
* Exposed for testing.
236220
*/
237221
private[rest] def readResponse(connection: HttpURLConnection): SubmitRestProtocolResponse = {
238-
val dataStream =
239-
if (connection.getResponseCode == HttpServletResponse.SC_OK) {
240-
connection.getInputStream
241-
} else {
242-
connection.getErrorStream
222+
try {
223+
val dataStream =
224+
if (connection.getResponseCode == HttpServletResponse.SC_OK) {
225+
connection.getInputStream
226+
} else {
227+
connection.getErrorStream
228+
}
229+
// If the server threw an exception while writing a response, it will not have a body
230+
if (dataStream == null) {
231+
throw new SubmitRestProtocolException("Server returned empty body")
243232
}
244-
// If the server threw an exception while writing a response, it will not have a body
245-
if (dataStream == null) {
246-
throw new SubmitRestProtocolException("Server returned empty body")
247-
}
248-
val responseJson = Source.fromInputStream(dataStream).mkString
249-
logDebug(s"Response from the server:\n$responseJson")
250-
val response = SubmitRestProtocolMessage.fromJson(responseJson)
251-
response.validate()
252-
response match {
253-
// If the response is an error, log the message
254-
case error: ErrorResponse =>
255-
logError(s"Server responded with error:\n${error.message}")
256-
error
257-
// Otherwise, simply return the response
258-
case response: SubmitRestProtocolResponse => response
259-
case unexpected =>
260-
throw new SubmitRestProtocolException(
261-
s"Message received from server was not a response:\n${unexpected.toJson}")
233+
val responseJson = Source.fromInputStream(dataStream).mkString
234+
logDebug(s"Response from the server:\n$responseJson")
235+
val response = SubmitRestProtocolMessage.fromJson(responseJson)
236+
response.validate()
237+
response match {
238+
// If the response is an error, log the message
239+
case error: ErrorResponse =>
240+
logError(s"Server responded with error:\n${error.message}")
241+
error
242+
// Otherwise, simply return the response
243+
case response: SubmitRestProtocolResponse => response
244+
case unexpected =>
245+
throw new SubmitRestProtocolException(
246+
s"Message received from server was not a response:\n${unexpected.toJson}")
247+
}
248+
} catch {
249+
case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
250+
throw new SubmitRestProtocolException("Malformed response received from server", malformed)
262251
}
263252
}
264253

@@ -357,7 +346,11 @@ private[deploy] class StandaloneRestClient(master: String) extends Logging {
357346
}
358347

359348
/**
360-
* When a connection exception was caught, we see whether all masters are lost.
349+
* When a connection exception is caught, return true if all masters are lost.
350+
* Note that the heuristic used here does not take into account that masters
351+
* can recover during the lifetime of this client. This assumption should be
352+
* harmless because this client currently does not support retrying submission
353+
* on failure yet (SPARK-6443).
361354
*/
362355
private def handleConnectionException(masterUrl: String): Boolean = {
363356
if (!lostMasters.contains(masterUrl)) {

core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
105105
if (masters != null) { // Two positional arguments were given
106106
printUsageAndExit(1)
107107
}
108-
masters = Utils.splitMasterAdress(value)
108+
masters = Utils.parseStandaloneMasterUrls(value)
109109
parse(tail)
110110

111111
case Nil =>

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2106,11 +2106,21 @@ private[spark] object Utils extends Logging {
21062106
.getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
21072107
}
21082108

2109-
def splitMasterAdress(masterAddr: String): Array[String] = {
2110-
masterAddr.stripPrefix("spark://").split(",").map("spark://" + _)
2109+
/**
2110+
* Split the comma delimited string of master URLs into a list.
2111+
* For instance, "spark://abc,def" becomes [spark://abc, spark://def].
2112+
*/
2113+
def parseStandaloneMasterUrls(masterUrls: String): Array[String] = {
2114+
masterUrls.stripPrefix("spark://").split(",").map("spark://" + _)
21112115
}
21122116

2113-
val MASTER_NOT_ALIVE_STRING = "Current state is not alive: "
2117+
/** An identifier that backup masters use in their responses. */
2118+
val BACKUP_STANDALONE_MASTER_PREFIX = "Current state is not alive"
2119+
2120+
/** Return true if the response message is sent from a backup Master on standby. */
2121+
def responseFromBackup(msg: String): Boolean = {
2122+
message.startsWith(BACKUP_STANDALONE_MASTER_PREFIX)
2123+
}
21142124
}
21152125

21162126
/**

core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ import javax.servlet.http.HttpServletResponse
2424
import scala.collection.mutable
2525

2626
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
27+
import com.fasterxml.jackson.core.JsonParseException
2728
import com.google.common.base.Charsets
2829
import org.scalatest.{BeforeAndAfterEach, FunSuite}
2930
import org.json4s.JsonAST._
3031
import org.json4s.jackson.JsonMethods._
31-
import com.fasterxml.jackson.core.JsonParseException
3232

3333
import org.apache.spark._
3434
import org.apache.spark.util.{AkkaUtils, Utils}

0 commit comments

Comments
 (0)