
Commit 96b2785

markhamstra authored and JoshRosen committed
[SPARK-4498][core] Don't transition ExecutorInfo to RUNNING until Driver adds Executor
The ExecutorInfo only reaches the RUNNING state if the Driver is alive to send the ExecutorStateChanged message to master. Else, appInfo.resetRetryCount() is never called and failing Executors will eventually exceed ApplicationState.MAX_NUM_RETRY, resulting in the application being removed from the master's accounting.

JoshRosen

Author: Mark Hamstra <[email protected]>

Closes #3550 from markhamstra/SPARK-4498 and squashes the following commits:

8f543b1 [Mark Hamstra] Don't transition ExecutorInfo to RUNNING until Executor is added by Driver
1 parent 513ef82 commit 96b2785
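
The retry accounting described in the commit message can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not Spark's Master code: `RetryAccountingSketch`, `AppInfo`, `onExecutorStateChanged`, and `simulate` are hypothetical names, and the limit of 10 is an assumed stand-in for `ApplicationState.MAX_NUM_RETRY`. It shows why a RUNNING report that arrives before every failure keeps the application alive indefinitely, while withholding RUNNING when no driver is present lets the failures accumulate.

```scala
// Hypothetical, simplified model of the master's retry accounting.
// None of these names are Spark's real classes.
object RetryAccountingSketch {
  // Assumed value for illustration; stands in for ApplicationState.MAX_NUM_RETRY.
  val MaxNumRetry = 10

  sealed trait ExecState
  case object Running extends ExecState
  case object Failed extends ExecState

  final class AppInfo {
    private var retryCount = 0
    var removed = false
    def resetRetryCount(): Unit = { retryCount = 0 }
    def incrementRetryCount(): Int = { retryCount += 1; retryCount }
  }

  // RUNNING resets the counter; a failure increments it, and the
  // application is marked removed once the limit is reached.
  def onExecutorStateChanged(app: AppInfo, state: ExecState): Unit = state match {
    case Running => app.resetRetryCount()
    case Failed =>
      if (app.incrementRetryCount() >= MaxNumRetry) app.removed = true
  }

  // Launches up to `launches` executors that all fail. `reportsRunning`
  // says whether a RUNNING report reaches the master before each failure.
  // Returns true if the application ends up removed.
  def simulate(launches: Int, reportsRunning: Boolean): Boolean = {
    val app = new AppInfo
    var i = 0
    while (i < launches && !app.removed) {
      if (reportsRunning) onExecutorStateChanged(app, Running)
      onExecutorStateChanged(app, Failed)
      i += 1
    }
    app.removed
  }

  def main(args: Array[String]): Unit = {
    // RUNNING reported unconditionally: the counter is reset every time.
    println(simulate(launches = 100, reportsRunning = true))  // prints false
    // No driver to report RUNNING: failures accumulate to the limit.
    println(simulate(launches = 100, reportsRunning = false)) // prints true
  }
}
```

In this model the first call corresponds to the behavior before the change, where the worker reported RUNNING as soon as the process started, and the second corresponds to the behavior after it, where only a live driver triggers the RUNNING transition.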

2 files changed: +1 -2 lines changed

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala

Lines changed: 1 addition & 0 deletions
@@ -134,6 +134,7 @@ private[spark] class AppClient(
           val fullId = appId + "/" + id
           logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
             cores))
+          master ! ExecutorStateChanged(appId, id, ExecutorState.RUNNING, None, None)
           listener.executorAdded(fullId, workerId, hostPort, cores, memory)
 
         case ExecutorUpdated(id, state, message, exitStatus) =>

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 0 additions & 2 deletions
@@ -144,8 +144,6 @@ private[spark] class ExecutorRunner(
       Files.write(header, stderr, UTF_8)
       stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
 
-      state = ExecutorState.RUNNING
-      worker ! ExecutorStateChanged(appId, execId, state, None, None)
       // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
       // or with nonzero exit code
       val exitCode = process.waitFor()
