Commit 3ce4516

witgo authored and James Z.M. Gao committed
SPARK-1291: Link the spark UI to RM ui in yarn-client mode
Author: witgo <[email protected]>

Closes apache#1112 from witgo/SPARK-1291 and squashes the following commits:

6022bcd [witgo] review commit
1fbb925 [witgo] add addAmIpFilter to yarn alpha
210299c [witgo] review commit
1b92a07 [witgo] review commit
6896586 [witgo] Add comments to addWebUIFilter
3e9630b [witgo] review commit
142ee29 [witgo] review commit
1fe7710 [witgo] Link the spark UI to RM ui in yarn-client mode

Conflicts:
	yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
1 parent 5752689 · commit 3ce4516

6 files changed: 71 additions & 7 deletions

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 3 additions & 0 deletions

@@ -66,4 +66,7 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
 
+  case class AddWebUIFilter(filterName:String, filterParams: String, proxyBase :String)
+    extends CoarseGrainedClusterMessage
+
 }
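
A minimal sketch (not from this commit, which forwards the message with a plain `!`) of how an AM-side actor could send the new message and wait on the Boolean ack that the scheduler backend replies with; the `driver` ActorRef and the filter values are hypothetical:

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter

object WebUIFilterAckSketch {
  def notifyDriver(driver: ActorRef): Unit = {
    implicit val timeout = Timeout(30.seconds)
    // Hypothetical values; on a real cluster they come from the RM proxy settings.
    val msg = AddWebUIFilter(
      "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter",
      "PROXY_HOST=rm-host,PROXY_URI_BASE=http://rm-host:8088/proxy/application_0001",
      "/proxy/application_0001")
    // CoarseGrainedSchedulerBackend answers `sender ! true` after installing the filter.
    val acked = Await.result((driver ? msg).mapTo[Boolean], timeout.duration)
    require(acked, "driver did not acknowledge the WebUI filter")
  }
}
```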

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 18 additions & 0 deletions

@@ -31,6 +31,7 @@ import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
 import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
+import org.apache.spark.ui.JettyUtils
 
 /**
  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -136,6 +137,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         removeExecutor(executorId, reason)
         sender ! true
 
+      case AddWebUIFilter(filterName, filterParams, proxyBase) =>
+        addWebUIFilter(filterName, filterParams, proxyBase)
+        sender ! true
       case DisassociatedEvent(_, address, _) =>
         addressToExecutorId.get(address).foreach(removeExecutor(_,
           "remote Akka client disassociated"))
@@ -273,6 +277,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     }
     false
   }
+
+  // Add filters to the SparkUI
+  def addWebUIFilter(filterName: String, filterParams: String, proxyBase: String) {
+    if (proxyBase != null && proxyBase.nonEmpty) {
+      System.setProperty("spark.ui.proxyBase", proxyBase)
+    }
+
+    if (Seq(filterName, filterParams).forall(t => t != null && t.nonEmpty)) {
+      logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
+      conf.set("spark.ui.filters", filterName)
+      conf.set(s"spark.$filterName.params", filterParams)
+      JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf)
+    }
+  }
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {
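
`addWebUIFilter` communicates with `JettyUtils.addFilters` purely through the two conf keys it sets. A self-contained sketch (hypothetical host and application id) of the resulting conf state:

```scala
import org.apache.spark.SparkConf

object FilterConfSketch extends App {
  val filterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
  val filterParams =  // hypothetical RM proxy values
    "PROXY_HOST=rm-host,PROXY_URI_BASE=http://rm-host:8088/proxy/application_0001"
  val conf = new SparkConf(false)
    .set("spark.ui.filters", filterName)
    .set(s"spark.$filterName.params", filterParams)
  // JettyUtils.addFilters(handlers, conf) reads exactly these keys when it
  // attaches the servlet filter to every existing UI handler.
  conf.getAll.foreach { case (k, v) => println(s"$k -> $v") }
}
```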

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 10 additions & 1 deletion

@@ -135,7 +135,16 @@ private[spark] object UIUtils extends Logging {
   }
 
   // Yarn has to go through a proxy so the base uri is provided and has to be on all links
-  val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("")
+  def uiRoot: String = {
+    if (System.getenv("APPLICATION_WEB_PROXY_BASE") != null) {
+      System.getenv("APPLICATION_WEB_PROXY_BASE")
+    } else if (System.getProperty("spark.ui.proxyBase") != null) {
+      System.getProperty("spark.ui.proxyBase")
+    }
+    else {
+      ""
+    }
+  }
 
   def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource
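
Switching `uiRoot` from a `val` to a `def` is the point of this hunk: the `APPLICATION_WEB_PROXY_BASE` environment variable is fixed at JVM launch, but `spark.ui.proxyBase` is only set later, when `AddWebUIFilter` arrives, so the value must be re-read on every call. A self-contained sketch of the same precedence, with a hypothetical proxy base:

```scala
object UiRootSketch extends App {
  // Same precedence as the new UIUtils.uiRoot: env var, then system property, then "".
  def uiRoot: String =
    Option(System.getenv("APPLICATION_WEB_PROXY_BASE"))
      .orElse(Option(System.getProperty("spark.ui.proxyBase")))
      .getOrElse("")

  def prependBaseUri(basePath: String = "", resource: String = "") =
    uiRoot + basePath + resource

  // Simulate the driver receiving AddWebUIFilter (hypothetical proxy base).
  System.setProperty("spark.ui.proxyBase", "/proxy/application_0001")
  // Prints /proxy/application_0001/stages, assuming the env var is unset.
  println(prependBaseUri("/stages"))
}
```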

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

Lines changed: 19 additions & 3 deletions

@@ -32,6 +32,7 @@ import akka.actor.Terminated
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.spark.deploy.SparkHadoopUtil
 
@@ -81,6 +82,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
       case x: DisassociatedEvent =>
         logInfo(s"Driver terminated or disconnected! Shutting down. $x")
         driverClosed = true
+      case x: AddWebUIFilter =>
+        logInfo(s"Add WebUI Filter. $x")
+        driver ! x
     }
   }
 
@@ -111,7 +115,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     }
 
     waitForSparkMaster()
-
+    addAmIpFilter()
     // Allocate all containers
     allocateExecutors()
 
@@ -171,7 +175,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   }
 
   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
-    logInfo("Registering the ApplicationMaster")
+    val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
+    logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
     val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
       .asInstanceOf[RegisterApplicationMasterRequest]
     appMasterRequest.setApplicationAttemptId(appAttemptId)
@@ -180,10 +185,21 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     appMasterRequest.setHost(Utils.localHostName())
     appMasterRequest.setRpcPort(0)
     // What do we provide here ? Might make sense to expose something sensible later ?
-    appMasterRequest.setTrackingUrl("")
+    appMasterRequest.setTrackingUrl(appUIAddress)
     resourceManager.registerApplicationMaster(appMasterRequest)
   }
 
+  // add the yarn amIpFilter that Yarn requires for properly securing the UI
+  private def addAmIpFilter() {
+    val proxy = YarnConfiguration.getProxyHostAndPort(conf)
+    val parts = proxy.split(":")
+    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+    val uriBase = "http://" + proxy + proxyBase
+    val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+    val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+    actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
+  }
+
   private def waitForSparkMaster() {
     logInfo("Waiting for spark driver to be reachable.")
     var driverUp = false
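
A toy walk-through (hypothetical proxy host and proxy base, standing in for `YarnConfiguration.getProxyHostAndPort` and the `APPLICATION_WEB_PROXY_BASE` environment variable) of the strings `addAmIpFilter` assembles before messaging the driver:

```scala
object AmIpFilterSketch extends App {
  val proxy = "rm-host:8088"                 // stand-in for getProxyHostAndPort(conf)
  val parts = proxy.split(":")
  val proxyBase = "/proxy/application_0001"  // stand-in for the env var
  val uriBase = "http://" + proxy + proxyBase
  val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
  println(amFilter)
  // -> PROXY_HOST=rm-host,PROXY_URI_BASE=http://rm-host:8088/proxy/application_0001
}
```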

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala

Lines changed: 1 addition & 0 deletions

@@ -48,6 +48,7 @@ private[spark] class YarnClientSchedulerBackend(
     val driverHost = conf.get("spark.driver.host")
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
+    conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
 
     val argsArrayBuf = new ArrayBuffer[String]()
     argsArrayBuf += (
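
This one added line closes the loop: the client-side backend records the live UI address under `spark.driver.appUIAddress`, and both `ExecutorLauncher`s read the same key (defaulting to `""`) when registering the tracking URL with the RM. A minimal sketch of the hand-off, with a hypothetical address:

```scala
import org.apache.spark.SparkConf

object AppUIAddressSketch extends App {
  val conf = new SparkConf(false)
  // Client side (YarnClientSchedulerBackend): record the UI's host:port.
  conf.set("spark.driver.appUIAddress", "driver-host:4040")  // hypothetical
  // AM side (ExecutorLauncher): read it back, tolerating an unset key.
  val appUIAddress = conf.get("spark.driver.appUIAddress", "")
  println(s"tracking URL registered with the RM: $appUIAddress")
}
```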

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

Lines changed: 20 additions & 3 deletions

@@ -31,10 +31,12 @@ import akka.actor.Terminated
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
 /**
  * An application master that allocates executors on behalf of a driver that is running outside
@@ -82,6 +84,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
       case x: DisassociatedEvent =>
         logInfo(s"Driver terminated or disconnected! Shutting down. $x")
         driverClosed = true
+      case x: AddWebUIFilter =>
+        logInfo(s"Add WebUI Filter. $x")
+        driver ! x
     }
   }
 
@@ -99,6 +104,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     registerApplicationMaster()
 
     waitForSparkMaster()
+    addAmIpFilter()
 
     // Allocate all containers
     allocateExecutors()
@@ -151,9 +157,20 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   }
 
   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
-    logInfo("Registering the ApplicationMaster")
-    // TODO:(Raymond) Find out Spark UI address and fill in here?
-    amClient.registerApplicationMaster(Utils.localHostName(), 0, "")
+    val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
+    logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
+    amClient.registerApplicationMaster(Utils.localHostName(), 0, appUIAddress)
+  }
+
+  // add the yarn amIpFilter that Yarn requires for properly securing the UI
+  private def addAmIpFilter() {
+    val proxy = WebAppUtils.getProxyHostAndPort(conf)
+    val parts = proxy.split(":")
+    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+    val uriBase = "http://" + proxy + proxyBase
+    val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+    val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+    actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
   }
 
   private def waitForSparkMaster() {
