Commit 90d94c0

merge master

2 parents: 9e74ab5 + b2ebf42

121 files changed: +2154 additions, -1110 deletions. (Large commit: some files are hidden by default, so only a subset appears below.)

core/pom.xml

Lines changed: 2 additions & 0 deletions
@@ -70,6 +70,8 @@
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math3</artifactId>
+      <version>3.3</version>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.google.code.findbugs</groupId>
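
Because the dependency is added with test scope, commons-math3 is visible to core's test sources but is not pulled onto the runtime classpath. A minimal sketch of the kind of test-only check this enables (the object name and threshold are illustrative, not taken from this commit):

    import org.apache.commons.math3.distribution.PoissonDistribution

    // Hypothetical test helper: sanity-check an observed count against the
    // Poisson CDF, the sort of assertion sampling tests use commons-math3 for.
    object PoissonCheck {
      def main(args: Array[String]): Unit = {
        val dist = new PoissonDistribution(10.0)   // expected rate of 10 events
        val p = dist.cumulativeProbability(15)     // P(X <= 15), about 0.95
        assert(p > 0.9, s"unexpectedly low cumulative probability: $p")
      }
    }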

core/src/main/resources/org/apache/spark/ui/static/webui.css

Lines changed: 21 additions & 0 deletions
@@ -87,3 +87,24 @@ span.kill-link {
 span.kill-link a {
   color: gray;
 }
+
+span.expand-details {
+  font-size: 10pt;
+  cursor: pointer;
+  color: grey;
+  float: right;
+}
+
+.stage-details {
+  max-height: 100px;
+  overflow-y: auto;
+  margin: 0;
+  transition: max-height 0.5s ease-out, padding 0.5s ease-out;
+}
+
+.stage-details.collapsed {
+  max-height: 0;
+  padding-top: 0;
+  padding-bottom: 0;
+  border: none;
+}

core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 12 additions & 8 deletions
@@ -32,10 +32,14 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
   private val loading = new HashSet[RDDBlockId]()
 
   /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
-  def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext,
+  def getOrCompute[T](
+      rdd: RDD[T],
+      split: Partition,
+      context: TaskContext,
       storageLevel: StorageLevel): Iterator[T] = {
+
     val key = RDDBlockId(rdd.id, split.index)
-    logDebug("Looking for partition " + key)
+    logDebug(s"Looking for partition $key")
     blockManager.get(key) match {
       case Some(values) =>
         // Partition is already materialized, so just return its values
@@ -45,7 +49,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         // Mark the split as loading (unless someone else marks it first)
         loading.synchronized {
           if (loading.contains(key)) {
-            logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
+            logInfo(s"Another thread is loading $key, waiting for it to finish...")
             while (loading.contains(key)) {
               try {
                 loading.wait()
@@ -54,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
                   logWarning(s"Got an exception while waiting for another thread to load $key", e)
               }
             }
-            logInfo("Finished waiting for %s".format(key))
+            logInfo(s"Finished waiting for $key")
             /* See whether someone else has successfully loaded it. The main way this would fail
              * is for the RDD-level cache eviction policy if someone else has loaded the same RDD
              * partition but we didn't want to make space for it. However, that case is unlikely
@@ -64,7 +68,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
               case Some(values) =>
                 return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
               case None =>
-                logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
+                logInfo(s"Whoever was loading $key failed; we'll try it ourselves")
                 loading.add(key)
             }
           } else {
@@ -73,7 +77,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         }
         try {
           // If we got here, we have to load the split
-          logInfo("Partition %s not found, computing it".format(key))
+          logInfo(s"Partition $key not found, computing it")
          val computedValues = rdd.computeOrReadCheckpoint(split, context)
 
           // Persist the result, so long as the task is not running locally
@@ -97,8 +101,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
             case Some(values) =>
               values.asInstanceOf[Iterator[T]]
             case None =>
-              logInfo("Failure to store %s".format(key))
-              throw new Exception("Block manager failed to return persisted valued")
+              logInfo(s"Failure to store $key")
+              throw new SparkException("Block manager failed to return persisted value")
           }
         } else {
           // In this case the RDD is cached to an array buffer. This will save the results
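
The method this diff reformats implements a "one loader, many waiters" handshake: the first thread to miss on a key adds it to the shared loading set and computes the block, while other threads wait() on the set's monitor and re-check once woken. A stripped-down sketch of that coordination, with simplified names rather than the real CacheManager types:

    import scala.collection.mutable

    // Simplified sketch of CacheManager.getOrCompute's coordination. The real
    // code re-checks the block store after waiting, because the loader may
    // have failed or the block may have been evicted in the meantime.
    class Loader[K, V] {
      private val loading = new mutable.HashSet[K]()
      private val cache = new mutable.HashMap[K, V]()

      def getOrCompute(key: K)(compute: => V): V = {
        loading.synchronized {
          while (loading.contains(key)) {
            loading.wait()                  // another thread is computing this key
          }
          cache.get(key) match {
            case Some(v) => return v        // the other thread succeeded
            case None => loading.add(key)   // it failed (or nobody tried): we load
          }
        }
        try {
          val v = compute
          loading.synchronized { cache(key) = v }
          v
        } finally {
          loading.synchronized {
            loading.remove(key)
            loading.notifyAll()             // wake all waiters, success or not
          }
        }
      }
    }

If the loader's compute throws, the finally block still removes the key and notifies, so a waiter wakes, finds the cache empty, and takes over, which is exactly the "Whoever was loading $key failed; we'll try it ourselves" path above.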

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 21 additions & 10 deletions
@@ -49,7 +49,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
+import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -434,12 +434,21 @@
 
   // Methods for creating RDDs
 
-  /** Distribute a local Scala collection to form an RDD. */
+  /** Distribute a local Scala collection to form an RDD.
+   *
+   * @note Parallelize acts lazily. If `seq` is a mutable collection and is
+   * altered after the call to parallelize and before the first action on the
+   * RDD, the resultant RDD will reflect the modified collection. Pass a copy of
+   * the argument to avoid this.
+   */
   def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
     new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
   }
 
-  /** Distribute a local Scala collection to form an RDD. */
+  /** Distribute a local Scala collection to form an RDD.
+   *
+   * This method is identical to `parallelize`.
+   */
   def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
     parallelize(seq, numSlices)
   }
@@ -1027,9 +1036,11 @@
    * Capture the current user callsite and return a formatted version for printing. If the user
    * has overridden the call site, this will return the user's version.
    */
-  private[spark] def getCallSite(): String = {
-    val defaultCallSite = Utils.getCallSiteInfo
-    Option(getLocalProperty("externalCallSite")).getOrElse(defaultCallSite.toString)
+  private[spark] def getCallSite(): CallSite = {
+    Option(getLocalProperty("externalCallSite")) match {
+      case Some(callSite) => CallSite(callSite, long = "")
+      case None => Utils.getCallSite
+    }
   }
 
   /**
@@ -1049,11 +1060,11 @@
     }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
-    logInfo("Starting job: " + callSite)
+    logInfo("Starting job: " + callSite.short)
     val start = System.nanoTime
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
       resultHandler, localProperties.get)
-    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
     rdd.doCheckpoint()
   }
 
@@ -1134,11 +1145,11 @@
       evaluator: ApproximateEvaluator[U, R],
       timeout: Long): PartialResult[R] = {
     val callSite = getCallSite
-    logInfo("Starting job: " + callSite)
+    logInfo("Starting job: " + callSite.short)
     val start = System.nanoTime
     val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
       localProperties.get)
-    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
     result
   }
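
The new @note on parallelize deserves a concrete illustration, since ParallelCollectionRDD only reads the captured collection when the RDD is first computed. A small sketch of the gotcha and the suggested fix (assumes a live SparkContext named sc; the output depends on when the mutation lands, which is the point):

    import scala.collection.mutable.ArrayBuffer

    val data = ArrayBuffer(1, 2, 3)
    val rdd = sc.parallelize(data)  // captures the buffer, slices it lazily
    data += 4                       // mutated before the first action...
    rdd.collect()                   // ...so this can be Array(1, 2, 3, 4)

    // Pass a copy to freeze the contents at call time, as the note advises
    val frozen = sc.parallelize(data.toVector)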

core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala

Lines changed: 16 additions & 0 deletions
@@ -17,10 +17,13 @@
 
 package org.apache.spark.api.java
 
+import java.util.Comparator
+
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 import org.apache.spark._
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
@@ -172,6 +175,19 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
     rdd.setName(name)
     this
   }
+
+  /**
+   * Return this RDD sorted by the given key function.
+   */
+  def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T] = {
+    import scala.collection.JavaConverters._
+    def fn = (x: T) => f.call(x)
+    import com.google.common.collect.Ordering // shadows scala.math.Ordering
+    implicit val ordering = Ordering.natural().asInstanceOf[Ordering[S]]
+    implicit val ctag: ClassTag[S] = fakeClassTag
+    wrapRDD(rdd.sortBy(fn, ascending, numPartitions))
+  }
+
 }
 
 object JavaRDD {
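
The new Java sortBy simply bridges a JFunction and Guava's natural Ordering onto the Scala RDD.sortBy seen in the body above. The Scala-side equivalent of a call like javaRdd.sortBy(f, true, 2) would be (illustrative values, assumes a SparkContext named sc):

    val rdd = sc.parallelize(Seq(3, 1, 2))
    // Identity key function, ascending, 2 partitions: this is what the
    // Java wrapper ultimately delegates to.
    val sorted = rdd.sortBy(x => x, ascending = true, numPartitions = 2)
    sorted.collect()  // Array(1, 2, 3)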

core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala

Lines changed: 2 additions & 2 deletions
@@ -19,9 +19,9 @@ package org.apache.spark.deploy
 
 private[spark] object ExecutorState extends Enumeration {
 
-  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
+  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
 
   type ExecutorState = Value
 
-  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
+  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
 }

core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

Lines changed: 4 additions & 0 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
 import java.util.Date
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import akka.actor.ActorRef
 
@@ -36,6 +37,7 @@ private[spark] class ApplicationInfo(
 
   @transient var state: ApplicationState.Value = _
   @transient var executors: mutable.HashMap[Int, ExecutorInfo] = _
+  @transient var removedExecutors: ArrayBuffer[ExecutorInfo] = _
   @transient var coresGranted: Int = _
   @transient var endTime: Long = _
   @transient var appSource: ApplicationSource = _
@@ -51,6 +53,7 @@
     endTime = -1L
     appSource = new ApplicationSource(this)
     nextExecutorId = 0
+    removedExecutors = new ArrayBuffer[ExecutorInfo]
   }
 
   private def newExecutorId(useID: Option[Int] = None): Int = {
@@ -74,6 +77,7 @@
 
   def removeExecutor(exec: ExecutorInfo) {
     if (executors.contains(exec.id)) {
+      removedExecutors += executors(exec.id)
       executors -= exec.id
       coresGranted -= exec.cores
     }

core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala

Lines changed: 15 additions & 0 deletions
@@ -34,4 +34,19 @@ private[spark] class ExecutorInfo(
   }
 
   def fullId: String = application.id + "/" + id
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case info: ExecutorInfo =>
+        fullId == info.fullId &&
+        worker.id == info.worker.id &&
+        cores == info.cores &&
+        memory == info.memory
+      case _ => false
+    }
+  }
+
+  override def toString: String = fullId
+
+  override def hashCode: Int = toString.hashCode()
 }
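
equals, hashCode, and toString are overridden so that the same executor, reachable both through app.executors and app.removedExecutors, collapses to one entry when ApplicationPage unions the two with .toSet. A self-contained sketch of the contract being relied on (a stand-in class, not the real ExecutorInfo):

    // Equal identity fields => equal objects => a single Set element.
    // Hashing via toString (the full id) matches the patch; unequal objects
    // may collide on hashCode, which the contract permits.
    final class Info(val fullId: String, val cores: Int) {
      override def equals(other: Any): Boolean = other match {
        case that: Info => fullId == that.fullId && cores == that.cores
        case _ => false
      }
      override def toString: String = fullId
      override def hashCode: Int = toString.hashCode
    }

    val running = Seq(new Info("app-1/0", 4))
    val removed = Seq(new Info("app-1/0", 4))
    assert((running ++ removed).toSet.size == 1)  // duplicates collapse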

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 3 additions & 2 deletions
@@ -303,10 +303,11 @@ private[spark] class Master(
             appInfo.removeExecutor(exec)
             exec.worker.removeExecutor(exec)
 
+            val normalExit = exitStatus.exists(_ == 0)
             // Only retry certain number of times so we don't go into an infinite loop.
-            if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
+            if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
               schedule()
-            } else {
+            } else if (!normalExit) {
               logError("Application %s with ID %s failed %d times, removing it".format(
                 appInfo.desc.name, appInfo.id, appInfo.retryCount))
               removeApplication(appInfo, ApplicationState.FAILED)
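
The retry gate hinges on Option.exists: only a present, zero exit status counts as a normal exit, so a missing status is still treated as abnormal and stays subject to the retry counter. In isolation:

    val codes: Seq[Option[Int]] = Seq(Some(0), Some(1), None)
    codes.map(_.exists(_ == 0))  // List(true, false, false)
    // Some(0): clean exit, neither retried nor counted as a failure
    // Some(1) / None: abnormal, may trigger schedule() or app removal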

core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala

Lines changed: 46 additions & 34 deletions
@@ -25,7 +25,7 @@ import scala.xml.Node
 import akka.pattern.ask
 import org.json4s.JValue
 
-import org.apache.spark.deploy.JsonProtocol
+import org.apache.spark.deploy.{ExecutorState, JsonProtocol}
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.ExecutorInfo
 import org.apache.spark.ui.{WebUIPage, UIUtils}
@@ -57,43 +57,55 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
     })
 
     val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs")
-    val executors = app.executors.values.toSeq
-    val executorTable = UIUtils.listingTable(executorHeaders, executorRow, executors)
+    val allExecutors = (app.executors.values ++ app.removedExecutors).toSet.toSeq
+    // This includes executors that are either still running or have exited cleanly
+    val executors = allExecutors.filter { exec =>
+      !ExecutorState.isFinished(exec.state) || exec.state == ExecutorState.EXITED
+    }
+    val removedExecutors = allExecutors.diff(executors)
+    val executorsTable = UIUtils.listingTable(executorHeaders, executorRow, executors)
+    val removedExecutorsTable = UIUtils.listingTable(executorHeaders, executorRow, removedExecutors)
 
     val content =
-        <div class="row-fluid">
-          <div class="span12">
-            <ul class="unstyled">
-              <li><strong>ID:</strong> {app.id}</li>
-              <li><strong>Name:</strong> {app.desc.name}</li>
-              <li><strong>User:</strong> {app.desc.user}</li>
-              <li><strong>Cores:</strong>
-              {
-                if (app.desc.maxCores.isEmpty) {
-                  "Unlimited (%s granted)".format(app.coresGranted)
-                } else {
-                  "%s (%s granted, %s left)".format(
-                    app.desc.maxCores.get, app.coresGranted, app.coresLeft)
-                }
-              }
-              </li>
-              <li>
-                <strong>Executor Memory:</strong>
-                {Utils.megabytesToString(app.desc.memoryPerSlave)}
-              </li>
-              <li><strong>Submit Date:</strong> {app.submitDate}</li>
-              <li><strong>State:</strong> {app.state}</li>
-              <li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
-            </ul>
-          </div>
+      <div class="row-fluid">
+        <div class="span12">
+          <ul class="unstyled">
+            <li><strong>ID:</strong> {app.id}</li>
+            <li><strong>Name:</strong> {app.desc.name}</li>
+            <li><strong>User:</strong> {app.desc.user}</li>
+            <li><strong>Cores:</strong>
+            {
+              if (app.desc.maxCores.isEmpty) {
+                "Unlimited (%s granted)".format(app.coresGranted)
+              } else {
+                "%s (%s granted, %s left)".format(
+                  app.desc.maxCores.get, app.coresGranted, app.coresLeft)
+              }
+            }
+            </li>
+            <li>
+              <strong>Executor Memory:</strong>
+              {Utils.megabytesToString(app.desc.memoryPerSlave)}
+            </li>
+            <li><strong>Submit Date:</strong> {app.submitDate}</li>
+            <li><strong>State:</strong> {app.state}</li>
+            <li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
+          </ul>
         </div>
+      </div>
 
-        <div class="row-fluid"> <!-- Executors -->
-          <div class="span12">
-            <h4> Executor Summary </h4>
-            {executorTable}
-          </div>
-        </div>;
+      <div class="row-fluid"> <!-- Executors -->
+        <div class="span12">
+          <h4> Executor Summary </h4>
+          {executorsTable}
+          {
+            if (removedExecutors.nonEmpty) {
+              <h4> Removed Executors </h4> ++
+              removedExecutorsTable
+            }
+          }
+        </div>
+      </div>;
     UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
   }
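
The filter in this page encodes a simple visibility rule: keep executors that are still running, plus finished ones that exited cleanly; everything else lands in the new Removed Executors table. Written as a standalone predicate over the same ExecutorState used above (illustrative, since the object is private[spark]):

    import org.apache.spark.deploy.ExecutorState

    // True for rows shown under "Executor Summary"; false (KILLED, FAILED,
    // LOST) sends the row to "Removed Executors".
    def showInSummary(state: ExecutorState.Value): Boolean =
      !ExecutorState.isFinished(state) || state == ExecutorState.EXITED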
