@@ -41,6 +41,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence
   TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.mesos.MesosNativeLibrary
 
@@ -56,8 +57,7 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
 import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
-  SparkDeploySchedulerBackend, SimrSchedulerBackend}
+import org.apache.spark.scheduler.cluster.{ExecutorInfo, CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, SimrSchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage._
@@ -225,6 +225,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private var _jars: Seq[String] = _
   private var _files: Seq[String] = _
   private var _shutdownHookRef: AnyRef = _
+  private var _logUrls: Option[Predef.Map[String, String]] = None
 
   /* ------------------------------------------------------------------------------------- *
    | Accessors and public fields. These provide access to the internal state of the        |
@@ -314,6 +315,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private[spark] def dagScheduler_=(ds: DAGScheduler): Unit = {
     _dagScheduler = ds
   }
+  private[spark] def logUrls: Option[Predef.Map[String, String]] = _logUrls
+  private[spark] def logUrls_=(logUrlsMap: Option[Predef.Map[String, String]]): Unit = {
+    _logUrls = logUrlsMap
+    logInfo(s"Setting log urls to ${_logUrls.get.mkString("|")}")
+  }
 
   def applicationId: String = _applicationId
   def applicationAttemptId: Option[String] = _applicationAttemptId
@@ -1912,6 +1918,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // the cluster manager to get an application ID (in case the cluster manager provides one).
     listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
       startTime, sparkUser, applicationAttemptId))
+    _logUrls.foreach { logUrlsMap =>
+      listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), SparkContext
+        .DRIVER_IDENTIFIER, new ExecutorInfo(Utils.localHostName(), 0, logUrlsMap)))
+    }
   }
 
   /** Post the application end event */
@@ -2422,6 +2432,21 @@ object SparkContext extends Logging {
         }
       }
       scheduler.initialize(backend)
+      val logUrl = System.getProperty("spark.yarn.driver.log.url")
+      if (logUrl != null) {
+        // lookup appropriate http scheme for container log urls
+        val yarnConf: YarnConfiguration = new YarnConfiguration(sc.hadoopConfiguration)
+        val yarnHttpPolicy = yarnConf.get(
+          YarnConfiguration.YARN_HTTP_POLICY_KEY,
+          YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
+        )
+        val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
+        val baseUrl = s"$httpScheme$logUrl"
+        sc.logUrls =
+          Some(Predef.Map(
+            "stderr" -> s"$baseUrl/stderr?start=0",
+            "stdout" -> s"$baseUrl/stdout?start=0"))
+      }
       (backend, scheduler)
 
     case "yarn-client" =>