Skip to content

Commit 6dbd220

Browse files
committed
Merge pull request alteryx#34 from kayousterhout/rename
Renamed StandaloneX to CoarseGrainedX. (as suggested by @rxin here https://github.com/apache/incubator-spark/pull/14) The previous names were confusing because the components weren't just used in Standalone mode. The scheduler used for Standalone mode is called SparkDeploySchedulerBackend, so referring to the base class as StandaloneSchedulerBackend was misleading.
2 parents 983b83f + f95a2be commit 6dbd220

File tree

6 files changed

+42
-36
lines changed

6 files changed

+42
-36
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ import org.apache.spark.deploy.LocalSparkCluster
5656
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
5757
import org.apache.spark.rdd._
5858
import org.apache.spark.scheduler._
59-
import org.apache.spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
59+
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend,
6060
ClusterScheduler}
6161
import org.apache.spark.scheduler.local.LocalScheduler
6262
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
@@ -213,7 +213,7 @@ class SparkContext(
213213
throw new SparkException("YARN mode not available ?", th)
214214
}
215215
}
216-
val backend = new StandaloneSchedulerBackend(scheduler, this.env.actorSystem)
216+
val backend = new CoarseGrainedSchedulerBackend(scheduler, this.env.actorSystem)
217217
scheduler.initialize(backend)
218218
scheduler
219219

core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala renamed to core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClie
2424

2525
import org.apache.spark.{Logging, SparkEnv}
2626
import org.apache.spark.TaskState.TaskState
27-
import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
27+
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
2828
import org.apache.spark.util.{Utils, AkkaUtils}
2929

3030

