Skip to content

Commit 20ef1b4

Browse files
committed
[SPARK-3287] When ResourceManager High Availability is enabled, ApplicationMaster webUI is not displayed
1 parent 27df6ce commit 20ef1b4

File tree

5 files changed

+35
-14
lines changed

5 files changed

+35
-14
lines changed

core/src/main/scala/org/apache/spark/ui/JettyUtils.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.ui
1919

20-
import java.net.{InetSocketAddress, URL}
20+
import java.net.{InetSocketAddress, URL, URLDecoder}
2121
import javax.servlet.DispatcherType
2222
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
2323

@@ -147,15 +147,25 @@ private[spark] object JettyUtils extends Logging {
147147
val holder : FilterHolder = new FilterHolder()
148148
holder.setClassName(filter)
149149
// Get any parameters for each filter
150-
val paramName = "spark." + filter + ".params"
151-
val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
150+
var paramName = "spark." + filter + ".params"
151+
var params = conf.get(paramName, "").split(',').map(_.trim()).toSet
152152
params.foreach {
153153
case param : String =>
154154
if (!param.isEmpty) {
155155
val parts = param.split("=")
156156
if (parts.length == 2) holder.setInitParameter(parts(0), parts(1))
157157
}
158158
}
159+
paramName = "spark." + filter + ".encodedparams"
160+
params = conf.get(paramName, "").split(',').map(_.trim()).toSet
161+
params.foreach {
162+
case param : String =>
163+
if (!param.isEmpty) {
164+
val parts = param.split("=")
165+
if (parts.length == 2) holder.setInitParameter(parts(0),
166+
URLDecoder.decode(parts(1), "UTF-8"))
167+
}
168+
}
159169
val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR,
160170
DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST)
161171
handlers.foreach { case(handler) => handler.addFilter(holder, "/*", enumDispatcher) }

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
7575
}
7676

7777
override def getProxyHostAndPort(conf: YarnConfiguration) =
78-
YarnConfiguration.getProxyHostAndPort(conf)
78+
List(YarnConfiguration.getProxyHostAndPort(conf))
7979

8080
override def getMaxRegAttempts(conf: YarnConfiguration) =
8181
conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn
1919

2020
import java.io.IOException
2121
import java.net.Socket
22+
import java.net.URLEncoder
2223
import java.util.concurrent.atomic.AtomicReference
2324

2425
import scala.collection.JavaConversions._
@@ -32,6 +33,7 @@ import org.apache.hadoop.util.ShutdownHookManager
3233
import org.apache.hadoop.yarn.api._
3334
import org.apache.hadoop.yarn.api.records._
3435
import org.apache.hadoop.yarn.conf.YarnConfiguration
36+
import org.apache.hadoop.yarn.webapp.util.WebAppUtils
3537

3638
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
3739
import org.apache.spark.deploy.SparkHadoopUtil
@@ -324,17 +326,24 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
324326
/** Add the Yarn IP filter that is required for properly securing the UI. */
325327
private def addAmIpFilter() = {
326328
val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
327-
val proxy = client.getProxyHostAndPort(yarnConf)
328-
val parts = proxy.split(":")
329-
val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
330-
val uriBase = "http://" + proxy + proxyBase
331-
val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
332-
329+
val proxies = client.getProxyHostsAndPorts(yarnConf)
330+
var sbProxies = new StringBuilder
331+
var sbUrlBases = new StringBuilder
332+
for (proxy <- proxies) {
333+
sbProxies ++= proxy.split(":")(0)
334+
sbProxies +=','
335+
sbUrlBases ++= WebAppUtils.getHttpSchemePrefix(yarnConf)
336+
sbUrlBases ++= proxy
337+
sbUrlBases ++= System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
338+
sbUrlBases +=','
339+
}
340+
var params = "PROXY_HOSTS=" + URLEncoder.encode(sbProxies.toString(), "UTF-8") + ","
341+
params ++= "PROXY_URI_BASES=" + URLEncoder.encode(sbUrlBases.toString(), "UTF-8")
333342
if (isDriver) {
334343
System.setProperty("spark.ui.filters", amFilter)
335-
System.setProperty(s"spark.$amFilter.params", params)
344+
System.setProperty(s"spark.$amFilter.encodedparams", params)
336345
} else {
337-
actor ! AddWebUIFilter(amFilter, params, proxyBase)
346+
actor ! AddWebUIFilter(amFilter, params, sbUrlBases.toString())
338347
}
339348
}
340349

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ trait YarnRMClient {
5959
def getAttemptId(): ApplicationAttemptId
6060

6161
/** Returns the RM's proxy host and port. */
62-
def getProxyHostAndPort(conf: YarnConfiguration): String
62+
def getProxyHostsAndPorts(conf: YarnConfiguration): scala.collection.mutable.Buffer[String]
6363

6464
/** Returns the maximum number of attempts to register the AM. */
6565
def getMaxRegAttempts(conf: YarnConfiguration): Int

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.deploy.yarn
1919

2020
import scala.collection.{Map, Set}
21+
import scala.collection.JavaConversions._
2122

2223
import org.apache.hadoop.yarn.api._
2324
import org.apache.hadoop.yarn.api.protocolrecords._
@@ -68,7 +69,8 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
6869
appAttemptId
6970
}
7071

71-
override def getProxyHostAndPort(conf: YarnConfiguration) = WebAppUtils.getProxyHostAndPort(conf)
72+
override def getProxyHostsAndPorts(conf: YarnConfiguration) =
73+
WebAppUtils.getProxyHostsAndPortsForAmFilter(conf)
7274

7375
override def getMaxRegAttempts(conf: YarnConfiguration) =
7476
conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)

0 commit comments

Comments
 (0)