
Commit 3d0a02d

tgravescs authored and aarondav committed
[WIP] SPARK-1676: Cache Hadoop UGIs by default to prevent FileSystem leak
Move the doAs in Executor higher up so that we only have one UGI and aren't leaking filesystems. Fix Spark on YARN to work when the cluster is running as user "yarn" but the clients are launched as the actual user and want to read/write to HDFS as that user.

Note this hasn't been fully tested yet; it still needs testing in standalone mode. Putting this up for people to look at and possibly test. I don't have access to a Mesos cluster. This is an alternative to #607.

Author: Thomas Graves <[email protected]>

Closes #621 from tgravescs/SPARK-1676 and squashes the following commits:

244d55a [Thomas Graves] fix line length
44163d4 [Thomas Graves] Rework
9398853 [Thomas Graves] change to have doAs in executor higher up.
1 parent 9347565 commit 3d0a02d
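
For context, a rough sketch of the leak this commit addresses (hypothetical illustration, not code from the patch): Hadoop's FileSystem.get caches filesystem instances keyed partly on the calling UGI, so minting a fresh UGI per task leaves a new cache entry behind on every doAs (see HDFS-3545, which the patch's new scaladoc references). The object name UgiLeakSketch and user "someUser" below are illustrative only.

// Hypothetical illustration of the FileSystem-per-UGI leak; not part of this patch.
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.UserGroupInformation

object UgiLeakSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    for (_ <- 1 to 1000) {
      // Each createRemoteUser returns a distinct UGI, hence a distinct
      // FileSystem cache key: the cache grows on every iteration.
      val ugi = UserGroupInformation.createRemoteUser("someUser")
      ugi.doAs(new PrivilegedExceptionAction[Unit] {
        def run(): Unit = { FileSystem.get(conf); () }
      })
      // Mitigation the new scaladoc points at (HDFS-3545):
      // FileSystem.closeAllForUGI(ugi)
    }
  }
}

Moving the doAs to the top of the process sidesteps this loop entirely: one UGI is created once and shared by everything underneath.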

File tree

8 files changed, +69 -46 lines


core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
Lines changed: 14 additions & 3 deletions

@@ -24,25 +24,36 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkException}
 
 import scala.collection.JavaConversions._
 
 /**
  * Contains util methods to interact with Hadoop from Spark.
  */
-class SparkHadoopUtil {
+class SparkHadoopUtil extends Logging {
   val conf: Configuration = newConfiguration()
   UserGroupInformation.setConfiguration(conf)
 
-  def runAsUser(user: String)(func: () => Unit) {
+  /**
+   * Runs the given function with a Hadoop UserGroupInformation as a thread local variable
+   * (distributed to child threads), used for authenticating HDFS and YARN calls.
+   *
+   * IMPORTANT NOTE: If this function is going to be called repeated in the same process
+   * you need to look https://issues.apache.org/jira/browse/HDFS-3545 and possibly
+   * do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems
+   */
+  def runAsSparkUser(func: () => Unit) {
+    val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
     if (user != SparkContext.SPARK_UNKNOWN_USER) {
+      logDebug("running as user: " + user)
       val ugi = UserGroupInformation.createRemoteUser(user)
       transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
       ugi.doAs(new PrivilegedExceptionAction[Unit] {
        def run: Unit = func()
       })
     } else {
+      logDebug("running as SPARK_UNKNOWN_USER")
       func()
     }
   }
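
A minimal usage sketch of the new method, assuming a hypothetical MyBackend entry point (the real call sites are the executor backends below): the entire main is wrapped once, so every thread spawned inside it, and every FileSystem it opens, shares the single UGI.

// Hypothetical caller; mirrors the pattern applied to the backends in this commit.
import org.apache.spark.deploy.SparkHadoopUtil

object MyBackend {
  def main(args: Array[String]) {
    SparkHadoopUtil.get.runAsSparkUser { () =>
      // Everything from here down, including child threads, runs as SPARK_USER.
      startBackend(args)
    }
  }

  private def startBackend(args: Array[String]): Unit = { /* start actors, etc. */ }
}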

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
Lines changed: 25 additions & 19 deletions

@@ -22,8 +22,9 @@ import java.nio.ByteBuffer
 import akka.actor._
 import akka.remote._
 
-import org.apache.spark.{SecurityManager, SparkConf, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.TaskState.TaskState
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{AkkaUtils, Utils}
@@ -94,25 +95,30 @@ private[spark] class CoarseGrainedExecutorBackend(
 
 private[spark] object CoarseGrainedExecutorBackend {
   def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
-    workerUrl: Option[String]) {
-    // Debug code
-    Utils.checkHost(hostname)
-
-    val conf = new SparkConf
-    // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
-    // before getting started with all our system properties, etc
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
-      indestructible = true, conf = conf, new SecurityManager(conf))
-    // set it
-    val sparkHostPort = hostname + ":" + boundPort
-    actorSystem.actorOf(
-      Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
-        sparkHostPort, cores),
-      name = "Executor")
-    workerUrl.foreach{ url =>
-      actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+    workerUrl: Option[String]) {
+
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      // Debug code
+      Utils.checkHost(hostname)
+
+      val conf = new SparkConf
+      // Create a new ActorSystem to run the backend, because we can't create a
+      // SparkEnv / Executor before getting started with all our system properties, etc
+      val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
+        indestructible = true, conf = conf, new SecurityManager(conf))
+      // set it
+      val sparkHostPort = hostname + ":" + boundPort
+      actorSystem.actorOf(
+        Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
+          sparkHostPort, cores),
+        name = "Executor")
+      workerUrl.foreach {
+        url =>
+          actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+      }
+      actorSystem.awaitTermination()
+
     }
-    actorSystem.awaitTermination()
   }
 
   def main(args: Array[String]) {

core/src/main/scala/org/apache/spark/executor/Executor.scala
Lines changed: 1 addition & 3 deletions

@@ -128,8 +128,6 @@ private[spark] class Executor(
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
-  val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
-
   def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
     val tr = new TaskRunner(context, taskId, serializedTask)
     runningTasks.put(taskId, tr)
@@ -172,7 +170,7 @@ private[spark] class Executor(
       }
     }
 
-    override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+    override def run() {
       val startTime = System.currentTimeMillis()
       SparkEnv.set(env)
       Thread.currentThread.setContextClassLoader(replClassLoader)
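
The per-task doAs can be dropped because, as the new scaladoc notes, a UGI established via doAs rides on the thread's security context and is distributed to child threads; the process-level runAsSparkUser therefore already covers every TaskRunner. A small standalone sketch of that behavior (hypothetical, not part of the patch; the user name "alice" is illustrative):

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object UgiInheritanceSketch {
  def main(args: Array[String]): Unit = {
    val ugi = UserGroupInformation.createRemoteUser("alice")
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      def run(): Unit = {
        val child = new Thread(new Runnable {
          // Reports the remote user "alice": the UGI travels with threads
          // created inside the doAs block.
          def run(): Unit = println(UserGroupInformation.getCurrentUser)
        })
        child.start()
        child.join()
      }
    })
  }
}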

core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
Lines changed: 8 additions & 6 deletions

@@ -23,10 +23,10 @@ import com.google.protobuf.ByteString
 import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
 import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
 
-import org.apache.spark.Logging
-import org.apache.spark.TaskState
+import org.apache.spark.{Logging, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.util.Utils
+import org.apache.spark.deploy.SparkHadoopUtil
 
 private[spark] class MesosExecutorBackend
   extends MesosExecutor
@@ -95,9 +95,11 @@ private[spark] class MesosExecutorBackend
  */
 private[spark] object MesosExecutorBackend {
   def main(args: Array[String]) {
-    MesosNativeLibrary.load()
-    // Create a new Executor and start it running
-    val runner = new MesosExecutorBackend()
-    new MesosExecutorDriver(runner).run()
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      MesosNativeLibrary.load()
+      // Create a new Executor and start it running
+      val runner = new MesosExecutorBackend()
+      new MesosExecutorDriver(runner).run()
+    }
   }
 }

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
Lines changed: 5 additions & 5 deletions

@@ -70,9 +70,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private var registered = false
 
-  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-    SparkContext.SPARK_UNKNOWN_USER)
-
   def run() {
     // Setup the directories so things go to yarn approved directories rather
     // then user specified and /tmp.
@@ -192,7 +189,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
-      override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+      override def run() {
+
         var successed = false
         try {
           // Copy
@@ -480,6 +478,8 @@ object ApplicationMaster {
 
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new ApplicationMaster(args).run()
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      new ApplicationMaster(args).run()
+    }
   }
 }

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
Lines changed: 5 additions & 2 deletions

@@ -29,10 +29,11 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
 import akka.actor.Terminated
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.deploy.SparkHadoopUtil
 
 /**
  * An application master that allocates executors on behalf of a driver that is running outside
@@ -279,6 +280,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 object ExecutorLauncher {
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new ExecutorLauncher(args).run()
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      new ExecutorLauncher(args).run()
+    }
   }
 }

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
Lines changed: 6 additions & 6 deletions

@@ -71,9 +71,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
 
   private var registered = false
-
-  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-    SparkContext.SPARK_UNKNOWN_USER)
 
   def run() {
     // Setup the directories so things go to YARN approved directories rather
@@ -179,8 +176,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
-      override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
-        var successed = false
+      override def run() {
+
+        var successed = false
         try {
           // Copy
           var mainArgs: Array[String] = new Array[String](args.userArgs.size)
@@ -462,6 +460,8 @@ object ApplicationMaster {
 
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new ApplicationMaster(args).run()
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      new ApplicationMaster(args).run()
+    }
   }
 }

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
Lines changed: 5 additions & 2 deletions

@@ -28,12 +28,13 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
 import akka.actor.Terminated
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.spark.deploy.SparkHadoopUtil
 
 /**
  * An application master that allocates executors on behalf of a driver that is running outside
@@ -255,6 +256,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 object ExecutorLauncher {
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new ExecutorLauncher(args).run()
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      new ExecutorLauncher(args).run()
+    }
   }
 }