31-
private[spark] class StandaloneExecutorBackend(
31+
private[spark] class CoarseGrainedExecutorBackend(
3232
driverUrl: String,
3333
executorId: String,
3434
hostPort: String,
@@ -87,7 +87,7 @@ private[spark] class StandaloneExecutorBackend(
8787
}
8888
}
8989

90-
private[spark] object StandaloneExecutorBackend {
90+
private[spark] object CoarseGrainedExecutorBackend {
9191
def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
9292
// Debug code
9393
Utils.checkHost(hostname)
@@ -99,15 +99,17 @@ private[spark] object StandaloneExecutorBackend {
9999
val sparkHostPort = hostname + ":" + boundPort
100100
System.setProperty("spark.hostPort", sparkHostPort)
101101
val actor = actorSystem.actorOf(
102-
Props(new StandaloneExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
102+
Props(new CoarseGrainedExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
103103
name = "Executor")
104104
actorSystem.awaitTermination()
105105
}
106106

107107
def main(args: Array[String]) {
108108
if (args.length < 4) {
109109
//the reason we allow the last frameworkId argument is to make it easy to kill rogue executors
110-
System.err.println("Usage: StandaloneExecutorBackend <driverUrl> <executorId> <hostname> <cores> [<appid>]")
110+
System.err.println(
111+
"Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> <cores> " +
112+
"[<appid>]")
111113
System.exit(1)
112114
}
113115
run(args(0), args(1), args(2), args(3).toInt)

core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala renamed to core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,28 +24,28 @@ import org.apache.spark.scheduler.TaskDescription
2424
import org.apache.spark.util.{Utils, SerializableBuffer}
2525

2626

27-
private[spark] sealed trait StandaloneClusterMessage extends Serializable
27+
private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
2828

29-
private[spark] object StandaloneClusterMessages {
29+
private[spark] object CoarseGrainedClusterMessages {
3030

3131
// Driver to executors
32-
case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
32+
case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
3333

34-
case class KillTask(taskId: Long, executor: String) extends StandaloneClusterMessage
34+
case class KillTask(taskId: Long, executor: String) extends CoarseGrainedClusterMessage
3535

3636
case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
37-
extends StandaloneClusterMessage
37+
extends CoarseGrainedClusterMessage
3838

39-
case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
39+
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
4040

4141
// Executors to driver
4242
case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
43-
extends StandaloneClusterMessage {
43+
extends CoarseGrainedClusterMessage {
4444
Utils.checkHostPort(hostPort, "Expected host port")
4545
}
4646

4747
case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
48-
data: SerializableBuffer) extends StandaloneClusterMessage
48+
data: SerializableBuffer) extends CoarseGrainedClusterMessage
4949

5050
object StatusUpdate {
5151
/** Alternate factory method that takes a ByteBuffer directly for the data field */
@@ -56,10 +56,10 @@ private[spark] object StandaloneClusterMessages {
5656
}
5757

5858
// Internal messages in driver
59-
case object ReviveOffers extends StandaloneClusterMessage
59+
case object ReviveOffers extends CoarseGrainedClusterMessage
6060

61-
case object StopDriver extends StandaloneClusterMessage
61+
case object StopDriver extends CoarseGrainedClusterMessage
6262

63-
case class RemoveExecutor(executorId: String, reason: String) extends StandaloneClusterMessage
63+
case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
6464

6565
}

core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala renamed to core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,19 @@ import akka.util.duration._
3030

3131
import org.apache.spark.{SparkException, Logging, TaskState}
3232
import org.apache.spark.scheduler.TaskDescription
33-
import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
33+
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
3434
import org.apache.spark.util.Utils
3535

3636
/**
37-
* A standalone scheduler backend, which waits for standalone executors to connect to it through
38-
* Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained
39-
* Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
37+
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
38+
* This backend holds onto each executor for the duration of the Spark job rather than relinquishing
39+
* executors whenever a task is done and asking the scheduler to launch a new executor for
40+
* each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the
41+
* coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode
42+
* (spark.deploy.*).
4043
*/
4144
private[spark]
42-
class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
45+
class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
4346
extends SchedulerBackend with Logging
4447
{
4548
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
@@ -162,7 +165,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
162165
}
163166
}
164167
driverActor = actorSystem.actorOf(
165-
Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME)
168+
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
166169
}
167170

168171
private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
@@ -202,6 +205,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
202205
}
203206
}
204207

205-
private[spark] object StandaloneSchedulerBackend {
206-
val ACTOR_NAME = "StandaloneScheduler"
208+
private[spark] object CoarseGrainedSchedulerBackend {
209+
val ACTOR_NAME = "CoarseGrainedScheduler"
207210
}

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ private[spark] class SparkDeploySchedulerBackend(
2828
sc: SparkContext,
2929
masters: Array[String],
3030
appName: String)
31-
extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
31+
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
3232
with ClientListener
3333
with Logging {
3434

@@ -44,10 +44,10 @@ private[spark] class SparkDeploySchedulerBackend(
4444
// The endpoint for executors to talk to us
4545
val driverUrl = "akka://spark@%s:%s/user/%s".format(
4646
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
47-
StandaloneSchedulerBackend.ACTOR_NAME)
47+
CoarseGrainedSchedulerBackend.ACTOR_NAME)
4848
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
4949
val command = Command(
50-
"org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
50+
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
5151
val sparkHome = sc.getSparkHome().getOrElse(null)
5252
val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
5353
"http://" + sc.ui.appUIAddress)

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,14 @@ import org.apache.mesos._
3030
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
3131

3232
import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
33-
import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend}
33+
import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend}
3434

3535
/**
3636
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
3737
* onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
3838
* a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
39-
* StandaloneBackend mechanism. This class is useful for lower and more predictable latency.
39+
* CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable
40+
* latency.
4041
*
4142
* Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
4243
* remove this.
@@ -46,7 +47,7 @@ private[spark] class CoarseMesosSchedulerBackend(
4647
sc: SparkContext,
4748
master: String,
4849
appName: String)
49-
extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
50+
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
5051
with MScheduler
5152
with Logging {
5253

@@ -122,20 +123,20 @@ private[spark] class CoarseMesosSchedulerBackend(
122123
val driverUrl = "akka://spark@%s:%s/user/%s".format(
123124
System.getProperty("spark.driver.host"),
124125
System.getProperty("spark.driver.port"),
125-
StandaloneSchedulerBackend.ACTOR_NAME)
126+
CoarseGrainedSchedulerBackend.ACTOR_NAME)
126127
val uri = System.getProperty("spark.executor.uri")
127128
if (uri == null) {
128129
val runScript = new File(sparkHome, "spark-class").getCanonicalPath
129130
command.setValue(
130-
"\"%s\" org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
131+
"\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
131132
runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
132133
} else {
133134
// Grab everything to the first '.'. We'll use that and '*' to
134135
// glob the directory "correctly".
135136
val basename = uri.split('/').last.split('.').head
136137
command.setValue(
137-
"cd %s*; ./spark-class org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
138-
basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
138+
"cd %s*; ./spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d"
139+
.format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
139140
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
140141
}
141142
return command.build()

0 commit comments

Comments
 (0)