
Commit 54cd4fd

Author: Marcelo Vanzin (committed)
Merge branch 'master' into SPARK-4924
2 parents: 61919df + 76389c5

269 files changed, +2941 -2999 lines changed

core/src/main/java/org/apache/spark/TaskContext.java

Lines changed: 22 additions & 2 deletions
@@ -62,7 +62,7 @@ static void unset() {
    */
   public abstract boolean isInterrupted();
 
-  /** @deprecated: use isRunningLocally() */
+  /** @deprecated use {@link #isRunningLocally()} */
   @Deprecated
   public abstract boolean runningLocally();
 
@@ -87,19 +87,39 @@ static void unset() {
    * is for HadoopRDD to register a callback to close the input stream.
    * Will be called in any situation - success, failure, or cancellation.
    *
-   * @deprecated: use addTaskCompletionListener
+   * @deprecated use {@link #addTaskCompletionListener(scala.Function1)}
    *
    * @param f Callback function.
    */
   @Deprecated
   public abstract void addOnCompleteCallback(final Function0<Unit> f);
 
+  /**
+   * The ID of the stage that this task belongs to.
+   */
   public abstract int stageId();
 
+  /**
+   * The ID of the RDD partition that is computed by this task.
+   */
   public abstract int partitionId();
 
+  /**
+   * How many times this task has been attempted. The first task attempt will be assigned
+   * attemptNumber = 0, and subsequent attempts will have increasing attempt numbers.
+   */
+  public abstract int attemptNumber();
+
+  /** @deprecated use {@link #taskAttemptId()}; it was renamed to avoid ambiguity. */
+  @Deprecated
   public abstract long attemptId();
 
+  /**
+   * An ID that is unique to this task attempt (within the same SparkContext, no two task attempts
+   * will share the same attempt ID). This is roughly equivalent to Hadoop's TaskAttemptID.
+   */
+  public abstract long taskAttemptId();
+
   /** ::DeveloperApi:: */
   @DeveloperApi
   public abstract TaskMetrics taskMetrics();
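
For context, here is a minimal sketch of how task code might read the new fields. This is illustrative rather than part of the commit, and assumes a live SparkContext named `sc` and the Spark 1.3+ TaskContext API:

import org.apache.spark.TaskContext

sc.parallelize(1 to 100, numSlices = 4).foreachPartition { _ =>
  val ctx = TaskContext.get()
  // attemptNumber() counts retries of this partition (0 on the first attempt),
  // while taskAttemptId() is unique across all task attempts in the SparkContext.
  println(s"stage=${ctx.stageId()} partition=${ctx.partitionId()} " +
    s"attemptNumber=${ctx.attemptNumber()} taskAttemptId=${ctx.taskAttemptId()}")
}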

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 77 additions & 40 deletions
@@ -65,6 +65,9 @@ private[spark] class ExecutorAllocationManager(
     listenerBus: LiveListenerBus,
     conf: SparkConf)
   extends Logging {
+
+  allocationManager =>
+
   import ExecutorAllocationManager._
 
   // Lower and upper bounds on the number of executors. These are required.
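
The `allocationManager =>` line added above is a self-type alias: it gives the outer instance a name, so nested classes can refer to it (and lock on its monitor) unambiguously, which the listener changes below rely on. A standalone sketch of the pattern, with illustrative names:

class Outer {
  outer =>  // alias for this Outer instance

  private var count = 0

  class Inner {
    def increment(): Unit = outer.synchronized {
      // Locks the Outer instance rather than the Inner one, so all Inner
      // instances of the same Outer coordinate through a single monitor.
      outer.count += 1
    }
  }
}
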
@@ -121,7 +124,7 @@ private[spark] class ExecutorAllocationManager(
   private var clock: Clock = new RealClock
 
   // Listener for Spark events that impact the allocation policy
-  private val listener = new ExecutorAllocationListener(this)
+  private val listener = new ExecutorAllocationListener
 
   /**
    * Verify that the settings specified through the config are valid.
@@ -209,11 +212,12 @@ private[spark] class ExecutorAllocationManager(
       addTime += sustainedSchedulerBacklogTimeout * 1000
     }
 
-    removeTimes.foreach { case (executorId, expireTime) =>
-      if (now >= expireTime) {
+    removeTimes.retain { case (executorId, expireTime) =>
+      val expired = now >= expireTime
+      if (expired) {
         removeExecutor(executorId)
-        removeTimes.remove(executorId)
       }
+      !expired
     }
   }
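
The switch from `foreach` to `retain` matters because removing entries from a `mutable.HashMap` while iterating over it is unsafe. `retain` applies a predicate to each entry and keeps only those for which it returns true. A self-contained sketch with made-up executor IDs and timestamps:

import scala.collection.mutable

val removeTimes = mutable.HashMap("exec-1" -> 100L, "exec-2" -> 900L)
val now = 500L

removeTimes.retain { case (executorId, expireTime) =>
  val expired = now >= expireTime
  if (expired) {
    println(s"removing $executorId")  // stand-in for removeExecutor(executorId)
  }
  !expired  // keep only the entries that have not yet expired
}
// removeTimes now contains only "exec-2" -> 900L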

@@ -291,7 +295,7 @@ private[spark] class ExecutorAllocationManager(
     // Do not kill the executor if we have already reached the lower bound
     val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
     if (numExistingExecutors - 1 < minNumExecutors) {
-      logInfo(s"Not removing idle executor $executorId because there are only " +
+      logDebug(s"Not removing idle executor $executorId because there are only " +
         s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
       return false
     }
@@ -315,7 +319,11 @@ private[spark] class ExecutorAllocationManager(
   private def onExecutorAdded(executorId: String): Unit = synchronized {
     if (!executorIds.contains(executorId)) {
       executorIds.add(executorId)
-      executorIds.foreach(onExecutorIdle)
+      // If an executor (call this executor X) is not removed because the lower bound
+      // has been reached, it will no longer be marked as idle. When new executors join,
+      // however, we are no longer at the lower bound, and so we must mark executor X
+      // as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
+      executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
       logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
       if (numExecutorsPending > 0) {
         numExecutorsPending -= 1
@@ -373,10 +381,14 @@ private[spark] class ExecutorAllocationManager(
    * the executor is not already marked as idle.
    */
   private def onExecutorIdle(executorId: String): Unit = synchronized {
-    if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
-      logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
-        s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
-      removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
+    if (executorIds.contains(executorId)) {
+      if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
+        logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
+          s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
+        removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
+      }
+    } else {
+      logWarning(s"Attempted to mark unknown executor $executorId idle")
     }
   }
 
@@ -396,25 +408,24 @@ private[spark] class ExecutorAllocationManager(
    * and consistency of events returned by the listener. For simplicity, it does not account
    * for speculated tasks.
    */
-  private class ExecutorAllocationListener(allocationManager: ExecutorAllocationManager)
-    extends SparkListener {
+  private class ExecutorAllocationListener extends SparkListener {
 
     private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
     private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
     private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
 
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
-      synchronized {
-        val stageId = stageSubmitted.stageInfo.stageId
-        val numTasks = stageSubmitted.stageInfo.numTasks
+      val stageId = stageSubmitted.stageInfo.stageId
+      val numTasks = stageSubmitted.stageInfo.numTasks
+      allocationManager.synchronized {
         stageIdToNumTasks(stageId) = numTasks
         allocationManager.onSchedulerBacklogged()
       }
     }
 
     override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
-      synchronized {
-        val stageId = stageCompleted.stageInfo.stageId
+      val stageId = stageCompleted.stageInfo.stageId
+      allocationManager.synchronized {
         stageIdToNumTasks -= stageId
         stageIdToTaskIndices -= stageId
 
@@ -426,47 +437,62 @@ private[spark] class ExecutorAllocationManager(
       }
     }
 
-    override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
+    override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
       val stageId = taskStart.stageId
       val taskId = taskStart.taskInfo.taskId
       val taskIndex = taskStart.taskInfo.index
       val executorId = taskStart.taskInfo.executorId
 
-      // If this is the last pending task, mark the scheduler queue as empty
-      stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
-      val numTasksScheduled = stageIdToTaskIndices(stageId).size
-      val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, -1)
-      if (numTasksScheduled == numTasksTotal) {
-        // No more pending tasks for this stage
-        stageIdToNumTasks -= stageId
-        if (stageIdToNumTasks.isEmpty) {
-          allocationManager.onSchedulerQueueEmpty()
+      allocationManager.synchronized {
+        // This guards against the race condition in which the `SparkListenerTaskStart`
+        // event is posted before the `SparkListenerBlockManagerAdded` event, which is
+        // possible because these events are posted in different threads. (see SPARK-4951)
+        if (!allocationManager.executorIds.contains(executorId)) {
+          allocationManager.onExecutorAdded(executorId)
+        }
+
+        // If this is the last pending task, mark the scheduler queue as empty
+        stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
+        val numTasksScheduled = stageIdToTaskIndices(stageId).size
+        val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, -1)
+        if (numTasksScheduled == numTasksTotal) {
+          // No more pending tasks for this stage
+          stageIdToNumTasks -= stageId
+          if (stageIdToNumTasks.isEmpty) {
+            allocationManager.onSchedulerQueueEmpty()
+          }
         }
-      }
 
-      // Mark the executor on which this task is scheduled as busy
-      executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
-      allocationManager.onExecutorBusy(executorId)
+        // Mark the executor on which this task is scheduled as busy
+        executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
+        allocationManager.onExecutorBusy(executorId)
+      }
     }
 
-    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
+    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
       val executorId = taskEnd.taskInfo.executorId
       val taskId = taskEnd.taskInfo.taskId
-
-      // If the executor is no longer running scheduled any tasks, mark it as idle
-      if (executorIdToTaskIds.contains(executorId)) {
-        executorIdToTaskIds(executorId) -= taskId
-        if (executorIdToTaskIds(executorId).isEmpty) {
-          executorIdToTaskIds -= executorId
-          allocationManager.onExecutorIdle(executorId)
+      allocationManager.synchronized {
+        // If the executor is no longer running any scheduled tasks, mark it as idle
+        if (executorIdToTaskIds.contains(executorId)) {
+          executorIdToTaskIds(executorId) -= taskId
+          if (executorIdToTaskIds(executorId).isEmpty) {
+            executorIdToTaskIds -= executorId
+            allocationManager.onExecutorIdle(executorId)
+          }
         }
       }
     }
 
     override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
       val executorId = blockManagerAdded.blockManagerId.executorId
       if (executorId != SparkContext.DRIVER_IDENTIFIER) {
-        allocationManager.onExecutorAdded(executorId)
+        // This guards against the race condition in which the `SparkListenerTaskStart`
+        // event is posted before the `SparkListenerBlockManagerAdded` event, which is
+        // possible because these events are posted in different threads. (see SPARK-4951)
+        if (!allocationManager.executorIds.contains(executorId)) {
+          allocationManager.onExecutorAdded(executorId)
+        }
       }
     }
 
@@ -478,12 +504,23 @@ private[spark] class ExecutorAllocationManager(
     /**
      * An estimate of the total number of pending tasks remaining for currently running stages. Does
      * not account for tasks which may have failed and been resubmitted.
+     *
+     * Note: This is not thread-safe without the caller owning the `allocationManager` lock.
      */
     def totalPendingTasks(): Int = {
       stageIdToNumTasks.map { case (stageId, numTasks) =>
         numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
       }.sum
     }
+
+    /**
+     * Return true if an executor is not currently running a task, and false otherwise.
+     *
+     * Note: This is not thread-safe without the caller owning the `allocationManager` lock.
+     */
+    def isExecutorIdle(executorId: String): Boolean = {
+      !executorIdToTaskIds.contains(executorId)
+    }
   }
 
 }
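
The SPARK-4951 guard added to onTaskStart and onBlockManagerAdded works because listener events for the same executor can arrive on different threads in either order, so registration has to be idempotent and performed under the shared lock. A minimal sketch of that idea (not Spark's code):

import scala.collection.mutable

object ExecutorRegistry {
  private val executorIds = mutable.HashSet[String]()

  // Safe to call from any event thread, any number of times per executor.
  def ensureRegistered(executorId: String): Unit = synchronized {
    if (!executorIds.contains(executorId)) {
      executorIds += executorId
      println(s"registered $executorId")  // runs at most once per executor
    }
  }
}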

core/src/main/scala/org/apache/spark/HttpFileServer.scala

Lines changed: 2 additions & 1 deletion
@@ -24,6 +24,7 @@ import com.google.common.io.Files
 import org.apache.spark.util.Utils
 
 private[spark] class HttpFileServer(
+    conf: SparkConf,
     securityManager: SecurityManager,
     requestedPort: Int = 0)
   extends Logging {
@@ -41,7 +42,7 @@ private[spark] class HttpFileServer(
     fileDir.mkdir()
     jarDir.mkdir()
     logInfo("HTTP File server directory is " + baseDir)
-    httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server")
+    httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")
     httpServer.start()
     serverUri = httpServer.uri
     logDebug("HTTP file server started at: " + serverUri)

core/src/main/scala/org/apache/spark/HttpServer.scala

Lines changed: 2 additions & 1 deletion
@@ -42,6 +42,7 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
  * around a Jetty server.
  */
 private[spark] class HttpServer(
+    conf: SparkConf,
     resourceBase: File,
     securityManager: SecurityManager,
     requestedPort: Int = 0,
@@ -57,7 +58,7 @@ private[spark] class HttpServer(
     } else {
       logInfo("Starting HTTP Server")
       val (actualServer, actualPort) =
-        Utils.startServiceOnPort[Server](requestedPort, doStart, serverName)
+        Utils.startServiceOnPort[Server](requestedPort, doStart, conf, serverName)
       server = actualServer
       port = actualPort
     }
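
Threading `conf` through to `Utils.startServiceOnPort` lets the bind-and-retry behavior be driven by configuration rather than globals; together with the `spark.port.*` change in SparkConf below, this suggests keys such as `spark.port.maxRetries`. A simplified, self-contained sketch of the retry idea under that assumption, not Spark's implementation:

import java.net.{BindException, ServerSocket}

def startServiceOnPort(startPort: Int, maxRetries: Int): ServerSocket = {
  for (offset <- 0 to maxRetries) {
    // Port 0 asks the OS for an ephemeral port, so there is nothing to increment.
    val tryPort = if (startPort == 0) 0 else startPort + offset
    try {
      return new ServerSocket(tryPort)
    } catch {
      case _: BindException if offset < maxRetries =>
        // Port already taken; fall through and try the next one.
    }
  }
  throw new BindException(s"Failed to bind starting from port $startPort")
}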

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 4 additions & 2 deletions
@@ -370,7 +370,9 @@ private[spark] object SparkConf {
   }
 
   /**
-   * Return whether the given config is a Spark port config.
+   * Return true if the given config matches either `spark.*.port` or `spark.port.*`.
    */
-  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
+  def isSparkPortConf(name: String): Boolean = {
+    (name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")
+  }
 }
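
For illustration, the broadened predicate now accepts both naming schemes (the SparkConf object is `private[spark]`, so these calls only compile inside Spark's own packages):

SparkConf.isSparkPortConf("spark.driver.port")     // true  - matches spark.*.port
SparkConf.isSparkPortConf("spark.port.maxRetries") // true  - matches spark.port.*
SparkConf.isSparkPortConf("spark.executor.memory") // false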

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 0 additions & 1 deletion
@@ -458,7 +458,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
 
   /** Set a human readable description of the current job. */
-  @deprecated("use setJobGroup", "0.8.1")
   def setJobDescription(value: String) {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
   }

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 1 deletion
@@ -312,7 +312,7 @@ object SparkEnv extends Logging {
     val httpFileServer =
       if (isDriver) {
         val fileServerPort = conf.getInt("spark.fileserver.port", 0)
-        val server = new HttpFileServer(securityManager, fileServerPort)
+        val server = new HttpFileServer(conf, securityManager, fileServerPort)
         server.initialize()
         conf.set("spark.fileserver.uri", server.serverUri)
         server

core/src/main/scala/org/apache/spark/TaskContextImpl.scala

Lines changed: 7 additions & 2 deletions
@@ -22,14 +22,19 @@ import org.apache.spark.util.{TaskCompletionListener, TaskCompletionListenerExce
 
 import scala.collection.mutable.ArrayBuffer
 
-private[spark] class TaskContextImpl(val stageId: Int,
+private[spark] class TaskContextImpl(
+    val stageId: Int,
     val partitionId: Int,
-    val attemptId: Long,
+    override val taskAttemptId: Long,
+    override val attemptNumber: Int,
     val runningLocally: Boolean = false,
     val taskMetrics: TaskMetrics = TaskMetrics.empty)
   extends TaskContext
   with Logging {
 
+  // For backwards-compatibility; this method is now deprecated as of 1.3.0.
+  override def attemptId: Long = taskAttemptId
+
   // List of callback functions to execute when the task completes.
   @transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]
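
The override of attemptId above is the usual rename-with-compatibility pattern: the old accessor survives as a deprecated alias for the renamed field, so existing callers keep compiling. A generic sketch of the pattern with illustrative names, not Spark's types:

abstract class Context {
  def taskAttemptId: Long

  @deprecated("use taskAttemptId", "1.3.0")
  def attemptId: Long
}

class ContextImpl(override val taskAttemptId: Long) extends Context {
  // Old callers of attemptId still compile (with a deprecation warning);
  // new code should read taskAttemptId directly.
  override def attemptId: Long = taskAttemptId
}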

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 2 additions & 1 deletion
@@ -153,7 +153,8 @@ private[broadcast] object HttpBroadcast extends Logging {
   private def createServer(conf: SparkConf) {
     broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
     val broadcastPort = conf.getInt("spark.broadcast.port", 0)
-    server = new HttpServer(broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
+    server =
+      new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
     server.start()
     serverUri = server.uri
     logInfo("Broadcast server started at " + serverUri)

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 5 additions & 0 deletions
@@ -158,6 +158,11 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     // Global defaults. These should be keep to minimum to avoid confusing behavior.
     master = Option(master).getOrElse("local[*]")
 
+    // In YARN mode, app name can be set via SPARK_YARN_APP_NAME (see SPARK-5222)
+    if (master.startsWith("yarn")) {
+      name = Option(name).orElse(env.get("SPARK_YARN_APP_NAME")).orNull
+    }
+
     // Set name from main class if not given
     name = Option(name).orElse(Option(mainClass)).orNull
     if (name == null && primaryResource != null) {
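
The `Option(...).orElse(...).orNull` chain above picks the first available value: an explicit --name wins, then SPARK_YARN_APP_NAME, and only afterwards the main-class default applied just below it. A standalone sketch of the same chain, where `env` stands in for the process environment:

val env = Map("SPARK_YARN_APP_NAME" -> "my-yarn-app")
val explicitName: String = null  // as if --name was not passed

val name = Option(explicitName).orElse(env.get("SPARK_YARN_APP_NAME")).orNull
// name == "my-yarn-app"; with --name set, explicitName would take precedence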

0 commit comments

Comments
 (0)