Skip to content

Commit 9187066

Browse files
author
Andrew Or
committed
Merge branch 'master' of github.com:apache/spark into closure-cleaner
2 parents d889950 + 53befac commit 9187066

File tree

147 files changed

+5926
-1017
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

147 files changed

+5926
-1017
lines changed

R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ exportMethods(
7171
"unpersist",
7272
"value",
7373
"values",
74+
"zipPartitions",
7475
"zipRDD",
7576
"zipWithIndex",
7677
"zipWithUniqueId"

R/pkg/R/RDD.R

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
6666
.Object
6767
})
6868

69+
setMethod("show", "RDD",
70+
function(.Object) {
71+
cat(paste(callJMethod(.Object@jrdd, "toString"), "\n", sep=""))
72+
})
73+
6974
setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
7075
.Object@env <- new.env()
7176
.Object@env$isCached <- FALSE
@@ -1590,3 +1595,49 @@ setMethod("intersection",
15901595

15911596
keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
15921597
})
1598+
1599+
#' Zips an RDD's partitions with one (or more) RDD(s).
1600+
#' Same as zipPartitions in Spark.
1601+
#'
1602+
#' @param ... RDDs to be zipped.
1603+
#' @param func A function to transform zipped partitions.
1604+
#' @return A new RDD by applying a function to the zipped partitions.
1605+
#' Assumes that all the RDDs have the *same number of partitions*, but
1606+
#' does *not* require them to have the same number of elements in each partition.
1607+
#' @examples
1608+
#'\dontrun{
1609+
#' sc <- sparkR.init()
1610+
#' rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
1611+
#' rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
1612+
#' rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
1613+
#' collect(zipPartitions(rdd1, rdd2, rdd3,
1614+
#' func = function(x, y, z) { list(list(x, y, z))} ))
1615+
#' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
1616+
#'}
1617+
#' @rdname zipRDD
1618+
#' @aliases zipPartitions,RDD
1619+
setMethod("zipPartitions",
1620+
"RDD",
1621+
function(..., func) {
1622+
rrdds <- list(...)
1623+
if (length(rrdds) == 1) {
1624+
return(rrdds[[1]])
1625+
}
1626+
nPart <- sapply(rrdds, numPartitions)
1627+
if (length(unique(nPart)) != 1) {
1628+
stop("Can only zipPartitions RDDs which have the same number of partitions.")
1629+
}
1630+
1631+
rrdds <- lapply(rrdds, function(rdd) {
1632+
mapPartitionsWithIndex(rdd, function(partIndex, part) {
1633+
print(length(part))
1634+
list(list(partIndex, part))
1635+
})
1636+
})
1637+
union.rdd <- Reduce(unionRDD, rrdds)
1638+
zipped.rdd <- values(groupByKey(union.rdd, numPartitions = nPart[1]))
1639+
res <- mapPartitions(zipped.rdd, function(plist) {
1640+
do.call(func, plist[[1]])
1641+
})
1642+
res
1643+
})

R/pkg/R/generics.R

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,11 @@ setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
217217
#' @export
218218
setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") })
219219

220+
#' @rdname zipRDD
221+
#' @export
222+
setGeneric("zipPartitions", function(..., func) { standardGeneric("zipPartitions") },
223+
signature = "...")
224+
220225
#' @rdname zipWithIndex
221226
#' @seealso zipWithUniqueId
222227
#' @export

R/pkg/inst/tests/test_binary_function.R

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,36 @@ test_that("cogroup on two RDDs", {
6666
expect_equal(sortKeyValueList(actual),
6767
sortKeyValueList(expected))
6868
})
69+
70+
test_that("zipPartitions() on RDDs", {
71+
rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
72+
rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
73+
rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
74+
actual <- collect(zipPartitions(rdd1, rdd2, rdd3,
75+
func = function(x, y, z) { list(list(x, y, z))} ))
76+
expect_equal(actual,
77+
list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))))
78+
79+
mockFile = c("Spark is pretty.", "Spark is awesome.")
80+
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
81+
writeLines(mockFile, fileName)
82+
83+
rdd <- textFile(sc, fileName, 1)
84+
actual <- collect(zipPartitions(rdd, rdd,
85+
func = function(x, y) { list(paste(x, y, sep = "\n")) }))
86+
expected <- list(paste(mockFile, mockFile, sep = "\n"))
87+
expect_equal(actual, expected)
88+
89+
rdd1 <- parallelize(sc, 0:1, 1)
90+
actual <- collect(zipPartitions(rdd1, rdd,
91+
func = function(x, y) { list(x + nchar(y)) }))
92+
expected <- list(0:1 + nchar(mockFile))
93+
expect_equal(actual, expected)
94+
95+
rdd <- map(rdd, function(x) { x })
96+
actual <- collect(zipPartitions(rdd, rdd1,
97+
func = function(x, y) { list(y + nchar(x)) }))
98+
expect_equal(actual, expected)
99+
100+
unlink(fileName)
101+
})

