@@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
-import org.apache.spark.util.Utils
 
 /**
  * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN.
@@ -419,34 +418,40 @@ private[spark] trait ClientBase extends Logging {
       returnOnRunning: Boolean = false,
       logApplicationReport: Boolean = true): YarnApplicationState = {
     val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
-    var firstIteration = true
+    var lastState: YarnApplicationState = null
     while (true) {
       Thread.sleep(interval)
       val report = getApplicationReport(appId)
       val state = report.getYarnApplicationState
 
       if (logApplicationReport) {
         logInfo(s"Application report from ResourceManager for app ${appId.getId} (state: $state)")
-        val clientToken = Option(getClientToken(report)).getOrElse("N/A")
-        val appDiagnostics = Option(report.getDiagnostics).getOrElse("N/A")
-        val details = "\n" +
-          s"\t full application identifier: $appId\n" +
-          s"\t clientToken: $clientToken\n" +
-          s"\t appDiagnostics: $appDiagnostics\n" +
-          s"\t appMasterHost: ${report.getHost}\n" +
-          s"\t appQueue: ${report.getQueue}\n" +
-          s"\t appMasterRpcPort: ${report.getRpcPort}\n" +
-          s"\t appStartTime: ${report.getStartTime}\n" +
-          s"\t yarnAppState: $state\n" +
-          s"\t distributedFinalState: ${report.getFinalApplicationStatus}\n" +
-          s"\t appTrackingUrl: ${report.getTrackingUrl}\n" +
-          s"\t appUser: ${report.getUser}"
-
-        // Log report details every iteration if DEBUG is enabled, otherwise only the first
+        val details = Seq[(String, String)](
+          ("full identifier", appId.toString),
+          ("client token", getClientToken(report)),
+          ("diagnostics", report.getDiagnostics),
+          ("ApplicationMaster host", report.getHost),
+          ("ApplicationMaster RPC port", report.getRpcPort.toString),
+          ("queue", report.getQueue),
+          ("start time", report.getStartTime.toString),
+          ("final status", report.getFinalApplicationStatus.toString),
+          ("tracking URL", report.getTrackingUrl),
+          ("user", report.getUser)
+        )
+
+        // Use more loggable format if value is null or empty
+        val formattedDetails = details
+          .map { case (k, v) =>
+            val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
+            s"\n\t $k: $newValue" }
+          .mkString("")
+
+        // If DEBUG is enabled, log report details every iteration
+        // Otherwise, log them every time the application changes state
         if (log.isDebugEnabled) {
-          logDebug(details)
-        } else if (firstIteration) {
-          logInfo(details)
+          logDebug(formattedDetails)
+        } else if (lastState != state) {
+          logInfo(formattedDetails)
         }
       }
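The formatting change in this hunk reduces to one idiom: map each (key, value) pair through an Option-based fallback so that null or empty values render as "N/A". A minimal standalone sketch of that idiom, where the sample pairs are hypothetical stand-ins for the ApplicationReport getters used in the diff:

    // Sketch of the "N/A" fallback used to build formattedDetails above.
    // The sample data is hypothetical, not taken from a real YARN report.
    object NaFallbackSketch {
      def format(details: Seq[(String, String)]): String =
        details.map { case (k, v) =>
          val shown = Option(v).filter(_.nonEmpty).getOrElse("N/A")
          s"\n\t $k: $shown"
        }.mkString("")

      def main(args: Array[String]): Unit = {
        val sample = Seq(
          ("client token", null: String), // null  -> "N/A"
          ("diagnostics", ""),            // empty -> "N/A"
          ("queue", "default")            // non-empty value kept as-is
        )
        println(format(sample))
      }
    }

Option(null) yields None and Option("").filter(_.nonEmpty) filters the empty string away, so both cases fall through to "N/A" without any explicit null checks.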
@@ -460,8 +465,9 @@ private[spark] trait ClientBase extends Logging {
         return state
       }
 
-      firstIteration = false
+      lastState = state
     }
+
     // Never reached, but keeps compiler happy
     throw new SparkException("While loop is depleted! This should never happen...")
   }
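The control-flow change, isolated from the formatting one: instead of logging full details only on the first iteration, the loop remembers the previously observed state and logs whenever it changes (DEBUG still logs every iteration). A rough sketch of that pattern, with a canned state sequence standing in for the polled YARN reports:

    // Sketch of the state-change logging pattern; the hardcoded `states`
    // iterator is a hypothetical stand-in for repeatedly calling
    // getApplicationReport in ClientBase.
    object StateChangeLogSketch {
      def main(args: Array[String]): Unit = {
        var lastState: String = null
        val states = Iterator("ACCEPTED", "ACCEPTED", "RUNNING", "RUNNING", "FINISHED")
        for (state <- states) {
          if (lastState != state) {
            println(s"state changed: $state") // logInfo(formattedDetails) in the real code
          }
          lastState = state
        }
        // Prints ACCEPTED, RUNNING, and FINISHED once each, skipping repeats.
      }
    }

Since lastState starts as null and the first real state is never null, the first iteration always logs, preserving the old firstIteration behavior while also reporting every subsequent transition.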