
SPARK-2425 Don't kill a still-running Application because of some misbehaving Executors #1360

Closed · wants to merge 4 commits
core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -96,11 +96,13 @@ private[spark] class ApplicationInfo(

   def retryCount = _retryCount
 
-  def incrementRetryCount = {
+  def incrementRetryCount() = {
     _retryCount += 1
     _retryCount
   }
 
+  def resetRetryCount() = _retryCount = 0
+
   def markFinished(endState: ApplicationState.Value) {
     state = endState
     endTime = System.currentTimeMillis()
26 changes: 16 additions & 10 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -296,28 +296,34 @@ private[spark] class Master(
       val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
       execOption match {
         case Some(exec) => {
+          val appInfo = idToApp(appId)
Contributor: This will crash if idToApp doesn't already contain appId. Maybe we should log a warning instead of throwing a key-not-found exception.

Contributor: Ah, I realize you moved this from elsewhere. It would still be good to add a safeguard.

Contributor (Author): That's not necessary. If idToApp does not contain appId, then execOption will be None and we'll never end up in this case Some(exec).

Contributor: Ah, you're right.
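For illustration, a minimal self-contained sketch of that reasoning -- plain Scala Maps stand in for idToApp and the app's executor table, and the names (ExecOptionSketch, handle) are made up for the example, not part of this PR:

```scala
// Illustrative only: plain Maps stand in for Master.idToApp and app.executors.
object ExecOptionSketch extends App {
  val executors = Map(7 -> "exec-7")            // hypothetical executor table
  val idToApp   = Map("app-1" -> executors)     // hypothetical appId -> executors map

  def handle(appId: String, execId: Int): Unit =
    idToApp.get(appId).flatMap(app => app.get(execId)) match {
      case Some(exec) =>
        // Reaching this branch implies idToApp contains appId,
        // so the direct lookup below cannot throw.
        val appInfo = idToApp(appId)
        println(s"found $exec (app has ${appInfo.size} executor(s))")
      case None =>
        println(s"Got status update for unknown executor $appId/$execId")
    }

  handle("app-1", 7)   // found exec-7 (app has 1 executor(s))
  handle("app-2", 7)   // Got status update for unknown executor app-2/7
}
```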

           exec.state = state
+          if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
           exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
           if (ExecutorState.isFinished(state)) {
-            val appInfo = idToApp(appId)
             // Remove this executor from the worker and app
-            logInfo("Removing executor " + exec.fullId + " because it is " + state)
+            logInfo(s"Removing executor ${exec.fullId} because it is $state")
             appInfo.removeExecutor(exec)
             exec.worker.removeExecutor(exec)
 
-            val normalExit = exitStatus.exists(_ == 0)
+            val normalExit = exitStatus == Some(0)
Contributor: Any reason to change this? The old code seems pretty clear.

Contributor (Author): Just seemed even clearer to directly compare Options instead of using a HOF (higher-order function).
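For illustration (not code from this PR), the two styles side by side:

```scala
// Both expressions ask "did the executor exit with status 0?"
object NormalExitSketch extends App {
  val exitStatus: Option[Int] = Some(0)          // hypothetical exit status

  val viaHof      = exitStatus.exists(_ == 0)    // higher-order function: true
  val viaEquality = exitStatus == Some(0)        // direct Option comparison: true

  // Both are false when there is no exit status (None) or it is nonzero (Some(1)).
  assert(viaHof && viaEquality)
  assert(!(None: Option[Int]).exists(_ == 0) && (None: Option[Int]) != Some(0))
  assert(!Option(1).exists(_ == 0) && Option(1) != Some(0))
}
```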

             // Only retry certain number of times so we don't go into an infinite loop.
-            if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
-              schedule()
-            } else if (!normalExit) {
-              logError("Application %s with ID %s failed %d times, removing it".format(
-                appInfo.desc.name, appInfo.id, appInfo.retryCount))
-              removeApplication(appInfo, ApplicationState.FAILED)
+            if (!normalExit) {
+              if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
+                schedule()
+              } else {
+                val execs = appInfo.executors.values
+                if (!execs.exists(_.state == ExecutorState.RUNNING)) {
+                  logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
+                    s"${appInfo.retryCount} times; removing it")
+                  removeApplication(appInfo, ApplicationState.FAILED)
+                }
+              }
             }
           }
         }
         case None =>
-          logWarning("Got status update for unknown executor " + appId + "/" + execId)
+          logWarning(s"Got status update for unknown executor $appId/$execId")
       }
     }

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -159,6 +159,8 @@ private[spark] class ExecutorRunner(
       Files.write(header, stderr, Charsets.UTF_8)
       stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
 
+      state = ExecutorState.RUNNING
+      worker ! ExecutorStateChanged(appId, execId, state, None, None)
       // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
       // or with nonzero exit code
       val exitCode = process.waitFor()
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -234,7 +234,7 @@ private[spark] class Worker(
       try {
         logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
         val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-          self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING)
+          self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING)
Contributor: Is this part of fixing the bug, or just to make the logic in the existing state machine clearer?

Contributor (Author): It's part of the fix. ExecutorRunners now start in state LOADING and don't become RUNNING until their process is running and stderr and stdout are successfully opened -- see the changes in this PR to ExecutorRunner.scala. If ExecutorRunners were to continue to be created in state RUNNING, then the new check for RUNNING Executors in Master.scala would always prevent Applications from being killed, even though useful Executors doing actual work may not exist.
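A toy model of that interaction (simplified stand-ins, not the real Master or ExecutorDesc classes; the removal rule is paraphrased from the Master.scala hunk above):

```scala
// Simplified: the Master now removes a failing app only when the retry limit
// is reached AND none of the app's executors is currently RUNNING.
object RetrySketch extends App {
  case class ExecStub(state: String)

  def shouldRemoveApp(retryCount: Int, maxRetries: Int, execs: Seq[ExecStub]): Boolean =
    retryCount >= maxRetries && !execs.exists(_.state == "RUNNING")

  // If ExecutorRunners were still created directly in RUNNING, every freshly
  // relaunched executor (whose process may not even have started) would block
  // removal, and a hopeless app could never be killed:
  println(shouldRemoveApp(10, 10, Seq(ExecStub("RUNNING"))))  // false
  // Starting in LOADING, only an executor whose process actually came up (and
  // was therefore promoted to RUNNING) keeps the app alive:
  println(shouldRemoveApp(10, 10, Seq(ExecStub("LOADING"))))  // true
}
```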

         executors(appId + "/" + execId) = manager
         manager.start()
         coresUsed += cores_