@@ -65,6 +65,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
65
65
private val maxNumWorkerFailures = sparkConf.getInt(" spark.yarn.max.worker.failures" ,
66
66
math.max(args.numWorkers * 2 , 3 ))
67
67
68
+ private var registered = false
69
+
68
70
def run () {
69
71
// Setup the directories so things go to yarn approved directories rather
70
72
// then user specified and /tmp.
@@ -110,7 +112,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
110
112
waitForSparkContextInitialized()
111
113
112
114
// Do this after spark master is up and SparkContext is created so that we can register UI Url
113
- val appMasterResponse : RegisterApplicationMasterResponse = registerApplicationMaster()
115
+ synchronized {
116
+ if (! isFinished) {
117
+ registerApplicationMaster()
118
+ registered = true
119
+ }
120
+ }
114
121
115
122
// Allocate all containers
116
123
allocateWorkers()
@@ -208,7 +215,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
208
215
var count = 0
209
216
val waitTime = 10000L
210
217
val numTries = sparkConf.getInt(" spark.yarn.ApplicationMaster.waitTries" , 10 )
211
- while (ApplicationMaster .sparkContextRef.get() == null && count < numTries) {
218
+ while (ApplicationMaster .sparkContextRef.get() == null && count < numTries
219
+ && ! isFinished) {
212
220
logInfo(" Waiting for spark context initialization ... " + count)
213
221
count = count + 1
214
222
ApplicationMaster .sparkContextRef.wait(waitTime)
@@ -341,17 +349,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
341
349
return
342
350
}
343
351
isFinished = true
352
+
353
+ logInfo(" finishApplicationMaster with " + status)
354
+ if (registered) {
355
+ val finishReq = Records .newRecord(classOf [FinishApplicationMasterRequest ])
356
+ .asInstanceOf [FinishApplicationMasterRequest ]
357
+ finishReq.setAppAttemptId(appAttemptId)
358
+ finishReq.setFinishApplicationStatus(status)
359
+ finishReq.setDiagnostics(diagnostics)
360
+ // Set tracking url to empty since we don't have a history server.
361
+ finishReq.setTrackingUrl(" " )
362
+ resourceManager.finishApplicationMaster(finishReq)
363
+ }
344
364
}
345
-
346
- logInfo(" finishApplicationMaster with " + status)
347
- val finishReq = Records .newRecord(classOf [FinishApplicationMasterRequest ])
348
- .asInstanceOf [FinishApplicationMasterRequest ]
349
- finishReq.setAppAttemptId(appAttemptId)
350
- finishReq.setFinishApplicationStatus(status)
351
- finishReq.setDiagnostics(diagnostics)
352
- // Set tracking url to empty since we don't have a history server.
353
- finishReq.setTrackingUrl(" " )
354
- resourceManager.finishApplicationMaster(finishReq)
355
365
}
356
366
357
367
/**
0 commit comments