", actorSystem, master, new JavaSerializer(conf), 1200, conf,
+ securityMgr)
// The put should fail since a1 is not serializable.
class UnserializableClass
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 20ebb1897e6ba..30415814adbba 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -24,6 +24,8 @@ import scala.util.{Failure, Success, Try}
import org.eclipse.jetty.server.Server
import org.scalatest.FunSuite
+import org.apache.spark.SparkConf
+
class UISuite extends FunSuite {
test("jetty port increases under contention") {
val startPort = 4040
@@ -34,15 +36,17 @@ class UISuite extends FunSuite {
case Failure(e) =>
// Either case server port is busy hence setup for test complete
}
- val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("0.0.0.0", startPort, Seq())
- val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("0.0.0.0", startPort, Seq())
+ val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("0.0.0.0", startPort, Seq(),
+ new SparkConf)
+ val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("0.0.0.0", startPort, Seq(),
+ new SparkConf)
// Allow some wiggle room in case ports on the machine are under contention
assert(boundPort1 > startPort && boundPort1 < startPort + 10)
assert(boundPort2 > boundPort1 && boundPort2 < boundPort1 + 10)
}
test("jetty binds to port 0 correctly") {
- val (jettyServer, boundPort) = JettyUtils.startJettyServer("0.0.0.0", 0, Seq())
+ val (jettyServer, boundPort) = JettyUtils.startJettyServer("0.0.0.0", 0, Seq(), new SparkConf)
assert(jettyServer.getState === "STARTED")
assert(boundPort != 0)
Try {new ServerSocket(boundPort)} match {
diff --git a/docs/configuration.md b/docs/configuration.md
index dc5553f3da770..6714f8b13973c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -147,6 +147,34 @@ Apart from these, the following properties are also available, and may be useful
    How many stages the Spark UI remembers before garbage collecting.
  </td>
</tr>
+<tr>
+  <td>spark.ui.filters</td>
+  <td>None</td>
+  <td>
+    Comma separated list of filter class names to apply to the Spark web UI. The filter should be a
+    standard javax servlet Filter. Parameters to each filter can also be specified by setting a
+    Java system property of spark.&lt;class name of filter&gt;.params='param1=value1,param2=value2'
+    (e.g. -Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params='param1=foo,param2=testing')
+  </td>
+</tr>
+<tr>
+  <td>spark.ui.acls.enable</td>
+  <td>false</td>
+  <td>
+    Whether Spark web UI ACLs should be enabled. If enabled, this checks to see if the user has
+    access permissions to view the web UI. See <code>spark.ui.view.acls</code> for more details.
+    Also note this requires the user to be known; if the user comes across as null, no checks
+    are done. Filters can be used to authenticate and set the user.
+  </td>
+</tr>
+<tr>
+  <td>spark.ui.view.acls</td>
+  <td>Empty</td>
+  <td>
+    Comma separated list of users that have view access to the Spark web UI. By default only the
+    user that started the Spark job has view access.
+  </td>
+</tr>
<tr>
  <td>spark.shuffle.compress</td>
  <td>true</td>
@@ -484,6 +512,29 @@ Apart from these, the following properties are also available, and may be useful
    Whether to overwrite files added through SparkContext.addFile() when the target file exists and its contents do not match those of the source.
  </td>
</tr>
+<tr>
+  <td>spark.authenticate</td>
+  <td>false</td>
+  <td>
+    Whether Spark authenticates its internal connections. See <code>spark.authenticate.secret</code> if not
+    running on Yarn.
+  </td>
+</tr>
+<tr>
+  <td>spark.authenticate.secret</td>
+  <td>None</td>
+  <td>
+    Set the secret key used for Spark to authenticate between components. This needs to be set if
+    not running on Yarn and authentication is enabled.
+  </td>
+</tr>
+<tr>
+  <td>spark.core.connection.auth.wait.timeout</td>
+  <td>30</td>
+  <td>
+    Number of seconds for the connection to wait for authentication to occur before timing
+    out and giving up.
+  </td>
+</tr>
diff --git a/docs/index.md b/docs/index.md
index 4eb297df39144..c4f4d79edbc6c 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -103,6 +103,7 @@ For this version of Spark (0.8.1) Hadoop 2.2.x (or newer) users will have to bui
* [Configuration](configuration.html): customize Spark via its configuration system
* [Tuning Guide](tuning.html): best practices to optimize performance and memory use
+* [Security](security.html): Spark security support
* [Hardware Provisioning](hardware-provisioning.html): recommendations for cluster hardware
* [Job Scheduling](job-scheduling.html): scheduling resources across and within Spark applications
* [Building Spark with Maven](building-with-maven.html): build Spark using the Maven system
diff --git a/docs/security.md b/docs/security.md
new file mode 100644
index 0000000000000..9e4218fbcfe7d
--- /dev/null
+++ b/docs/security.md
@@ -0,0 +1,18 @@
+---
+layout: global
+title: Spark Security
+---
+
+Spark currently supports authentication via a shared secret. Authentication can be configured to be on via the `spark.authenticate` configuration parameter. This parameter controls whether the Spark communication protocols do authentication using the shared secret. This authentication is a basic handshake to make sure both sides have the same shared secret and are allowed to communicate. If the shared secret is not identical they will not be allowed to communicate.
+
+The Spark UI can also be secured by using javax servlet filters. A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user; once the user is logged in, Spark can compare that user against the view ACLs to make sure they are authorized to view the UI. The configs `spark.ui.acls.enable` and `spark.ui.view.acls` control the behavior of the ACLs. Note that the person who started the application always has view access to the UI.
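+
+As a sketch (the filter class, parameter, and user names here are illustrative, not part of Spark), the filter and view ACLs could be wired up through Java system properties before the SparkContext is created:
+
+    System.setProperty("spark.ui.filters", "com.example.MyAuthFilter")
+    System.setProperty("spark.com.example.MyAuthFilter.params", "param1=value1")
+    System.setProperty("spark.ui.acls.enable", "true")
+    System.setProperty("spark.ui.view.acls", "alice,bob")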
+
+For Spark on Yarn deployments, configuring `spark.authenticate` to true will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret. The Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters. If an authentication filter is enabled, the ACL controls can be used to control which users can view the Spark UI.
+
+For other types of Spark deployments, the Spark config `spark.authenticate.secret` should be configured on each of the nodes. This secret will be used by all the Master/Workers and applications. The UI can be secured using a javax servlet filter installed via `spark.ui.filters`. If an authentication filter is enabled, the ACL controls can be used to control which users can view the Spark UI.
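+
+For example, a minimal sketch of the application-side setup outside of YARN (the master, app name, and secret value are illustrative; the same secret must be configured on every node):
+
+    import org.apache.spark.{SparkConf, SparkContext}
+
+    val conf = new SparkConf()
+      .setMaster("local")                              // illustrative master
+      .setAppName("SecureApp")                         // illustrative app name
+      .set("spark.authenticate", "true")
+      .set("spark.authenticate.secret", "good-secret") // illustrative shared secret
+    val sc = new SparkContext(conf)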
+
+IMPORTANT NOTE: The NettyBlockFetcherIterator is not secured, so do not use netty for the shuffle if running with authentication on.
+
+See [Spark Configuration](configuration.html) for more details on the security configs.
+
+See `org.apache.spark.SecurityManager` for implementation details about security.
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index 3d7b390724e77..62d3a52615584 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -23,7 +23,7 @@ import scala.util.Random
import akka.actor.{Actor, ActorRef, Props, actorRef2Scala}
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SecurityManager}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.receivers.Receiver
@@ -112,8 +112,9 @@ object FeederActor {
}
val Seq(host, port) = args.toSeq
-
- val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = new SparkConf)._1
+ val conf = new SparkConf
+ val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf,
+ securityManager = new SecurityManager(conf))._1
val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
println("Feeder started as:" + feeder)
diff --git a/pom.xml b/pom.xml
index c59fada5cd4a0..3b863856e4634 100644
--- a/pom.xml
+++ b/pom.xml
@@ -155,6 +155,21 @@
+      <dependency>
+        <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-util</artifactId>
+        <version>7.6.8.v20121106</version>
+      </dependency>
+      <dependency>
+        <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-security</artifactId>
+        <version>7.6.8.v20121106</version>
+      </dependency>
+      <dependency>
+        <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-plus</artifactId>
+        <version>7.6.8.v20121106</version>
+      </dependency>
      <dependency>
        <groupId>org.eclipse.jetty</groupId>
        <artifactId>jetty-server</artifactId>
@@ -295,6 +310,11 @@
        <artifactId>mesos</artifactId>
        <version>${mesos.version}</version>
      </dependency>
+      <dependency>
+        <groupId>commons-net</groupId>
+        <artifactId>commons-net</artifactId>
+        <version>2.2</version>
+      </dependency>
      <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index aa1784897566b..138aad7561043 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -226,6 +226,9 @@ object SparkBuild extends Build {
libraryDependencies ++= Seq(
"io.netty" % "netty-all" % "4.0.17.Final",
"org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
+ "org.eclipse.jetty" % "jetty-util" % "7.6.8.v20121106",
+ "org.eclipse.jetty" % "jetty-plus" % "7.6.8.v20121106",
+ "org.eclipse.jetty" % "jetty-security" % "7.6.8.v20121106",
/** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */
"org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
@@ -285,6 +288,7 @@ object SparkBuild extends Build {
"it.unimi.dsi" % "fastutil" % "6.4.4",
"colt" % "colt" % "1.2.0",
"org.apache.mesos" % "mesos" % "0.13.0",
+ "commons-net" % "commons-net" % "2.2",
"net.java.dev.jets3t" % "jets3t" % "0.7.1" excludeAll(excludeCommonsLogging),
"org.apache.derby" % "derby" % "10.4.2.0" % "test",
"org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeNetty, excludeAsm, excludeCommonsLogging, excludeSLF4J),
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index e3bcf7f30ac8d..1aa94079fd0ae 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -18,12 +18,15 @@
package org.apache.spark.repl
import java.io.{ByteArrayOutputStream, InputStream}
-import java.net.{URI, URL, URLClassLoader, URLEncoder}
+import java.net.{URI, URL, URLEncoder}
import java.util.concurrent.{Executors, ExecutorService}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.spark.SparkEnv
+import org.apache.spark.util.Utils
+
import org.objectweb.asm._
import org.objectweb.asm.Opcodes._
@@ -53,7 +56,13 @@ extends ClassLoader(parent) {
if (fileSystem != null) {
fileSystem.open(new Path(directory, pathInDirectory))
} else {
- new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream()
+ if (SparkEnv.get.securityManager.isAuthenticationEnabled()) {
+ val uri = new URI(classUri + "/" + urlEncode(pathInDirectory))
+ val newuri = Utils.constructURIForAuthentication(uri, SparkEnv.get.securityManager)
+ newuri.toURL().openStream()
+ } else {
+ new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream()
+ }
}
}
val bytes = readAndTransformClass(name, inputStream)
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index f52ebe4a159f1..9b1da195002c2 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -881,6 +881,8 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
})
def process(settings: Settings): Boolean = savingContextLoader {
+ if (getMaster() == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
+
this.settings = settings
createInterpreter()
@@ -939,16 +941,9 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
def createSparkContext(): SparkContext = {
val execUri = System.getenv("SPARK_EXECUTOR_URI")
- val master = this.master match {
- case Some(m) => m
- case None => {
- val prop = System.getenv("MASTER")
- if (prop != null) prop else "local"
- }
- }
val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath)
val conf = new SparkConf()
- .setMaster(master)
+ .setMaster(getMaster())
.setAppName("Spark shell")
.setJars(jars)
.set("spark.repl.class.uri", intp.classServer.uri)
@@ -963,6 +958,17 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
sparkContext
}
+ private def getMaster(): String = {
+ val master = this.master match {
+ case Some(m) => m
+ case None => {
+ val prop = System.getenv("MASTER")
+ if (prop != null) prop else "local"
+ }
+ }
+ master
+ }
+
/** process command-line arguments and do as they request */
def process(args: Array[String]): Boolean = {
val command = new SparkCommandLine(args.toList, msg => echo(msg))
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index 1d73d0b6993a8..90a96ad38381e 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -36,7 +36,7 @@ import scala.tools.reflect.StdRuntimeTags._
import scala.util.control.ControlThrowable
import util.stackTraceString
-import org.apache.spark.{HttpServer, SparkConf, Logging}
+import org.apache.spark.{Logging, HttpServer, SecurityManager, SparkConf}
import org.apache.spark.util.Utils
// /** directory to save .class files to */
@@ -83,15 +83,17 @@ import org.apache.spark.util.Utils
* @author Moez A. Abdel-Gawad
* @author Lex Spoon
*/
- class SparkIMain(initialSettings: Settings, val out: JPrintWriter) extends SparkImports with Logging {
+ class SparkIMain(initialSettings: Settings, val out: JPrintWriter)
+ extends SparkImports with Logging {
imain =>
- val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == "1")
+ val conf = new SparkConf()
+ val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == "1")
/** Local directory to save .class files too */
val outputDir = {
val tmp = System.getProperty("java.io.tmpdir")
- val rootDir = new SparkConf().get("spark.repl.classdir", tmp)
+ val rootDir = conf.get("spark.repl.classdir", tmp)
Utils.createTempDir(rootDir)
}
if (SPARK_DEBUG_REPL) {
@@ -99,7 +101,8 @@ import org.apache.spark.util.Utils
}
val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles
- val classServer = new HttpServer(outputDir) /** Jetty server that will serve our classes to worker nodes */
+ val classServer = new HttpServer(outputDir,
+ new SecurityManager(conf)) /** Jetty server that will serve our classes to worker nodes */
private var currentSettings: Settings = initialSettings
var printResults = true // whether to print result lines
var totalSilence = false // whether to print anything
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e045b9f0248f6..bb574f415293a 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -27,7 +27,6 @@ import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
@@ -36,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
@@ -87,27 +86,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
resourceManager = registerWithResourceManager()
- // Workaround until hadoop moves to something which has
- // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
- // ignore result.
- // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
- // Hence args.workerCores = numCore disabled above. Any better option?
-
- // Compute number of threads for akka
- //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
- //if (minimumMemory > 0) {
- // val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
- // val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
- // if (numCore > 0) {
- // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
- // TODO: Uncomment when hadoop is on a version which has this fixed.
- // args.workerCores = numCore
- // }
- //}
- // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
+ // setup AmIpFilter for the SparkUI - do this before we start the UI
+ addAmIpFilter()
ApplicationMaster.register(this)
+
+ // Call this to force generation of secret so it gets populated into the
+ // hadoop UGI. This has to happen before the startUserClass which does a
+ // doAs in order for the credentials to be passed on to the worker containers.
+ val securityMgr = new SecurityManager(sparkConf)
+
// Start the user's JAR
userThread = startUserClass()
@@ -132,6 +120,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
System.exit(0)
}
+ // add the yarn amIpFilter that Yarn requires for properly securing the UI
+ private def addAmIpFilter() {
+ val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+ System.setProperty("spark.ui.filters", amFilter)
+ val proxy = YarnConfiguration.getProxyHostAndPort(conf)
+ val parts : Array[String] = proxy.split(":")
+ val uriBase = "http://" + proxy +
+ System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+
+ val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+ System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params",
+ params)
+ }
+
/** Get the Yarn approved local directories. */
private def getLocalDirs(): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 138c27910b0b0..b735d01df8097 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote._
import akka.actor.Terminated
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
@@ -50,8 +50,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
private var yarnAllocator: YarnAllocationHandler = _
private var driverClosed:Boolean = false
+ val securityManager = new SecurityManager(sparkConf)
val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
- conf = sparkConf)._1
+ conf = sparkConf, securityManager = securityManager)._1
var actor: ActorRef = _
// This actor just working as a monitor to watch on Driver Actor.
@@ -110,6 +111,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
// we want to be reasonably responsive without causing too many requests to RM.
val schedulerInterval =
System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+
// must be <= timeoutInterval / 2.
val interval = math.min(timeoutInterval / 2, schedulerInterval)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index fe37168e5a7ba..11322b1202f99 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -134,7 +134,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
" --args ARGS Arguments to be passed to your application's main class.\n" +
" Mutliple invocations are possible, each will be passed in order.\n" +
" --num-workers NUM Number of workers to start (Default: 2)\n" +
- " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" +
+ " --worker-cores NUM Number of cores for the workers (Default: 1).\n" +
" --master-class CLASS_NAME Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" +
" --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index d6c12a9f5952d..4c6e1dcd6dac3 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -17,11 +17,13 @@
package org.apache.spark.deploy.yarn
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.conf.Configuration
+import org.apache.spark.deploy.SparkHadoopUtil
/**
* Contains util methods to interact with Hadoop from spark.
@@ -44,4 +46,24 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
val jobCreds = conf.getCredentials()
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}
+
+ override def getCurrentUserCredentials(): Credentials = {
+ UserGroupInformation.getCurrentUser().getCredentials()
+ }
+
+ override def addCurrentUserCredentials(creds: Credentials) {
+ UserGroupInformation.getCurrentUser().addCredentials(creds)
+ }
+
+ override def addSecretKeyToUserCredentials(key: String, secret: String) {
+ val creds = new Credentials()
+ creds.addSecretKey(new Text(key), secret.getBytes())
+ addCurrentUserCredentials(creds)
+ }
+
+ override def getSecretKeyFromUserCredentials(key: String): Array[Byte] = {
+ val credentials = getCurrentUserCredentials()
+ if (credentials != null) credentials.getSecretKey(new Text(key)) else null
+ }
+
}
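
A minimal usage sketch of the credential helpers added above (the "sparkCookie" key name and secret value are illustrative, not taken from this patch):

    import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

    val util = new YarnSparkHadoopUtil
    // stash a secret key in the current user's Hadoop credentials (UGI)
    util.addSecretKeyToUserCredentials("sparkCookie", "good-secret")
    // later, read it back from code running as the same UGI
    val secret: Array[Byte] = util.getSecretKeyFromUserCredentials("sparkCookie")
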
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index dd117d5810949..b48a2d50db5ef 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -27,7 +27,6 @@ import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.protocolrecords._
@@ -37,8 +36,9 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
@@ -91,12 +91,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
amClient.init(yarnConf)
amClient.start()
- // Workaround until hadoop moves to something which has
- // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
- // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
+ // setup AmIpFilter for the SparkUI - do this before we start the UI
+ addAmIpFilter()
ApplicationMaster.register(this)
+ // Call this to force generation of secret so it gets populated into the
+ // hadoop UGI. This has to happen before the startUserClass which does a
+ // doAs in order for the credentials to be passed on to the worker containers.
+ val securityMgr = new SecurityManager(sparkConf)
+
// Start the user's JAR
userThread = startUserClass()
@@ -121,6 +125,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
System.exit(0)
}
+ // add the yarn amIpFilter that Yarn requires for properly securing the UI
+ private def addAmIpFilter() {
+ val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+ System.setProperty("spark.ui.filters", amFilter)
+ val proxy = WebAppUtils.getProxyHostAndPort(conf)
+ val parts : Array[String] = proxy.split(":")
+ val uriBase = "http://" + proxy +
+ System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+
+ val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+ System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
+ }
+
/** Get the Yarn approved local directories. */
private def getLocalDirs(): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
@@ -261,7 +278,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
val schedulerInterval =
sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
-
// must be <= timeoutInterval / 2.
val interval = math.min(timeoutInterval / 2, schedulerInterval)
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 40600f38e5e73..f1c1fea0b5895 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote._
import akka.actor.Terminated
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
@@ -52,8 +52,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
private var amClient: AMRMClient[ContainerRequest] = _
+ val securityManager = new SecurityManager(sparkConf)
val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
- conf = sparkConf)._1
+ conf = sparkConf, securityManager = securityManager)._1
var actor: ActorRef = _
// This actor just working as a monitor to watch on Driver Actor.
@@ -105,6 +106,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
val interval = math.min(timeoutInterval / 2, schedulerInterval)
reporterThread = launchReporterThread(interval)
+
// Wait for the reporter thread to Finish.
reporterThread.join()