
Commit 0fc6a31 (2 parents: 19815ae + 8a53de1)

Merge branch 'master' of git://git.apache.org/spark into timeline-viewer-feature

163 files changed: +3189, -1184 lines. Only a subset of the changed files is shown below.

R/pkg/R/RDD.R
Lines changed: 4 additions & 12 deletions

@@ -85,7 +85,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
 
   if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
     # This transformation is the first in its stage:
-    .Object@func <- func
+    .Object@func <- cleanClosure(func)
     .Object@prev_jrdd <- getJRDD(prev)
     .Object@env$prev_serializedMode <- prev@env$serializedMode
     # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
@@ -94,7 +94,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
     pipelinedFunc <- function(split, iterator) {
       func(split, prev@func(split, iterator))
     }
-    .Object@func <- pipelinedFunc
+    .Object@func <- cleanClosure(pipelinedFunc)
     .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
     # Get the serialization mode of the parent RDD
     .Object@env$prev_serializedMode <- prev@env$prev_serializedMode
@@ -144,17 +144,13 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
             return(rdd@env$jrdd_val)
           }
 
-          computeFunc <- function(split, part) {
-            rdd@func(split, part)
-          }
-
           packageNamesArr <- serialize(.sparkREnv[[".packages"]],
                                        connection = NULL)
 
           broadcastArr <- lapply(ls(.broadcastNames),
                                  function(name) { get(name, .broadcastNames) })
 
-          serializedFuncArr <- serialize(computeFunc, connection = NULL)
+          serializedFuncArr <- serialize(rdd@func, connection = NULL)
 
           prev_jrdd <- rdd@prev_jrdd
 
@@ -551,11 +547,7 @@ setMethod("mapPartitions",
 setMethod("lapplyPartitionsWithIndex",
           signature(X = "RDD", FUN = "function"),
           function(X, FUN) {
-            FUN <- cleanClosure(FUN)
-            closureCapturingFunc <- function(split, part) {
-              FUN(split, part)
-            }
-            PipelinedRDD(X, closureCapturingFunc)
+            PipelinedRDD(X, FUN)
           })
 
 #' @rdname lapplyPartitionsWithIndex

R/pkg/R/pairRDD.R
Lines changed: 0 additions & 4 deletions

@@ -694,10 +694,6 @@ setMethod("cogroup",
             for (i in 1:rddsLen) {
               rdds[[i]] <- lapply(rdds[[i]],
                                   function(x) { list(x[[1]], list(i, x[[2]])) })
-              # TODO(hao): As issue [SparkR-142] mentions, the right value of i
-              # will not be captured into UDF if getJRDD is not invoked.
-              # It should be resolved together with that issue.
-              getJRDD(rdds[[i]])  # Capture the closure.
             }
             union.rdd <- Reduce(unionRDD, rdds)
             group.func <- function(vlist) {

bin/spark-class
Lines changed: 10 additions & 1 deletion

@@ -82,13 +82,22 @@ if [ $(command -v "$JAR_CMD") ] ; then
   fi
 fi
 
+LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"
+
+# Add the launcher build dir to the classpath if requested.
+if [ -n "$SPARK_PREPEND_CLASSES" ]; then
+  LAUNCH_CLASSPATH="$SPARK_HOME/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
+fi
+
+export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"
+
 # The launcher library will print arguments separated by a NULL character, to allow arguments with
 # characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
 # an array that will be used to exec the final command.
 CMD=()
 while IFS= read -d '' -r ARG; do
   CMD+=("$ARG")
-done < <("$RUNNER" -cp "$SPARK_ASSEMBLY_JAR" org.apache.spark.launcher.Main "$@")
+done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
 
 if [ "${CMD[0]}" = "usage" ]; then
   "${CMD[@]}"

bin/spark-class2.cmd
Lines changed: 10 additions & 1 deletion

@@ -46,13 +46,22 @@ if "%SPARK_ASSEMBLY_JAR%"=="0" (
   exit /b 1
 )
 
+set LAUNCH_CLASSPATH=%SPARK_ASSEMBLY_JAR%
+
+rem Add the launcher build dir to the classpath if requested.
+if not "x%SPARK_PREPEND_CLASSES%"=="x" (
+  set LAUNCH_CLASSPATH=%SPARK_HOME%\launcher\target\scala-%SPARK_SCALA_VERSION%\classes;%LAUNCH_CLASSPATH%
+)
+
+set _SPARK_ASSEMBLY=%SPARK_ASSEMBLY_JAR%
+
 rem Figure out where java is.
 set RUNNER=java
 if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
 
 rem The launcher library prints the command to be executed in a single line suitable for being
 rem executed by the batch interpreter. So read all the output of the launcher into a variable.
-for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %SPARK_ASSEMBLY_JAR% org.apache.spark.launcher.Main %*"') do (
+for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %LAUNCH_CLASSPATH% org.apache.spark.launcher.Main %*"') do (
   set SPARK_CMD=%%i
 )
 %SPARK_CMD%
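
The comment in the bin/spark-class hunk above describes the handshake both launch scripts rely on: org.apache.spark.launcher.Main emits the child command as NUL-separated arguments so that spaces and shell metacharacters survive unescaped, and the bash while loop rebuilds the argument array (the Windows script reads a single line instead). Below is a minimal Scala sketch of that NUL framing convention; the object name and sample arguments are illustrative and not part of Spark.

// A minimal sketch, assuming only what the script comment states: arguments
// arrive separated by the NUL character. Names and values are made up.
object NulProtocolSketch {
  private val NUL = 0.toChar

  // Split a NUL-separated launcher payload back into individual arguments.
  def parseArgs(launcherOutput: String): Seq[String] =
    launcherOutput.split(NUL).toSeq.filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    val raw = Seq("java", "-cp", "/path with spaces/spark.jar", "org.example.Main")
      .mkString(NUL.toString)
    parseArgs(raw).foreach(println)  // each argument prints intact, embedded spaces included
  }
}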

core/src/main/scala/org/apache/spark/ContextCleaner.scala
Lines changed: 32 additions & 12 deletions

@@ -22,7 +22,7 @@ import java.lang.ref.{ReferenceQueue, WeakReference}
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDDCheckpointData, RDD}
 import org.apache.spark.util.Utils
 
 /**
@@ -33,6 +33,7 @@ private case class CleanRDD(rddId: Int) extends CleanupTask
 private case class CleanShuffle(shuffleId: Int) extends CleanupTask
 private case class CleanBroadcast(broadcastId: Long) extends CleanupTask
 private case class CleanAccum(accId: Long) extends CleanupTask
+private case class CleanCheckpoint(rddId: Int) extends CleanupTask
 
 /**
  * A WeakReference associated with a CleanupTask.
@@ -94,12 +95,12 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   @volatile private var stopped = false
 
   /** Attach a listener object to get information of when objects are cleaned. */
-  def attachListener(listener: CleanerListener) {
+  def attachListener(listener: CleanerListener): Unit = {
     listeners += listener
   }
 
   /** Start the cleaner. */
-  def start() {
+  def start(): Unit = {
     cleaningThread.setDaemon(true)
     cleaningThread.setName("Spark Context Cleaner")
     cleaningThread.start()
@@ -108,7 +109,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   /**
    * Stop the cleaning thread and wait until the thread has finished running its current task.
   */
-  def stop() {
+  def stop(): Unit = {
     stopped = true
     // Interrupt the cleaning thread, but wait until the current task has finished before
     // doing so. This guards against the race condition where a cleaning thread may
@@ -121,7 +122,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /** Register a RDD for cleanup when it is garbage collected. */
-  def registerRDDForCleanup(rdd: RDD[_]) {
+  def registerRDDForCleanup(rdd: RDD[_]): Unit = {
     registerForCleanup(rdd, CleanRDD(rdd.id))
   }
 
@@ -130,17 +131,22 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /** Register a ShuffleDependency for cleanup when it is garbage collected. */
-  def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]) {
+  def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = {
     registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
   }
 
   /** Register a Broadcast for cleanup when it is garbage collected. */
-  def registerBroadcastForCleanup[T](broadcast: Broadcast[T]) {
+  def registerBroadcastForCleanup[T](broadcast: Broadcast[T]): Unit = {
     registerForCleanup(broadcast, CleanBroadcast(broadcast.id))
   }
 
+  /** Register a RDDCheckpointData for cleanup when it is garbage collected. */
+  def registerRDDCheckpointDataForCleanup[T](rdd: RDD[_], parentId: Int): Unit = {
+    registerForCleanup(rdd, CleanCheckpoint(parentId))
+  }
+
   /** Register an object for cleanup. */
-  private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask) {
+  private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
     referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)
   }
 
@@ -164,6 +170,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
             doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
           case CleanAccum(accId) =>
             doCleanupAccum(accId, blocking = blockOnCleanupTasks)
+          case CleanCheckpoint(rddId) =>
+            doCleanCheckpoint(rddId)
         }
       }
     }
@@ -175,7 +183,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /** Perform RDD cleanup. */
-  def doCleanupRDD(rddId: Int, blocking: Boolean) {
+  def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {
     try {
       logDebug("Cleaning RDD " + rddId)
       sc.unpersistRDD(rddId, blocking)
@@ -187,7 +195,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /** Perform shuffle cleanup, asynchronously. */
-  def doCleanupShuffle(shuffleId: Int, blocking: Boolean) {
+  def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
     try {
       logDebug("Cleaning shuffle " + shuffleId)
       mapOutputTrackerMaster.unregisterShuffle(shuffleId)
@@ -200,7 +208,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /** Perform broadcast cleanup. */
-  def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
+  def doCleanupBroadcast(broadcastId: Long, blocking: Boolean): Unit = {
     try {
       logDebug(s"Cleaning broadcast $broadcastId")
       broadcastManager.unbroadcast(broadcastId, true, blocking)
@@ -212,7 +220,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /** Perform accumulator cleanup. */
-  def doCleanupAccum(accId: Long, blocking: Boolean) {
+  def doCleanupAccum(accId: Long, blocking: Boolean): Unit = {
     try {
       logDebug("Cleaning accumulator " + accId)
       Accumulators.remove(accId)
@@ -223,6 +231,18 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     }
   }
 
+  /** Perform checkpoint cleanup. */
+  def doCleanCheckpoint(rddId: Int): Unit = {
+    try {
+      logDebug("Cleaning rdd checkpoint data " + rddId)
+      RDDCheckpointData.clearRDDCheckpointData(sc, rddId)
+      logInfo("Cleaned rdd checkpoint data " + rddId)
+    }
+    catch {
+      case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e)
+    }
+  }
+
   private def blockManagerMaster = sc.env.blockManager.master
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
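
The new CleanCheckpoint task rides on the cleaner's existing weak-reference machinery: registerForCleanup wraps the registered object in a CleanupTaskWeakReference tied to a ReferenceQueue, and the cleaning thread drains that queue after garbage collection and dispatches the matching task, now including checkpoint cleanup. The following is a minimal, self-contained sketch of that pattern with illustrative names; it is not Spark's actual ContextCleaner.

// Sketch of the weak-reference cleanup pattern with CleanCheckpoint as the
// only task; class and method names are stand-ins, not Spark internals.
import java.lang.ref.{Reference, ReferenceQueue, WeakReference}
import scala.collection.mutable.ArrayBuffer

sealed trait CleanupTask
case class CleanCheckpoint(rddId: Int) extends CleanupTask

class TaskWeakRef(val task: CleanupTask, referent: AnyRef, q: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, q)

class CleanerSketch {
  private val queue = new ReferenceQueue[AnyRef]
  // Hold the wrappers strongly so they survive until the GC enqueues them.
  private val buffer = ArrayBuffer.empty[TaskWeakRef]

  def registerForCleanup(obj: AnyRef, task: CleanupTask): Unit =
    synchronized { buffer += new TaskWeakRef(task, obj, queue) }

  /** Drain whatever the GC has already enqueued and run the matching cleanup. */
  def drainOnce(): Unit = {
    var ref: Reference[_ <: AnyRef] = queue.poll()
    while (ref != null) {
      val wrapper = ref.asInstanceOf[TaskWeakRef]
      synchronized { buffer -= wrapper }
      wrapper.task match {
        case CleanCheckpoint(rddId) =>
          println(s"cleaning checkpoint data for RDD $rddId") // stand-in for clearRDDCheckpointData
      }
      ref = queue.poll()
    }
  }
}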

core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
Lines changed: 5 additions & 4 deletions

@@ -22,26 +22,27 @@ import java.net.URI
 private[spark] class ApplicationDescription(
     val name: String,
     val maxCores: Option[Int],
-    val memoryPerSlave: Int,
+    val memoryPerExecutorMB: Int,
     val command: Command,
     var appUiUrl: String,
     val eventLogDir: Option[URI] = None,
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
-    val eventLogCodec: Option[String] = None)
+    val eventLogCodec: Option[String] = None,
+    val coresPerExecutor: Option[Int] = None)
   extends Serializable {
 
   val user = System.getProperty("user.name", "<unknown>")
 
   def copy(
       name: String = name,
       maxCores: Option[Int] = maxCores,
-      memoryPerSlave: Int = memoryPerSlave,
+      memoryPerExecutorMB: Int = memoryPerExecutorMB,
       command: Command = command,
       appUiUrl: String = appUiUrl,
       eventLogDir: Option[URI] = eventLogDir,
       eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
     new ApplicationDescription(
-      name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)
+      name, maxCores, memoryPerExecutorMB, command, appUiUrl, eventLogDir, eventLogCodec)
 
   override def toString: String = "ApplicationDescription(" + name + ")"
 }

core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
Lines changed: 2 additions & 2 deletions

@@ -46,7 +46,7 @@ private[deploy] object JsonProtocol {
     ("name" -> obj.desc.name) ~
     ("cores" -> obj.desc.maxCores) ~
     ("user" -> obj.desc.user) ~
-    ("memoryperslave" -> obj.desc.memoryPerSlave) ~
+    ("memoryperslave" -> obj.desc.memoryPerExecutorMB) ~
     ("submitdate" -> obj.submitDate.toString) ~
     ("state" -> obj.state.toString) ~
     ("duration" -> obj.duration)
@@ -55,7 +55,7 @@ private[deploy] object JsonProtocol {
   def writeApplicationDescription(obj: ApplicationDescription): JObject = {
     ("name" -> obj.name) ~
     ("cores" -> obj.maxCores) ~
-    ("memoryperslave" -> obj.memoryPerSlave) ~
+    ("memoryperslave" -> obj.memoryPerExecutorMB) ~
     ("user" -> obj.user) ~
     ("command" -> obj.command.toString)
   }
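
Both hunks keep the JSON key "memoryperslave" even though the Scala field is now memoryPerExecutorMB, presumably so existing consumers of the JSON output keep working. A small sketch of the same json4s tilde DSL with made-up values (it assumes json4s on the classpath, which is the library this object's DSL comes from):

// Illustrative snippet only: the app name and memory value are invented,
// the JSON key mirrors the unchanged "memoryperslave" key in the diff above.
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

object MemoryKeySketch {
  def main(args: Array[String]): Unit = {
    val memoryPerExecutorMB = 2048  // renamed Scala field, unchanged JSON key
    val json = ("name" -> "example-app") ~ ("memoryperslave" -> memoryPerExecutorMB)
    println(compact(render(json)))  // {"name":"example-app","memoryperslave":2048}
  }
}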

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Lines changed: 2 additions & 0 deletions

@@ -406,6 +406,8 @@ object SparkSubmit {
       OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
 
       // Other options
+      OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
+        sysProp = "spark.executor.cores"),
       OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.memory"),
       OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
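
The added OptionAssigner routes --executor-cores into the spark.executor.cores property for standalone deployments in both deploy modes. A minimal sketch of setting the same property programmatically follows; the application name and values are illustrative, and it assumes spark-core on the classpath.

// Hedged sketch: configure cores per executor via the property that the new
// OptionAssigner maps --executor-cores to; values here are examples only.
import org.apache.spark.SparkConf

object ExecutorCoresSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("executor-cores-demo")   // hypothetical application name
      .set("spark.executor.cores", "2")    // cores per executor (standalone mapping added here)
      .set("spark.executor.memory", "2g")  // existing memory-per-executor property
    println(conf.toDebugString)
  }
}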

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
Lines changed: 4 additions & 1 deletion

@@ -482,10 +482,13 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         | Spark standalone and Mesos only:
         |  --total-executor-cores NUM  Total cores for all executors.
         |
+        | Spark standalone and YARN only:
+        |  --executor-cores NUM        Number of cores per executor. (Default: 1 in YARN mode,
+        |                              or all available cores on the worker in standalone mode)
+        |
         | YARN-only:
         |  --driver-cores NUM          Number of cores used by the driver, only in cluster mode
         |                              (Default: 1).
-        |  --executor-cores NUM        Number of cores per executor (Default: 1).
         |  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
         |  --num-executors NUM         Number of executors to launch (Default: 2).
         |  --archives ARCHIVES         Comma separated list of archives to be extracted into the

core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
Lines changed: 2 additions & 0 deletions

@@ -90,6 +90,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
             </span>
           </h4> ++
           appTable
+        } else if (requestedIncomplete) {
+          <h4>No incomplete applications found!</h4>
         } else {
          <h4>No completed applications found!</h4> ++
          <p>Did you specify the correct logging directory?
