Skip to content

SPARK-1676: Cache Hadoop UGIs by default to prevent FileSystem leak #607

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,28 @@ class SparkHadoopUtil {
val conf: Configuration = newConfiguration()
UserGroupInformation.setConfiguration(conf)

def runAsUser(user: String)(func: () => Unit) {
/** Creates a UserGroupInformation for Spark based on SPARK_USER environment variable. */
def createSparkUser(): Option[UserGroupInformation] = {
val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
if (user != SparkContext.SPARK_UNKNOWN_USER) {
val ugi = UserGroupInformation.createRemoteUser(user)
transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = func()
})
Some(UserGroupInformation.createRemoteUser(user))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is my first time looking at this code, so bear with me a little. :-)

I'm not sure what's the objective of calling createRemoteUser() here. What purpose is it serving? Isn't it better to just rely on getCurrentUser() to define the user? Then you wouldn't need SPARK_USER nor SPARK_UNKNOWN_USER.

Unless you want to create a dummy user for the non-kerberos case that is different from the logged in user? I'd say that, in that case, it's better to let users do this in their own code (by wrapping their app in a UGI.doAs() call) instead of building it into Spark.

As for the approach, I think this should work. But to address @pwendell's comments about tokens, there should be code somewhere that's renewing the kerberos ticket (by calling UserGroupInformation.reloginFromKeytab() at appropriate periods). Unfortunately I don't know what the best practices are around this - in our internal code, we just call reloginFromKeytab() periodically as part of our framework for talking to Hadoop services (so individual clients don't need to worry about it), and that seems to work fine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what's the objective of calling createRemoteUser() here.

In non-secure mode, the process runs as "yarn", but needs to interact with HDFS as the app user.

But to address @pwendell's comments about tokens, there should be code somewhere that's renewing the kerberos ticket (by calling UserGroupInformation.reloginFromKeytab() at appropriate periods).

On YARN, neither the driver nor the container will necessarily have keytabs. They authenticate using delegation tokens, which currently don't get replaced.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I noticed later that this is mainly talking about the executors (d'oh).

} else {
func()
None
}
}

/**
* If a user is specified, we will run the function as that user. We additionally transfer
* Spark's tokens to the given UGI to ensure it has access to data written by Spark.
*/
def runAsUser(user: Option[UserGroupInformation])(func: () => Unit) {
user match {
case Some(ugi) => {
transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = func()
})}
case None => func()
}
}

Expand Down
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,13 @@ private[spark] class Executor(
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
// NB: Workaround for SPARK-1676. Caching UGIs prevents continuously creating FileSystem
// objects with "unique" UGIs, but is not a good solution if real UGIs and tokens are needed,
// mainly because expired tokens cannot be removed from the UGI.
val cacheUgi = conf.getBoolean("spark.user.cacheUserGroupInformation", true)

val cachedSparkUser = SparkHadoopUtil.get.createSparkUser()
def getSparkUser = if (cacheUgi) cachedSparkUser else SparkHadoopUtil.get.createSparkUser()

def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
val tr = new TaskRunner(context, taskId, serializedTask)
Expand Down Expand Up @@ -172,7 +178,7 @@ private[spark] class Executor(
}
}

override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
override def run(): Unit = SparkHadoopUtil.get.runAsUser(getSparkUser) { () =>
val startTime = System.currentTimeMillis()
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(replClassLoader)
Expand Down
11 changes: 11 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,17 @@ Apart from these, the following properties are also available, and may be useful
Set a special library path to use when launching executor JVM's.
</td>
</tr>
<tr>
<td>spark.user.cacheUserGroupInformation</td>
<td>true</td>
<td>
Caching UGIs is a workaround for [SPARK-1676](https://issues.apache.org/jira/browse/SPARK-1676)
for users who are not using security in a very serious manner. Caching UGIs can produce
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sort of implies that we can actually break the hadoop security model if this is enabled, but I don't think that's true. I just think that some long-running jobs might have trouble with token renewal.

security-related exceptions when tokens have an expiry, or are shared between users. On the other
hand, not caching UGIs means that every FileSystem.get() call can potentially create and cache a
new FileSystem object, which leads to leaked memory and file descriptors.
</td>
</tr>

</table>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,

private var registered = false

private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
SparkContext.SPARK_UNKNOWN_USER)
private val sparkUser = SparkHadoopUtil.createSparkUser()

def run() {
// Setup the directories so things go to yarn approved directories rather
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,

private var registered = false

private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
SparkContext.SPARK_UNKNOWN_USER)
private val sparkUser = SparkHadoopUtil.createSparkUser()

def run() {
// Setup the directories so things go to YARN approved directories rather
Expand Down