Commit 9398853

change to have doAs in executor higher up.

1 parent 98b6559

8 files changed: +72 −43 lines changed

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
Lines changed: 7 additions & 2 deletions

@@ -24,25 +24,30 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkException}
 
 import scala.collection.JavaConversions._
 
 /**
  * Contains util methods to interact with Hadoop from Spark.
  */
-class SparkHadoopUtil {
+class SparkHadoopUtil extends Logging {
   val conf: Configuration = newConfiguration()
   UserGroupInformation.setConfiguration(conf)
 
+  // IMPORTANT NOTE: If this function is going to be called repeated in the same process
+  // you need to look https://issues.apache.org/jira/browse/HDFS-3545 and possibly
+  // do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems
   def runAsUser(user: String)(func: () => Unit) {
     if (user != SparkContext.SPARK_UNKNOWN_USER) {
+      logInfo("running as user: " + user)
       val ugi = UserGroupInformation.createRemoteUser(user)
       transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
       ugi.doAs(new PrivilegedExceptionAction[Unit] {
         def run: Unit = func()
       })
     } else {
+      logInfo("running as SPARK_UNKNOWN_USER")
      func()
     }
   }
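The IMPORTANT NOTE above refers to HDFS-3545: Hadoop caches FileSystem instances per UGI, and each createRemoteUser call produces a fresh UGI whose cached FileSystems are never evicted, so calling runAsUser repeatedly in one JVM leaks them. A minimal sketch of the suggested cleanup, pairing doAs with FileSystem.closeAllForUGI (RunAsUserSketch and runOnceAs are illustrative names, not part of this commit):

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.UserGroupInformation

object RunAsUserSketch {
  // Run func under a remote-user UGI, then close the FileSystem instances
  // cached for that UGI so repeated calls do not accumulate them.
  def runOnceAs(user: String)(func: () => Unit): Unit = {
    val ugi = UserGroupInformation.createRemoteUser(user)
    try {
      ugi.doAs(new PrivilegedExceptionAction[Unit] {
        def run(): Unit = func()
      })
    } finally {
      FileSystem.closeAllForUGI(ugi)  // see HDFS-3545
    }
  }
}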

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
Lines changed: 27 additions & 19 deletions

@@ -22,8 +22,9 @@ import java.nio.ByteBuffer
 import akka.actor._
 import akka.remote._
 
-import org.apache.spark.{SecurityManager, SparkConf, Logging}
+import org.apache.spark.{SparkContext, Logging, SecurityManager, SparkConf}
 import org.apache.spark.TaskState.TaskState
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{AkkaUtils, Utils}
@@ -94,25 +95,32 @@ private[spark] class CoarseGrainedExecutorBackend(
 
 private[spark] object CoarseGrainedExecutorBackend {
   def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
-    workerUrl: Option[String]) {
-    // Debug code
-    Utils.checkHost(hostname)
-
-    val conf = new SparkConf
-    // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
-    // before getting started with all our system properties, etc
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
-      indestructible = true, conf = conf, new SecurityManager(conf))
-    // set it
-    val sparkHostPort = hostname + ":" + boundPort
-    actorSystem.actorOf(
-      Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
-        sparkHostPort, cores),
-      name = "Executor")
-    workerUrl.foreach{ url =>
-      actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+    workerUrl: Option[String]) {
+
+    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
+    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+
+      // Debug code
+      Utils.checkHost(hostname)
+
+      val conf = new SparkConf
+      // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
+      // before getting started with all our system properties, etc
+      val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
+        indestructible = true, conf = conf, new SecurityManager(conf))
+      // set it
+      val sparkHostPort = hostname + ":" + boundPort
+      actorSystem.actorOf(
+        Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
+          sparkHostPort, cores),
+        name = "Executor")
+      workerUrl.foreach {
+        url =>
+          actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+      }
+      actorSystem.awaitTermination()
+
     }
-    actorSystem.awaitTermination()
   }
 
   def main(args: Array[String]) {
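For reference, a compact sketch of what this reordering buys: everything executed inside the runAsUser closure, including the actor system bootstrapped above, observes the proxy identity through Hadoop's UserGroupInformation (the object name below is illustrative):

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

object WhoAmISketch {
  def main(args: Array[String]): Unit = {
    val ugi = UserGroupInformation.createRemoteUser("alice")
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      def run(): Unit = {
        // Prints "alice": Hadoop client calls made here see this identity
        // via getCurrentUser rather than the OS user of the JVM.
        println(UserGroupInformation.getCurrentUser.getShortUserName)
      }
    })
  }
}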

core/src/main/scala/org/apache/spark/executor/Executor.scala
Lines changed: 1 addition & 3 deletions

@@ -128,8 +128,6 @@ private[spark] class Executor(
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
-  val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
-
   def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
     val tr = new TaskRunner(context, taskId, serializedTask)
     runningTasks.put(taskId, tr)
@@ -172,7 +170,7 @@ private[spark] class Executor(
       }
     }
 
-    override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+    override def run() {
       val startTime = System.currentTimeMillis()
       SparkEnv.set(env)
       Thread.currentThread.setContextClassLoader(replClassLoader)

core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
Lines changed: 10 additions & 6 deletions

@@ -23,10 +23,10 @@ import com.google.protobuf.ByteString
 import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
 import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
 
-import org.apache.spark.Logging
-import org.apache.spark.TaskState
+import org.apache.spark.{SparkContext, Logging, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.util.Utils
+import org.apache.spark.deploy.SparkHadoopUtil
 
 private[spark] class MesosExecutorBackend
   extends MesosExecutor
@@ -95,9 +95,13 @@ private[spark] class MesosExecutorBackend
  */
 private[spark] object MesosExecutorBackend {
   def main(args: Array[String]) {
-    MesosNativeLibrary.load()
-    // Create a new Executor and start it running
-    val runner = new MesosExecutorBackend()
-    new MesosExecutorDriver(runner).run()
+    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
+    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+
+      MesosNativeLibrary.load()
+      // Create a new Executor and start it running
+      val runner = new MesosExecutorBackend()
+      new MesosExecutorDriver(runner).run()
+    }
   }
 }
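The user identity comes from the SPARK_USER environment variable of the executor process itself. A hypothetical launch-side illustration (the object name, classpath, and user are placeholders, not from this commit):

import scala.sys.process._

object LaunchSketch {
  def main(args: Array[String]): Unit = {
    // Start the backend with SPARK_USER in its environment; main() above
    // reads it and wraps all of its work in runAsUser.
    val exit = Process(
      Seq("java", "-cp", "/path/to/spark-assembly.jar",
        "org.apache.spark.executor.MesosExecutorBackend"),
      None,
      "SPARK_USER" -> "alice").!
    println(s"backend exited with $exit")
  }
}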

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
Lines changed: 7 additions & 5 deletions

@@ -70,9 +70,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private var registered = false
 
-  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-    SparkContext.SPARK_UNKNOWN_USER)
-
   def run() {
     // Setup the directories so things go to yarn approved directories rather
     // then user specified and /tmp.
@@ -192,7 +189,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
-      override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+      override def run() {
+
         var successed = false
         try {
           // Copy
@@ -480,6 +478,10 @@ object ApplicationMaster {
 
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new ApplicationMaster(args).run()
+    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+      SparkContext.SPARK_UNKNOWN_USER)
+    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+      new ApplicationMaster(args).run()
+    }
   }
 }
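Worth noting why the inner wrapper on the user-class thread can be dropped here: a JAAS Subject established by doAs is carried in the AccessControlContext that child threads inherit at creation, so a thread started inside runAsUser keeps the same UGI. A small sketch of that behavior (illustrative, and resting on that standard-JAAS assumption):

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

object ThreadInheritSketch {
  def main(args: Array[String]): Unit = {
    val ugi = UserGroupInformation.createRemoteUser("alice")
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      def run(): Unit = {
        val t = new Thread {
          // Also prints "alice": the thread inherits the doAs context,
          // which is why wrapping main() once covers the whole AM.
          override def run(): Unit =
            println(UserGroupInformation.getCurrentUser.getShortUserName)
        }
        t.start()
        t.join()
      }
    })
  }
}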

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
Lines changed: 6 additions & 1 deletion

@@ -33,6 +33,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.deploy.SparkHadoopUtil
 
 /**
  * An application master that allocates executors on behalf of a driver that is running outside
@@ -279,6 +280,10 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 object ExecutorLauncher {
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new ExecutorLauncher(args).run()
+    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+      SparkContext.SPARK_UNKNOWN_USER)
+    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+      new ExecutorLauncher(args).run()
+    }
   }
 }

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
Lines changed: 8 additions & 6 deletions

@@ -71,9 +71,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
 
   private var registered = false
-
-  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-    SparkContext.SPARK_UNKNOWN_USER)
 
   def run() {
     // Setup the directories so things go to YARN approved directories rather
@@ -179,8 +176,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
-      override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
-        var successed = false
+      override def run() {
+
+        var successed = false
       try {
         // Copy
         var mainArgs: Array[String] = new Array[String](args.userArgs.size)
@@ -462,6 +460,10 @@ object ApplicationMaster {
 
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new ApplicationMaster(args).run()
+    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+      SparkContext.SPARK_UNKNOWN_USER)
+    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+      new ApplicationMaster(args).run()
+    }
   }
 }

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
Lines changed: 6 additions & 1 deletion

@@ -34,6 +34,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.spark.deploy.SparkHadoopUtil
 
 /**
  * An application master that allocates executors on behalf of a driver that is running outside
@@ -255,6 +256,10 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 object ExecutorLauncher {
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new ExecutorLauncher(args).run()
+    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+      SparkContext.SPARK_UNKNOWN_USER)
+    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+      new ExecutorLauncher(args).run()
+    }
  }
 }
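The Mesos and YARN entry points above all repeat the same resolve-and-wrap boilerplate. A hypothetical helper, not part of this commit, shows the shape of the shared pattern:

import org.apache.spark.SparkContext
import org.apache.spark.deploy.SparkHadoopUtil

object RunAsSparkUser {
  // Resolve SPARK_USER (falling back to the unknown-user sentinel, in which
  // case runAsUser is a plain call-through) and run body under that identity.
  def apply(body: () => Unit): Unit = {
    val sparkUser = Option(System.getenv("SPARK_USER"))
      .getOrElse(SparkContext.SPARK_UNKNOWN_USER)
    SparkHadoopUtil.get.runAsUser(sparkUser)(body)
  }
}

// Usage, e.g. in ExecutorLauncher.main:
//   RunAsSparkUser { () => new ExecutorLauncher(args).run() }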