R/pkg/inst/tests/test_rdd.R

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,11 @@ test_that("collectAsMap() on a pairwise RDD", {
759759
expect_equal(vals, list(`1` = "a", `2` = "b"))
760760
})
761761

762+
test_that("show()", {
763+
rdd <- parallelize(sc, list(1:10))
764+
expect_output(show(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+")
765+
})
766+
762767
test_that("sampleByKey() on pairwise RDDs", {
763768
rdd <- parallelize(sc, 1:2000)
764769
pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list("a", x) else list("b", x) })

assembly/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,6 @@
194194
<plugin>
195195
<groupId>org.apache.maven.plugins</groupId>
196196
<artifactId>maven-assembly-plugin</artifactId>
197-
<version>2.4</version>
198197
<executions>
199198
<execution>
200199
<id>dist</id>

bin/spark-class2.cmd

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,10 @@ if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
6161

6262
rem The launcher library prints the command to be executed in a single line suitable for being
6363
rem executed by the batch interpreter. So read all the output of the launcher into a variable.
64-
for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %LAUNCH_CLASSPATH% org.apache.spark.launcher.Main %*"') do (
64+
set LAUNCHER_OUTPUT=%temp%\spark-class-launcher-output-%RANDOM%.txt
65+
"%RUNNER%" -cp %LAUNCH_CLASSPATH% org.apache.spark.launcher.Main %* > %LAUNCHER_OUTPUT%
66+
for /f "tokens=*" %%i in (%LAUNCHER_OUTPUT%) do (
6567
set SPARK_CMD=%%i
6668
)
69+
del %LAUNCHER_OUTPUT%
6770
%SPARK_CMD%

conf/spark-env.sh.template

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# This file is sourced when running various Spark programs.
44
# Copy it as spark-env.sh and edit that to configure Spark for your site.
55

6-
# Options read when launching programs locally with
6+
# Options read when launching programs locally with
77
# ./bin/run-example or ./bin/spark-submit
88
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
99
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
@@ -39,6 +39,7 @@
3939
# - SPARK_WORKER_DIR, to set the working directory of worker processes
4040
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
4141
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
42+
# - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
4243
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
4344
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
4445

core/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,6 @@
478478
<plugin>
479479
<groupId>org.codehaus.mojo</groupId>
480480
<artifactId>exec-maven-plugin</artifactId>
481-
<version>1.3.2</version>
482481
<executions>
483482
<execution>
484483
<id>sparkr-pkg</id>

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
7676

7777
private var timeoutCheckingTask: ScheduledFuture[_] = null
7878

79-
private val timeoutCheckingThread =
80-
ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-timeout-checking-thread")
79+
// "eventLoopThread" is used to run some pretty fast actions. The actions running in it should not
80+
// block the thread for a long time.
81+
private val eventLoopThread =
82+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-receiver-event-loop-thread")
8183

8284
private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")
8385

8486
override def onStart(): Unit = {
85-
timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {
87+
timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
8688
override def run(): Unit = Utils.tryLogNonFatalError {
8789
Option(self).foreach(_.send(ExpireDeadHosts))
8890
}
@@ -99,11 +101,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
99101
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
100102
case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
101103
if (scheduler != null) {
102-
val unknownExecutor = !scheduler.executorHeartbeatReceived(
103-
executorId, taskMetrics, blockManagerId)
104-
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
105104
executorLastSeen(executorId) = System.currentTimeMillis()
106-
context.reply(response)
105+
eventLoopThread.submit(new Runnable {
106+
override def run(): Unit = Utils.tryLogNonFatalError {
107+
val unknownExecutor = !scheduler.executorHeartbeatReceived(
108+
executorId, taskMetrics, blockManagerId)
109+
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
110+
context.reply(response)
111+
}
112+
})
107113
} else {
108114
// Because Executor will sleep several seconds before sending the first "Heartbeat", this
109115
// case rarely happens. However, if it really happens, log it and ask the executor to
@@ -125,7 +131,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
125131
if (sc.supportDynamicAllocation) {
126132
// Asynchronously kill the executor to avoid blocking the current thread
127133
killExecutorThread.submit(new Runnable {
128-
override def run(): Unit = sc.killExecutor(executorId)
134+
override def run(): Unit = Utils.tryLogNonFatalError {
135+
sc.killExecutor(executorId)
136+
}
129137
})
130138
}
131139
executorLastSeen.remove(executorId)
@@ -137,7 +145,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
137145
if (timeoutCheckingTask != null) {
138146
timeoutCheckingTask.cancel(true)
139147
}
140-
timeoutCheckingThread.shutdownNow()
148+
eventLoopThread.shutdownNow()
141149
killExecutorThread.shutdownNow()
142150
}
143151
}

0 commit comments

Comments
 (0)