@@ -52,6 +52,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
52
52
.asInstanceOf [YarnConfiguration ]
53
53
private val isDriver = args.userClass != null
54
54
55
+ private var exitCode = 0
56
+
55
57
// Default to numExecutors * 2, with minimum of 3
56
58
private val maxNumExecutorFailures = sparkConf.getInt(" spark.yarn.max.executor.failures" ,
57
59
sparkConf.getInt(" spark.yarn.max.worker.failures" , math.max(args.numExecutors * 2 , 3 )))
@@ -95,7 +97,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
95
97
if (sc != null ) {
96
98
logInfo(" Invoking sc stop from shutdown hook" )
97
99
sc.stop()
98
- finish(FinalApplicationStatus .SUCCEEDED )
100
+ }
101
+
102
+ // Shuts down the AM.
103
+ if (! finished) {
104
+ finish(finalStatus)
99
105
}
100
106
101
107
// Cleanup the staging dir after the app is finished, or if it's the last attempt at
@@ -123,13 +129,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
123
129
} else {
124
130
runExecutorLauncher(securityMgr)
125
131
}
126
-
127
- if (finalStatus != FinalApplicationStatus .UNDEFINED ) {
128
- finish(finalStatus)
129
- 0
130
- } else {
131
- 1
132
- }
132
+ exitCode
133
133
}
134
134
135
135
final def finish (status : FinalApplicationStatus , diagnostics : String = null ) = synchronized {
@@ -386,31 +386,50 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
386
386
private def startUserClass (): Thread = {
387
387
logInfo(" Starting the user JAR in a separate Thread" )
388
388
System .setProperty(" spark.executor.instances" , args.numExecutors.toString)
389
+ var stopped = false
389
390
val mainMethod = Class .forName(args.userClass, false ,
390
391
Thread .currentThread.getContextClassLoader).getMethod(" main" , classOf [Array [String ]])
391
392
392
393
userClassThread = new Thread {
393
394
override def run () {
394
- var status = FinalApplicationStatus .FAILED
395
+ finalStatus = FinalApplicationStatus .FAILED
395
396
try {
396
- // Copy
397
- val mainArgs = new Array [ String ](args.userArgs.size)
398
- args.userArgs.copyToArray(mainArgs, 0 , args.userArgs.size)
399
- mainMethod.invoke( null , mainArgs)
400
- // Some apps have "System.exit(0)" at the end. The user thread will stop here unless
401
- // it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED.
402
- status = FinalApplicationStatus . SUCCEEDED
403
- } catch {
404
- case e : InvocationTargetException =>
405
- e.getCause match {
406
- case _ : InterruptedException =>
407
- // Reporter thread can interrupt to stop user class
397
+ System .setSecurityManager( new java.lang. SecurityManager () {
398
+ override def checkExit ( paramInt : Int ) {
399
+ if ( ! stopped) {
400
+ exitCode = paramInt
401
+ if (exitCode == 0 ) {
402
+ finalStatus = FinalApplicationStatus . SUCCEEDED
403
+ }
404
+ stopped = true
405
+ }
406
+ }
407
+
408
+ override def checkPermission ( perm : java.security. Permission ) : Unit = {
408
409
409
- case e => throw e
410
410
}
411
- } finally {
412
- logDebug(" Finishing main" )
413
- finalStatus = status
411
+ })
412
+ }
413
+ catch {
414
+ case e : SecurityException =>
415
+ logError(" setSecurityManager:" , e)
416
+ }
417
+
418
+ Utils .tryOrExit {
419
+ try {
420
+ val mainArgs = new Array [String ](args.userArgs.size)
421
+ args.userArgs.copyToArray(mainArgs, 0 , args.userArgs.size)
422
+ mainMethod.invoke(null , mainArgs)
423
+ finalStatus = FinalApplicationStatus .SUCCEEDED
424
+ } catch {
425
+ case e : InvocationTargetException =>
426
+ e.getCause match {
427
+ case _ : InterruptedException =>
428
+ // Reporter thread can interrupt to stop user class
429
+
430
+ case e => throw e
431
+ }
432
+ }
414
433
}
415
434
}
416
435
}
0 commit comments