From cca6a8a968d0073fcf918ed1e40a3719640d2b37 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Wed, 30 Apr 2014 13:51:54 -0700 Subject: [PATCH 1/2] SPARK-1676 Cache Hadoop UGIs by default to prevent FileSystem leak UserGroupInformation objects (UGIs) are used for Hadoop security. A relatively recent PR (#29) makes Spark always use UGIs when executing tasks. Unfortunately, this causes HDFS-3545, which causes the FileSystem cache to continuously create new FileSystems, as the UGIs look different (even though they're logically identical). This causes a memory and sometimes file descriptor leak for FileSystems (like S3N) which maintain open connections. This solution is to introduce a config option (enabled by default) which reuses a single Spark user UGI, rather than creating new ones for each task. The downside to this approach is that UGIs cannot be safely cached (see the notes in HDFS-3545). For example, if a token expires, it will never be cleared from the UGI but may be used anyway (usage of a particular token on a UGI is nondeterministic as it is backed by a Set). This setting is enabled by default because the memory leak can become serious very quickly. In one benchmark, attempting to read 10k files from an S3 directory caused 45k connections to remain open to S3 after the job completed. These file descriptors are never cleaned up, nor the memory used by the associated FileSystems. Conflicts: docs/configuration.md yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala --- .../apache/spark/deploy/SparkHadoopUtil.scala | 27 ++++++++++++++----- .../org/apache/spark/executor/Executor.scala | 10 +++++-- docs/configuration.md | 12 +++++++++ .../spark/deploy/yarn/ApplicationMaster.scala | 3 +-- .../spark/deploy/yarn/ApplicationMaster.scala | 5 ++-- 5 files changed, 43 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index ec15647e1d9eb..e6b9ea99aa40a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -34,15 +34,28 @@ class SparkHadoopUtil { val conf = newConfiguration() UserGroupInformation.setConfiguration(conf) - def runAsUser(user: String)(func: () => Unit) { + /** Creates a UserGroupInformation for Spark based on SPARK_USER environment variable. */ + def createSparkUser(): Option[UserGroupInformation] = { + val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER) if (user != SparkContext.SPARK_UNKNOWN_USER) { - val ugi = UserGroupInformation.createRemoteUser(user) - transferCredentials(UserGroupInformation.getCurrentUser(), ugi) - ugi.doAs(new PrivilegedExceptionAction[Unit] { - def run: Unit = func() - }) + Some(UserGroupInformation.createRemoteUser(user)) } else { - func() + None + } + } + + /** + * If a user is specified, we will run the function as that user. We additionally transfer + * Spark's tokens to the given UGI to ensure it has access to data written by Spark. + */ + def runAsUser(user: Option[UserGroupInformation])(func: () => Unit) { + user match { + case Some(ugi) => { + transferCredentials(UserGroupInformation.getCurrentUser(), ugi) + ugi.doAs(new PrivilegedExceptionAction[Unit] { + def run: Unit = func() + })} + case None => func() } } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 661a390da39d0..8ceef73833071 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -127,7 +127,13 @@ private[spark] class Executor( // Maintains the list of running tasks. private val runningTasks = new ConcurrentHashMap[Long, TaskRunner] - val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER) + // NB: Workaround for SPARK-1676. Caching UGIs prevents continuously creating FileSystem + // objects with "unique" UGIs, but is not a good solution if real UGIs and tokens are needed, + // mainly because expired tokens cannot be removed from the UGI. + val cacheUgi = conf.getBoolean("spark.user.cacheUserGroupInformation", true) + + val cachedSparkUser = SparkHadoopUtil.get.createSparkUser() + def getSparkUser = if (cacheUgi) cachedSparkUser else SparkHadoopUtil.get.createSparkUser() def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) { val tr = new TaskRunner(context, taskId, serializedTask) @@ -173,7 +179,7 @@ private[spark] class Executor( } } - override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () => + override def run(): Unit = SparkHadoopUtil.get.runAsUser(getSparkUser) { () => val startTime = System.currentTimeMillis() SparkEnv.set(env) Thread.currentThread.setContextClassLoader(replClassLoader) diff --git a/docs/configuration.md b/docs/configuration.md index 3f03d97e8054c..0a7e0732fd9ed 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -471,6 +471,18 @@ Apart from these, the following properties are also available, and may be useful applications; you can set it through SPARK_JAVA_OPTS in spark-env.sh. + + spark.user.cacheUserGroupInformation + true + + Caching UGIs is a workaround for [SPARK-1676](https://issues.apache.org/jira/browse/SPARK-1676) + for users who are not using security in a very serious manner. Caching UGIs can produce + security-related exceptions when tokens have an expiry, or are shared between users. On the other + hand, not caching UGIs means that every FileSystem.get() call can potentially create and cache a + new FileSystem object, which leads to leaked memory and file descriptors. + + + ## Viewing Spark Properties diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index e045b9f0248f6..521d3571301c1 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -68,8 +68,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, private var registered = false - private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse( - SparkContext.SPARK_UNKNOWN_USER) + private val sparkUser = SparkHadoopUtil.createSparkUser() def run() { // Setup the directories so things go to yarn approved directories rather diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index b312a427bca63..070e6b66e47b6 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -69,9 +69,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, math.max(args.numWorkers * 2, 3)) private var registered = false - - private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse( - SparkContext.SPARK_UNKNOWN_USER) + + private val sparkUser = SparkHadoopUtil.createSparkUser() def run() { // Setup the directories so things go to YARN approved directories rather From c8f29bca37b42aa639d679e1de76a86bcf00d12e Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Thu, 1 May 2014 20:51:59 -0700 Subject: [PATCH 2/2] Set to false by default and point towards branch-0.9 --- core/src/main/scala/org/apache/spark/executor/Executor.scala | 3 ++- docs/configuration.md | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 8ceef73833071..1f7e85c3c36b7 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -130,7 +130,8 @@ private[spark] class Executor( // NB: Workaround for SPARK-1676. Caching UGIs prevents continuously creating FileSystem // objects with "unique" UGIs, but is not a good solution if real UGIs and tokens are needed, // mainly because expired tokens cannot be removed from the UGI. - val cacheUgi = conf.getBoolean("spark.user.cacheUserGroupInformation", true) + // This behavior is a branch-0.9-specific bug fix. See SPARK-1676 for more information. + val cacheUgi = conf.getBoolean("spark.user.cacheUserGroupInformation", false) val cachedSparkUser = SparkHadoopUtil.get.createSparkUser() def getSparkUser = if (cacheUgi) cachedSparkUser else SparkHadoopUtil.get.createSparkUser() diff --git a/docs/configuration.md b/docs/configuration.md index 0a7e0732fd9ed..195e1956f64ee 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -473,7 +473,7 @@ Apart from these, the following properties are also available, and may be useful spark.user.cacheUserGroupInformation - true + false Caching UGIs is a workaround for [SPARK-1676](https://issues.apache.org/jira/browse/SPARK-1676) for users who are not using security in a very serious manner. Caching UGIs can produce