diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index ec15647e1d9eb..e6b9ea99aa40a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -34,15 +34,28 @@ class SparkHadoopUtil {
val conf = newConfiguration()
UserGroupInformation.setConfiguration(conf)
- def runAsUser(user: String)(func: () => Unit) {
+ /** Creates a UserGroupInformation for Spark from the SPARK_USER environment variable, if it is set. */
+ def createSparkUser(): Option[UserGroupInformation] = {
+ val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
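+ // SPARK_UNKNOWN_USER is the sentinel for "SPARK_USER not set"; return None in that case so
+ // that callers simply run as the current process user instead of creating a remote UGI.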
if (user != SparkContext.SPARK_UNKNOWN_USER) {
- val ugi = UserGroupInformation.createRemoteUser(user)
- transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
- ugi.doAs(new PrivilegedExceptionAction[Unit] {
- def run: Unit = func()
- })
+ Some(UserGroupInformation.createRemoteUser(user))
} else {
- func()
+ None
+ }
+ }
+
+ /**
+ * If a user is specified, runs the given function as that user. Spark's tokens are first
+ * transferred to the given UGI, ensuring it has access to data written by Spark.
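+ *
+ * For example, the executor runs each task via runAsUser(getSparkUser) { () => ... }.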
+ */
+ def runAsUser(user: Option[UserGroupInformation])(func: () => Unit) {
+ user match {
+ case Some(ugi) =>
+ transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
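+ // doAs executes func() with this UGI as the current user, so Hadoop FileSystem calls made
+ // inside the function see the intended identity and credentials.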
+ ugi.doAs(new PrivilegedExceptionAction[Unit] {
+ def run: Unit = func()
+ })
+ case None => func()
}
}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 661a390da39d0..1f7e85c3c36b7 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -127,7 +127,14 @@ private[spark] class Executor(
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
- val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
+ // NB: Workaround for SPARK-1676, specific to branch-0.9. Caching UGIs prevents continuously
+ // creating FileSystem objects with "unique" UGIs, but it is not a good solution when real
+ // UGIs and tokens are needed, mainly because expired tokens cannot be removed from the
+ // cached UGI.
+ val cacheUgi = conf.getBoolean("spark.user.cacheUserGroupInformation", false)
+
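+ // The Hadoop FileSystem cache keys on the UGI, so reusing a single UGI allows
+ // FileSystem.get() to return the same cached instance for every task instead of creating
+ // a new object each time.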
+ val cachedSparkUser = SparkHadoopUtil.get.createSparkUser()
+ def getSparkUser = if (cacheUgi) cachedSparkUser else SparkHadoopUtil.get.createSparkUser()
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
val tr = new TaskRunner(context, taskId, serializedTask)
@@ -173,7 +180,7 @@ private[spark] class Executor(
}
}
- override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+ override def run(): Unit = SparkHadoopUtil.get.runAsUser(getSparkUser) { () =>
val startTime = System.currentTimeMillis()
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(replClassLoader)
diff --git a/docs/configuration.md b/docs/configuration.md
index 3f03d97e8054c..195e1956f64ee 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -471,6 +471,18 @@ Apart from these, the following properties are also available, and may be useful
applications; you can set it through SPARK_JAVA_OPTS
in spark-env.sh.
+ <tr>
+   <td>spark.user.cacheUserGroupInformation</td>
+   <td>false</td>
+   <td>
+     Whether to cache the UserGroupInformation created from SPARK_USER and reuse it across
+     tasks. This works around SPARK-1676 by preventing FileSystem objects from accumulating
+     as tasks run, but it should not be enabled when real UGIs and tokens are needed, because
+     expired tokens cannot be removed from the cached UGI.
+   </td>
+ </tr>