Commit 47598f2

SPARK-2425 Don't kill a still-running Application because of some misbehaving Executors
Introduces a LOADING -> RUNNING ApplicationState transition and prevents Master from removing an Application with RUNNING Executors.

Two basic changes: 1) Instead of allowing MAX_NUM_RETRY abnormal Executor exits over the entire lifetime of the Application, allow that many since any Executor successfully began running the Application; 2) Don't remove the Application while Master still thinks that there are RUNNING Executors.

This should be fine as long as the ApplicationInfo doesn't believe any Executors are forever RUNNING when they are not. I think that any non-RUNNING Executors will eventually no longer be RUNNING in Master's accounting, but another set of eyes should confirm that. This PR also doesn't try to detect which nodes have gone rogue or to kill off bad Workers, so repeatedly failing Executors will continue to fail and fill up log files with failure reports as long as the Application keeps running.

Author: Mark Hamstra <[email protected]>

Closes apache#1360 from markhamstra/SPARK-2425 and squashes the following commits:

f099c0b [Mark Hamstra] Reuse appInfo
b2b7b25 [Mark Hamstra] Moved 'Application failed' logging
bdd0928 [Mark Hamstra] switched to string interpolation
1dd591b [Mark Hamstra] SPARK-2425 introduce LOADING -> RUNNING ApplicationState transition and prevent Master from removing Application with RUNNING Executors

Conflicts:
	core/src/main/scala/org/apache/spark/deploy/master/Master.scala
	core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
	core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
1 parent 1c696dc commit 47598f2
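
As a rough standalone sketch of the two rules above (the class, method, and state names here are invented for illustration, not Spark's actual types):

// Sketch of the retry policy from the commit message; App, ExecState, and maxRetries
// are simplified stand-ins, not Spark's real classes.
object RetryPolicySketch {
  object ExecState extends Enumeration { val LOADING, RUNNING, FAILED = Value }

  final class App(maxRetries: Int) {
    private var retryCount = 0
    private var executorStates = Map.empty[Int, ExecState.Value]

    def onExecutorUpdate(execId: Int, state: ExecState.Value): Unit = {
      executorStates += execId -> state
      // Rule (1): reset the counter whenever any executor successfully reaches RUNNING.
      if (state == ExecState.RUNNING) retryCount = 0
    }

    // Rule (2): returns true (remove the application) only when the retry limit is hit
    // AND no executor is still believed to be RUNNING.
    def onAbnormalExit(): Boolean = {
      retryCount += 1
      retryCount >= maxRetries && !executorStates.values.exists(_ == ExecState.RUNNING)
    }
  }

  def main(args: Array[String]): Unit = {
    val app = new App(maxRetries = 3)
    app.onExecutorUpdate(1, ExecState.RUNNING)          // one healthy executor
    val removed = (1 to 5).map { _ =>
      app.onExecutorUpdate(2, ExecState.FAILED)         // a flapping executor
      app.onAbnormalExit()
    }.last
    println(removed)   // false: executor 1 is still RUNNING, so the app survives
  }
}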

File tree: 4 files changed (+46, -25 lines)

core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

Lines changed: 3 additions & 1 deletion
@@ -90,11 +90,13 @@ private[spark] class ApplicationInfo(
 
   def retryCount = _retryCount
 
-  def incrementRetryCount = {
+  def incrementRetryCount() = {
     _retryCount += 1
     _retryCount
   }
 
+  def resetRetryCount() = _retryCount = 0
+
   def markFinished(endState: ApplicationState.Value) {
     state = endState
     endTime = System.currentTimeMillis()
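
The parentheses added to incrementRetryCount follow the Scala convention that side-effecting methods are declared and called with empty parens. A toy version of both counters (the class name is made up, not the real ApplicationInfo):

// Stand-in for the retry-count bookkeeping in the diff above.
class RetryCounter {
  private var _retryCount = 0

  def retryCount: Int = _retryCount            // pure accessor: no parens

  def incrementRetryCount(): Int = {           // mutates state: declared with ()
    _retryCount += 1
    _retryCount
  }

  def resetRetryCount(): Unit = _retryCount = 0
}

object RetryCounterDemo extends App {
  val c = new RetryCounter
  c.incrementRetryCount()                      // an abnormal executor exit
  c.resetRetryCount()                          // an executor reached RUNNING
  println(c.retryCount)                        // 0
}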

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 16 additions & 9 deletions
@@ -264,27 +264,34 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
       execOption match {
         case Some(exec) => {
+          val appInfo = idToApp(appId)
           exec.state = state
+          if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
           exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
           if (ExecutorState.isFinished(state)) {
-            val appInfo = idToApp(appId)
             // Remove this executor from the worker and app
-            logInfo("Removing executor " + exec.fullId + " because it is " + state)
+            logInfo(s"Removing executor ${exec.fullId} because it is $state")
             appInfo.removeExecutor(exec)
             exec.worker.removeExecutor(exec)
 
+            val normalExit = exitStatus == Some(0)
             // Only retry certain number of times so we don't go into an infinite loop.
-            if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
-              schedule()
-            } else {
-              logError("Application %s with ID %s failed %d times, removing it".format(
-                appInfo.desc.name, appInfo.id, appInfo.retryCount))
-              removeApplication(appInfo, ApplicationState.FAILED)
+            if (!normalExit) {
+              if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
+                schedule()
+              } else {
+                val execs = appInfo.executors.values
+                if (!execs.exists(_.state == ExecutorState.RUNNING)) {
+                  logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
+                    s"${appInfo.retryCount} times; removing it")
+                  removeApplication(appInfo, ApplicationState.FAILED)
+                }
+              }
             }
           }
         }
         case None =>
-          logWarning("Got status update for unknown executor " + appId + "/" + execId)
+          logWarning(s"Got status update for unknown executor $appId/$execId")
       }
     }
 
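Pulled out of the handler above, the decision could be expressed as a small pure function; the names, states, and signature below are illustrative only, not Master's actual API:

// Illustrative decision function for Master's handling of a finished executor.
// ExecutorState and the Action ADT are simplified stand-ins for this sketch.
object ExecutorExitDecisionSketch {
  object ExecutorState extends Enumeration { val LOADING, RUNNING, FAILED, EXITED = Value }

  sealed trait Action
  case object Reschedule extends Action        // try to launch a replacement executor
  case object RemoveApplication extends Action // give up on the application
  case object KeepWaiting extends Action       // do nothing for now

  def decide(
      exitStatus: Option[Int],
      retryCount: Int,                         // abnormal exits since the last RUNNING executor
      maxRetries: Int,
      anyExecutorStillRunning: Boolean): (Action, Int) = {
    val normalExit = exitStatus == Some(0)
    if (normalExit) {
      (KeepWaiting, retryCount)                // exit code 0 does not count as a failure
    } else if (retryCount + 1 < maxRetries) {
      (Reschedule, retryCount + 1)
    } else if (!anyExecutorStillRunning) {
      (RemoveApplication, retryCount + 1)      // limit hit and nothing left RUNNING
    } else {
      (KeepWaiting, retryCount + 1)            // limit hit, but the app is still doing work
    }
  }

  def main(args: Array[String]): Unit = {
    val (action, _) = decide(exitStatus = Some(1), retryCount = 9, maxRetries = 10,
      anyExecutorStillRunning = true)
    println(action)   // KeepWaiting: limit reached, but an executor is still RUNNING
  }
}
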

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 4 additions & 3 deletions
@@ -139,9 +139,10 @@ private[spark] class ExecutorRunner(
       Files.write(header, stderr, Charsets.UTF_8)
       CommandUtils.redirectStream(process.getErrorStream, stderr)
 
-      // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
-      // long-lived processes only. However, in the future, we might restart the executor a few
-      // times on the same machine.
+      state = ExecutorState.RUNNING
+      worker ! ExecutorStateChanged(appId, execId, state, None, None)
+      // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
+      // or with nonzero exit code
       val exitCode = process.waitFor()
       state = ExecutorState.FAILED
       val message = "Command exited with code " + exitCode

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 23 additions & 12 deletions
@@ -211,18 +211,29 @@ private[spark] class Worker(
       if (masterUrl != activeMasterUrl) {
         logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
       } else {
-        logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
-        // TODO (pwendell): We shuld make sparkHome an Option[String] in
-        // ApplicationDescription to be more explicit about this.
-        val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath)
-        val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-          self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
-        executors(appId + "/" + execId) = manager
-        manager.start()
-        coresUsed += cores_
-        memoryUsed += memory_
-        masterLock.synchronized {
-          master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+        try {
+          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+          val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath)
+          val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
+            self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, conf, ExecutorState.LOADING)
+          executors(appId + "/" + execId) = manager
+          manager.start()
+          coresUsed += cores_
+          memoryUsed += memory_
+          masterLock.synchronized {
+            master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+          }
+        } catch {
+          case e: Exception => {
+            logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+            if (executors.contains(appId + "/" + execId)) {
+              executors(appId + "/" + execId).kill()
+              executors -= appId + "/" + execId
+            }
+            masterLock.synchronized {
+              master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
+            }
+          }
         }
       }
     }
 
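The same guard-and-clean-up pattern in a generic, standalone form (Runner, report, and the runners map are invented names for this sketch, not Worker's real fields):

// If starting a runner throws, drop the bookkeeping entry, kill anything
// half-started, and report FAILED, as in the diff above.
object GuardedLaunchSketch {
  trait Runner {
    def start(): Unit
    def kill(): Unit
  }

  val runners = scala.collection.mutable.Map.empty[String, Runner]

  def launch(fullId: String, makeRunner: () => Runner, report: String => Unit): Unit = {
    try {
      val runner = makeRunner()
      runners(fullId) = runner
      runner.start()
      report(s"$fullId LOADING")        // the runner itself reports RUNNING once its process is up
    } catch {
      case e: Exception =>
        println(s"Failed to launch $fullId: ${e.getMessage}")
        runners.remove(fullId).foreach(_.kill())   // drop and kill any half-started runner
        report(s"$fullId FAILED")
    }
  }

  def main(args: Array[String]): Unit = {
    launch("app-1/0", () => throw new RuntimeException("bad spark home"), msg => println(msg))
    println(runners.isEmpty)   // true: the failed launch left no bookkeeping behind
  }
}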
