Skip to content

SPARK-1676: (branch-0.9 fix) Cache Hadoop UGIs to prevent FileSystem leak #618

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,28 @@ class SparkHadoopUtil {
val conf = newConfiguration()
UserGroupInformation.setConfiguration(conf)

def runAsUser(user: String)(func: () => Unit) {
/** Creates a UserGroupInformation for Spark based on SPARK_USER environment variable. */
def createSparkUser(): Option[UserGroupInformation] = {
val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
if (user != SparkContext.SPARK_UNKNOWN_USER) {
val ugi = UserGroupInformation.createRemoteUser(user)
transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = func()
})
Some(UserGroupInformation.createRemoteUser(user))
} else {
func()
None
}
}

/**
* If a user is specified, we will run the function as that user. We additionally transfer
* Spark's tokens to the given UGI to ensure it has access to data written by Spark.
*/
def runAsUser(user: Option[UserGroupInformation])(func: () => Unit) {
user match {
case Some(ugi) => {
transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = func()
})}
case None => func()
}
}

Expand Down
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,14 @@ private[spark] class Executor(
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
// NB: Workaround for SPARK-1676. Caching UGIs prevents continuously creating FileSystem
// objects with "unique" UGIs, but is not a good solution if real UGIs and tokens are needed,
// mainly because expired tokens cannot be removed from the UGI.
// This behavior is a branch-0.9-specific bug fix. See SPARK-1676 for more information.
val cacheUgi = conf.getBoolean("spark.user.cacheUserGroupInformation", false)

val cachedSparkUser = SparkHadoopUtil.get.createSparkUser()
def getSparkUser = if (cacheUgi) cachedSparkUser else SparkHadoopUtil.get.createSparkUser()

def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
val tr = new TaskRunner(context, taskId, serializedTask)
Expand Down Expand Up @@ -173,7 +180,7 @@ private[spark] class Executor(
}
}

override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
override def run(): Unit = SparkHadoopUtil.get.runAsUser(getSparkUser) { () =>
val startTime = System.currentTimeMillis()
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(replClassLoader)
Expand Down
12 changes: 12 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,18 @@ Apart from these, the following properties are also available, and may be useful
applications; you can set it through <code>SPARK_JAVA_OPTS</code> in <code>spark-env.sh</code>.
</td>
</tr>
<tr>
<td>spark.user.cacheUserGroupInformation</td>
<td>false</td>
<td>
Caching UGIs is a workaround for [SPARK-1676](https://issues.apache.org/jira/browse/SPARK-1676)
for users who are not using security in a very serious manner. Caching UGIs can produce
security-related exceptions when tokens have an expiry, or are shared between users. On the other
hand, not caching UGIs means that every FileSystem.get() call can potentially create and cache a
new FileSystem object, which leads to leaked memory and file descriptors.
</td>
</tr>

</table>

## Viewing Spark Properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,

private var registered = false

private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
SparkContext.SPARK_UNKNOWN_USER)
private val sparkUser = SparkHadoopUtil.createSparkUser()

def run() {
// Setup the directories so things go to yarn approved directories rather
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
math.max(args.numWorkers * 2, 3))

private var registered = false

private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
SparkContext.SPARK_UNKNOWN_USER)

private val sparkUser = SparkHadoopUtil.createSparkUser()

def run() {
// Setup the directories so things go to YARN approved directories rather
Expand Down