Commit 0f947f1

[SPARK-2886] Use more specific actor system name than "spark"
As of #1777 we log the name of the actor system when it binds to a port. The current name "spark" is super general and does not convey any meaning. For instance, the following lines are taken from my driver log after setting `spark.driver.port` to 5001:

```
14/08/13 19:33:29 INFO Remoting: Remoting started; listening on addresses: [akka.tcp://spark@andrews-mbp:5001]
14/08/13 19:33:29 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@andrews-mbp:5001]
14/08/06 13:40:05 INFO Utils: Successfully started service 'spark' on port 5001.
```

This commit renames the driver's actor system to "sparkDriver" and the executors' to "sparkExecutor". The goal of this unambitious PR is simply to make the logged information more explicit, without introducing any change in functionality.

Author: Andrew Or <[email protected]>

Closes #1810 from andrewor14/service-name and squashes the following commits:

8c459ed [Andrew Or] Use a common variable for driver/executor actor system names
3a92843 [Andrew Or] Change actor name to sparkDriver and sparkExecutor
921363e [Andrew Or] Merge branch 'master' of github.com:apache/spark into service-name
c8c6a62 [Andrew Or] Do not include hyphens in actor name
1c1b42e [Andrew Or] Avoid spaces in akka system name
f644b55 [Andrew Or] Use more specific service name

(cherry picked from commit b21ae5b)
Signed-off-by: Andrew Or <[email protected]>
1 parent 48a0749 · commit 0f947f1
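To make the rename concrete, here is a minimal, self-contained sketch (not Spark code; the host, port, and actor name are illustrative) of how the actor system name ends up inside the Akka URL that executors use to reach the driver:

```scala
object ActorUrlSketch {
  // The constant introduced by this commit; previously the name was just "spark".
  val driverActorSystemName = "sparkDriver"

  // Builds the kind of URL that shows up in the logs and in the scheduler backends below.
  def driverUrl(host: String, port: Int, actorName: String): String =
    s"akka.tcp://$driverActorSystemName@$host:$port/user/$actorName"

  def main(args: Array[String]): Unit = {
    // Prints: akka.tcp://sparkDriver@andrews-mbp:5001/user/CoarseGrainedScheduler
    println(driverUrl("andrews-mbp", 5001, "CoarseGrainedScheduler"))
  }
}
```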

File tree: 10 files changed, +47 -29 lines

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 6 additions & 3 deletions
```diff
@@ -111,6 +111,9 @@ object SparkEnv extends Logging {
   private val env = new ThreadLocal[SparkEnv]
   @volatile private var lastSetSparkEnv : SparkEnv = _
 
+  private[spark] val driverActorSystemName = "sparkDriver"
+  private[spark] val executorActorSystemName = "sparkExecutor"
+
   def set(e: SparkEnv) {
     lastSetSparkEnv = e
     env.set(e)
@@ -146,9 +149,9 @@ object SparkEnv extends Logging {
     }
 
     val securityManager = new SecurityManager(conf)
-
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
-      securityManager = securityManager)
+    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+      actorSystemName, hostname, port, conf, securityManager)
 
     // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
     // This is so that we tell the executors the correct port to connect to.
```
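Since this single create path serves both the driver and executors, the name is picked from the isDriver flag. A standalone sketch of that selection, mirroring the constants added above (the main method is only for demonstration):

```scala
object ActorSystemNameSketch {
  // Mirrors the constants added to the SparkEnv companion object above.
  val driverActorSystemName = "sparkDriver"
  val executorActorSystemName = "sparkExecutor"

  // The same one-liner used in SparkEnv.create to choose the system name.
  def actorSystemName(isDriver: Boolean): String =
    if (isDriver) driverActorSystemName else executorActorSystemName

  def main(args: Array[String]): Unit = {
    println(actorSystemName(isDriver = true))   // sparkDriver
    println(actorSystemName(isDriver = false))  // sparkExecutor
  }
}
```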

core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala

Lines changed: 5 additions & 3 deletions
```diff
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler.cluster
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
 
-import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.{Logging, SparkContext, SparkEnv}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 
 private[spark] class SimrSchedulerBackend(
@@ -38,8 +38,10 @@ private[spark] class SimrSchedulerBackend(
   override def start() {
     super.start()
 
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port"),
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      sc.conf.get("spark.driver.host"),
+      sc.conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
     val conf = new Configuration()
```

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 5 additions & 3 deletions
```diff
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster
 
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{AppClient, AppClientListener}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
@@ -42,8 +42,10 @@ private[spark] class SparkDeploySchedulerBackend(
     super.start()
 
     // The endpoint for executors to talk to us
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      conf.get("spark.driver.host"), conf.get("spark.driver.port"),
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      conf.get("spark.driver.host"),
+      conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
     val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
     val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
```
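As a usage note, the computed driverUrl travels to executors as their first launch argument; the {{...}} tokens are templates that, as I understand it, the standalone worker substitutes at launch time. A small sketch with illustrative host/port values (the object name is hypothetical):

```scala
object ExecutorArgsSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative values; in Spark these come from spark.driver.host/port.
    val driverUrl = "akka.tcp://sparkDriver@driver-host:5001/user/CoarseGrainedScheduler"
    // Template tokens are left verbatim here and filled in at executor launch time.
    val launchArgs = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
    println(launchArgs.mkString(" "))
  }
}
```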

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -28,7 +28,7 @@ import org.apache.mesos.{Scheduler => MScheduler}
 import org.apache.mesos._
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
 
-import org.apache.spark.{Logging, SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 
@@ -130,7 +130,8 @@ private[spark] class CoarseMesosSchedulerBackend(
     }
     val command = CommandInfo.newBuilder()
       .setEnvironment(environment)
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
       conf.get("spark.driver.host"),
       conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
```

core/src/main/scala/org/apache/spark/util/AkkaUtils.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -27,7 +27,7 @@ import akka.pattern.ask
 import com.typesafe.config.ConfigFactory
 import org.apache.log4j.{Level, Logger}
 
-import org.apache.spark.{SparkException, Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException}
 
 /**
  * Various utility classes for working with Akka.
@@ -192,10 +192,11 @@ private[spark] object AkkaUtils extends Logging {
   }
 
   def makeDriverRef(name: String, conf: SparkConf, actorSystem: ActorSystem): ActorRef = {
+    val driverActorSystemName = SparkEnv.driverActorSystemName
     val driverHost: String = conf.get("spark.driver.host", "localhost")
     val driverPort: Int = conf.getInt("spark.driver.port", 7077)
     Utils.checkHost(driverHost, "Expected hostname")
-    val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
+    val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name"
     val timeout = AkkaUtils.lookupTimeout(conf)
     logInfo(s"Connecting to $name: $url")
     Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
```
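The interpolated system name has to match the name the driver's ActorSystem was actually created with, or the selection below fails to resolve. A hedged sketch of the lookup pattern makeDriverRef uses (requires akka-actor on the classpath; the function name and 30-second timeout are illustrative, not Spark's):

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.{ActorRef, ActorSystem}

object DriverRefSketch {
  def lookupDriverActor(system: ActorSystem, host: String, port: Int, name: String): ActorRef = {
    // "sparkDriver" must equal SparkEnv.driverActorSystemName on the driver side.
    val url = s"akka.tcp://sparkDriver@$host:$port/user/$name"
    // Resolves the actor behind the URL; throws if nothing answers at that
    // system name, host, and port within the timeout.
    Await.result(system.actorSelection(url).resolveOne(30.seconds), 30.seconds)
  }
}
```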

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 5 additions & 5 deletions
```diff
@@ -20,22 +20,21 @@ package org.apache.spark.streaming.receiver
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.Await
 
 import akka.actor.{Actor, Props}
 import akka.pattern.ask
 
+import com.google.common.base.Throwables
+
 import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.scheduler.DeregisterReceiver
 import org.apache.spark.streaming.scheduler.AddBlock
-import scala.Some
 import org.apache.spark.streaming.scheduler.RegisterReceiver
-import com.google.common.base.Throwables
 
 /**
  * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -56,7 +55,8 @@ private[streaming] class ReceiverSupervisorImpl(
   private val trackerActor = {
     val ip = env.conf.get("spark.driver.host", "localhost")
    val port = env.conf.getInt("spark.driver.port", 7077)
-    val url = "akka.tcp://spark@%s:%s/user/ReceiverTracker".format(ip, port)
+    val url = "akka.tcp://%s@%s:%s/user/ReceiverTracker".format(
+      SparkEnv.driverActorSystemName, ip, port)
     env.actorSystem.actorSelection(url)
   }
 
```
6262

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

Lines changed: 6 additions & 3 deletions
```diff
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
@@ -210,8 +210,11 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     sparkConf.set("spark.driver.host", driverHost)
     sparkConf.set("spark.driver.port", driverPort.toString)
 
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      driverHost,
+      driverPort.toString,
+      CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
     actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
   }
```

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

Lines changed: 5 additions & 3 deletions
```diff
@@ -26,7 +26,7 @@ import scala.collection
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SparkEnv}
 import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
@@ -245,8 +245,10 @@ private[yarn] class YarnAllocationHandler(
     // Deallocate + allocate can result in reusing id's wrongly - so use a different counter
     // (executorIdCounter)
     val executorId = executorIdCounter.incrementAndGet().toString
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"),
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      sparkConf.get("spark.driver.host"),
+      sparkConf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
     logInfo("launching container on " + containerId + " host " + executorHostname)
```

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

Lines changed: 6 additions & 3 deletions
```diff
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import akka.actor._
 import akka.remote._
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
@@ -174,8 +174,11 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     sparkConf.set("spark.driver.host", driverHost)
     sparkConf.set("spark.driver.port", driverPort.toString)
 
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      driverHost,
+      driverPort.toString,
+      CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
     actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
   }
```

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -26,7 +26,7 @@ import scala.collection
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SparkEnv}
 import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
@@ -262,7 +262,8 @@ private[yarn] class YarnAllocationHandler(
         numExecutorsRunning.decrementAndGet()
       } else {
         val executorId = executorIdCounter.incrementAndGet().toString
-        val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
+        val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+          SparkEnv.driverActorSystemName,
           sparkConf.get("spark.driver.host"),
           sparkConf.get("spark.driver.port"),
           CoarseGrainedSchedulerBackend.ACTOR_NAME)
```
