@@ -32,6 +32,7 @@ import akka.actor.Terminated
32
32
import org .apache .spark .{Logging , SecurityManager , SparkConf }
33
33
import org .apache .spark .util .{Utils , AkkaUtils }
34
34
import org .apache .spark .scheduler .cluster .CoarseGrainedSchedulerBackend
35
+ import org .apache .spark .scheduler .cluster .CoarseGrainedClusterMessages .AddWebUIFilter
35
36
import org .apache .spark .scheduler .SplitInfo
36
37
import org .apache .spark .deploy .SparkHadoopUtil
37
38
@@ -81,6 +82,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
81
82
case x : DisassociatedEvent =>
82
83
logInfo(s " Driver terminated or disconnected! Shutting down. $x" )
83
84
driverClosed = true
85
+ case x : AddWebUIFilter =>
86
+ logInfo(s " Add WebUI Filter. $x" )
87
+ driver ! x
84
88
}
85
89
}
86
90
@@ -111,7 +115,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
111
115
}
112
116
113
117
waitForSparkMaster()
114
-
118
+ addAmIpFilter()
115
119
// Allocate all containers
116
120
allocateExecutors()
117
121
@@ -171,7 +175,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
171
175
}
172
176
173
177
private def registerApplicationMaster (): RegisterApplicationMasterResponse = {
174
- logInfo(" Registering the ApplicationMaster" )
178
+ val appUIAddress = sparkConf.get(" spark.driver.appUIAddress" , " " )
179
+ logInfo(s " Registering the ApplicationMaster with appUIAddress: $appUIAddress" )
175
180
val appMasterRequest = Records .newRecord(classOf [RegisterApplicationMasterRequest ])
176
181
.asInstanceOf [RegisterApplicationMasterRequest ]
177
182
appMasterRequest.setApplicationAttemptId(appAttemptId)
@@ -180,10 +185,21 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
180
185
appMasterRequest.setHost(Utils .localHostName())
181
186
appMasterRequest.setRpcPort(0 )
182
187
// What do we provide here ? Might make sense to expose something sensible later ?
183
- appMasterRequest.setTrackingUrl(" " )
188
+ appMasterRequest.setTrackingUrl(appUIAddress )
184
189
resourceManager.registerApplicationMaster(appMasterRequest)
185
190
}
186
191
192
+ // add the yarn amIpFilter that Yarn requires for properly securing the UI
193
+ private def addAmIpFilter () {
194
+ val proxy = YarnConfiguration .getProxyHostAndPort(conf)
195
+ val parts : Array [String ] = proxy.split(" :" )
196
+ val proxyBase = System .getenv(ApplicationConstants .APPLICATION_WEB_PROXY_BASE_ENV )
197
+ val uriBase = " http://" + proxy + proxyBase
198
+ val amFilter = " PROXY_HOST=" + parts(0 ) + " ," + " PROXY_URI_BASE=" + uriBase
199
+ val amFilterName = " org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
200
+ actor ! AddWebUIFilter (amFilterName, amFilter, proxyBase)
201
+ }
202
+
187
203
private def waitForSparkMaster () {
188
204
logInfo(" Waiting for spark driver to be reachable." )
189
205
var driverUp = false
0 commit comments