SPARK-1291: Link the spark UI to RM ui in yarn-client mode #1112

Closed · wants to merge 8 commits
@@ -66,4 +66,7 @@ private[spark] object CoarseGrainedClusterMessages {

case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage

case class AddWebUIFilter(filterName: String, filterParams: String, proxyBase: String)
extends CoarseGrainedClusterMessage

}
@@ -31,6 +31,7 @@ import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
import org.apache.spark.ui.JettyUtils

/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -136,6 +137,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
removeExecutor(executorId, reason)
sender ! true

case AddWebUIFilter(filterName, filterParams, proxyBase) =>
addWebUIFilter(filterName, filterParams, proxyBase)
sender ! true
case DisassociatedEvent(_, address, _) =>
addressToExecutorId.get(address).foreach(removeExecutor(_,
"remote Akka client disassociated"))
@@ -276,6 +280,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
}
false
}

// Add filters to the SparkUI
def addWebUIFilter(filterName: String, filterParams: String, proxyBase: String) {
if (proxyBase != null && proxyBase.nonEmpty) {
System.setProperty("spark.ui.proxyBase", proxyBase)
}

if (Seq(filterName, filterParams).forall(t => t != null && t.nonEmpty)) {
logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
conf.set("spark.ui.filters", filterName)
conf.set(s"spark.$filterName.params", filterParams)
JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf)
}
}
}

private[spark] object CoarseGrainedSchedulerBackend {
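The addWebUIFilter method above records the filter class under spark.ui.filters and its parameters under spark.<filterName>.params before asking JettyUtils.addFilters to install it. As a hedged sketch (not the actual JettyUtils code; the helper name readFilterConfig is hypothetical), a consumer of those two keys could look like this, assuming the "k1=v1,k2=v2" parameter encoding used elsewhere in this PR:

```scala
// Hypothetical reader for the conf keys set by addWebUIFilter. Returns the
// filter class name together with its parsed parameter map, or None when no
// filter has been configured.
def readFilterConfig(conf: Map[String, String]): Option[(String, Map[String, String])] =
  conf.get("spark.ui.filters").map { filterName =>
    val params = conf.getOrElse(s"spark.$filterName.params", "")
      .split(",")
      .filter(_.nonEmpty)              // tolerate an empty params string
      .map { kv =>
        val Array(k, v) = kv.split("=", 2)  // split on the first '=' only
        k -> v
      }.toMap
    filterName -> params
  }
```

A filter configured as spark.ui.filters=F with spark.F.params=a=1,b=2 would come back as ("F", Map("a" -> "1", "b" -> "2")).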
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -136,7 +136,16 @@ private[spark] object UIUtils extends Logging {
}

// Yarn has to go through a proxy so the base uri is provided and has to be on all links
val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("")
def uiRoot: String = {
if (System.getenv("APPLICATION_WEB_PROXY_BASE") != null) {
System.getenv("APPLICATION_WEB_PROXY_BASE")
} else if (System.getProperty("spark.ui.proxyBase") != null) {
System.getProperty("spark.ui.proxyBase")
} else {
""
}
}

Contributor: can you use Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse(System.getProperty("spark.ui.proxyBase")).getOrElse("") for this instead of != null checks? Also, we should be using the SparkConf whenever possible, and not system properties, unless there is an explicit reason.

Contributor (Author): Option(System.getenv("APPLICATION_WEB_PROXY_BASE")) will create a new object.

Contributor: Can we set the "spark.ui.proxyBase" property in ApplicationMaster.scala's addAmIpFilter() so that:
1. The code is more consistent
2. The if-else case here is reduced

Contributor (Author): addAmIpFilter is invoked in the yarn Container. The code in Client.scala
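The one-liner suggested in review does not compile as written: the first getOrElse already unwraps the Option to a String, which has no second getOrElse. A hedged sketch of the equivalent lookup chain uses orElse instead (the helper name resolveUiRoot is hypothetical, and the lookup functions are injected only to make the sketch testable; the real code reads the environment and system properties directly):

```scala
// Option-based fallback chain: env var first, then system property, then "".
// Both lookups may return null, so each is wrapped in Option before chaining.
def resolveUiRoot(
    env: String => String = (k: String) => System.getenv(k),
    prop: String => String = (k: String) => System.getProperty(k)): String = {
  Option(env("APPLICATION_WEB_PROXY_BASE"))
    .orElse(Option(prop("spark.ui.proxyBase")))
    .getOrElse("")
}
```

Whether the extra Option allocation matters is debated in the thread above; since uiRoot only feeds prependBaseUri when UI links are rendered, it is unlikely to be a hot path.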

def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource

@@ -32,6 +32,7 @@ import akka.actor.Terminated
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
import org.apache.spark.scheduler.SplitInfo
import org.apache.spark.deploy.SparkHadoopUtil

@@ -81,6 +82,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
case x: DisassociatedEvent =>
logInfo(s"Driver terminated or disconnected! Shutting down. $x")
driverClosed = true
case x: AddWebUIFilter =>
logInfo(s"Add WebUI Filter. $x")
driver ! x
}
}

@@ -111,7 +115,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
}

waitForSparkMaster()

addAmIpFilter()
// Allocate all containers
allocateExecutors()

@@ -171,7 +175,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
}

private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
logInfo("Registering the ApplicationMaster")
val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
.asInstanceOf[RegisterApplicationMasterRequest]
appMasterRequest.setApplicationAttemptId(appAttemptId)
@@ -180,10 +185,21 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
appMasterRequest.setHost(Utils.localHostName())
appMasterRequest.setRpcPort(0)
// What do we provide here ? Might make sense to expose something sensible later ?
appMasterRequest.setTrackingUrl("")
appMasterRequest.setTrackingUrl(appUIAddress)
resourceManager.registerApplicationMaster(appMasterRequest)
}

// add the yarn amIpFilter that Yarn requires for properly securing the UI
private def addAmIpFilter() {
val proxy = YarnConfiguration.getProxyHostAndPort(conf)
val parts = proxy.split(":")
val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
val uriBase = "http://" + proxy + proxyBase
val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
}
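addAmIpFilter above encodes the proxy settings as comma-separated key=value pairs in the filterParams string it sends with AddWebUIFilter. A small sketch (helper names buildAmFilterParams and parseFilterParams are hypothetical) of building that string, with the parsing a consumer on the driver side would need:

```scala
// Build the "PROXY_HOST=...,PROXY_URI_BASE=..." string from the proxy
// host:port and the application's proxy base path, mirroring addAmIpFilter.
def buildAmFilterParams(proxy: String, proxyBase: String): String = {
  val host = proxy.split(":")(0)               // e.g. "rm-host" from "rm-host:8088"
  val uriBase = "http://" + proxy + proxyBase  // full proxied URI base
  s"PROXY_HOST=$host,PROXY_URI_BASE=$uriBase"
}

// Invert the encoding: split pairs on ',' and each pair on the first '='.
def parseFilterParams(params: String): Map[String, String] =
  params.split(",").map { kv =>
    val Array(k, v) = kv.split("=", 2)
    k -> v
  }.toMap
```

Splitting each pair on the first '=' only matters because the PROXY_URI_BASE value is itself a URL.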

private def waitForSparkMaster() {
logInfo("Waiting for spark driver to be reachable.")
var driverUp = false
@@ -48,6 +48,7 @@ private[spark] class YarnClientSchedulerBackend(
val driverHost = conf.get("spark.driver.host")
val driverPort = conf.get("spark.driver.port")
val hostport = driverHost + ":" + driverPort
conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)

val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += (
@@ -31,10 +31,12 @@ import akka.actor.Terminated
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.hadoop.yarn.webapp.util.WebAppUtils

/**
* An application master that allocates executors on behalf of a driver that is running outside
@@ -82,6 +84,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
case x: DisassociatedEvent =>
logInfo(s"Driver terminated or disconnected! Shutting down. $x")
driverClosed = true
case x: AddWebUIFilter =>
logInfo(s"Add WebUI Filter. $x")
driver ! x
}

Contributor: Can you make the same changes for yarn alpha mode also, please?
}

@@ -99,6 +104,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
registerApplicationMaster()

waitForSparkMaster()
addAmIpFilter()

// Allocate all containers
allocateExecutors()
@@ -142,9 +148,20 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
}

private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
logInfo("Registering the ApplicationMaster")
// TODO: Find out client's Spark UI address and fill in here?
amClient.registerApplicationMaster(Utils.localHostName(), 0, "")
val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
amClient.registerApplicationMaster(Utils.localHostName(), 0, appUIAddress)
}

// add the yarn amIpFilter that Yarn requires for properly securing the UI
private def addAmIpFilter() {
val proxy = WebAppUtils.getProxyHostAndPort(conf)
val parts = proxy.split(":")
val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
val uriBase = "http://" + proxy + proxyBase
val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
}

Contributor: Can the functions addAmIpFilter() in ApplicationMaster.scala and ExecutorLauncher.scala be combined in, say, a new file YarnUtil.scala?

Contributor (Author): There are differences in the code. They should not be combined.

private def waitForSparkMaster() {