Commit c7964c9

Author: Sun Rui

Merge with upstream master.

2 parents 7feac38 + 0c6e071

10 files changed: +282 −81 lines changed

10 files changed

+282
-81
lines changed

README.md

Lines changed: 5 additions & 0 deletions

@@ -84,7 +84,12 @@ pass the variable `spark.executor.memory` to the SparkContext constructor.
     sc <- sparkR.init(master="spark://<master>:7077",
                       sparkEnvir=list(spark.executor.memory="1g"))

+Finally, to stop the cluster run

+    sparkR.stop()
+
+sparkR.stop() can be invoked to terminate a SparkContext created previously via sparkR.init(). Then you can call sparkR.init() again to create a new SparkContext that may have different configurations.
+
 ## Examples, Unit tests

 SparkR comes with several sample programs in the `examples` directory.
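As a quick illustration of the workflow the new paragraph describes (a minimal sketch; the master URL and memory setting below are placeholders, not values from this commit):

    sc <- sparkR.init(master="local")
    # ... run some jobs ...
    sparkR.stop()

    # A fresh SparkContext with a different configuration can now be created
    # without restarting R.
    sc <- sparkR.init(master="local",
                      sparkEnvir=list(spark.executor.memory="2g"))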

pkg/NAMESPACE

Lines changed: 3 additions & 0 deletions

@@ -6,6 +6,7 @@ exportMethods(
   "checkpoint",
   "cogroup",
   "collect",
+  "collectAsMap",
   "collectPartition",
   "combineByKey",
   "count",
@@ -40,6 +41,7 @@ exportMethods(
   "persist",
   "reduce",
   "reduceByKey",
+  "reduceByKeyLocally",
   "rightOuterJoin",
   "sampleRDD",
   "saveAsTextFile",
@@ -66,6 +68,7 @@ export(
   "setCheckpointDir"
 )
 export("sparkR.init")
+export("sparkR.stop")
 export("print.jobj")
 useDynLib(SparkR, stringHashCode)
 importFrom(methods, setGeneric, setMethod, setOldClass)
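The three new exports correspond to the additions in pkg/R/RDD.R and pkg/R/sparkR.R below. A brief usage sketch of the two new RDD methods, adapted from the roxygen examples in this commit; the printed results are indicative only and the ordering of pairs is not guaranteed:

    sc <- sparkR.init()
    rdd <- parallelize(sc, list(list(1, 2), list(3, 4), list(1, 6)), 2L)

    # collectAsMap() returns a named list; keys are coerced to character, and a
    # duplicate key keeps the last value collected.
    collectAsMap(rdd)             # e.g. list(`1` = 6, `3` = 4)

    # reduceByKeyLocally() merges values per key and hands an R list straight
    # back to the driver instead of producing another RDD.
    reduceByKeyLocally(rdd, "+")  # e.g. list(list(1, 8), list(3, 4))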

pkg/R/RDD.R

Lines changed: 116 additions & 54 deletions

@@ -358,6 +358,7 @@ setMethod("collect",
             convertJListToRList(collected, flatten)
           })

+
 #' @rdname collect-methods
 #' @export
 #' @description
@@ -382,6 +383,29 @@ setMethod("collectPartition",
             convertJListToRList(jList, flatten = TRUE)
           })

+#' @rdname collect-methods
+#' @export
+#' @description
+#' \code{collectAsMap} returns a named list as a map that contains all of the elements
+#' in a key-value pair RDD.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)), 2L)
+#' collectAsMap(rdd) # list(`1` = 2, `3` = 4)
+#'}
+setGeneric("collectAsMap", function(rdd) { standardGeneric("collectAsMap") })
+
+#' @rdname collect-methods
+#' @aliases collectAsMap,RDD-method
+setMethod("collectAsMap",
+          signature(rdd = "RDD"),
+          function(rdd) {
+            pairList <- collect(rdd)
+            map <- new.env()
+            lapply(pairList, function(x) { assign(as.character(x[[1]]), x[[2]], envir = map) })
+            as.list(map)
+          })

 #' Look up elements of a key in an RDD
 #'
@@ -1388,26 +1412,32 @@ setMethod("groupByKey",
             groupVals <- function(part) {
               vals <- new.env()
               keys <- new.env()
+              pred <- function(item) exists(item$hash, keys)
+              appendList <- function(acc, x) {
+                addItemToAccumulator(acc, x)
+                acc
+              }
+              makeList <- function(x) {
+                acc <- initAccumulator()
+                addItemToAccumulator(acc, x)
+                acc
+              }
               # Each item in the partition is list of (K, V)
               lapply(part,
                      function(item) {
-                       hashVal <- as.character(hashCode(item[[1]]))
-                       if (exists(hashVal, vals)) {
-                         acc <- vals[[hashVal]]
-                         acc[[length(acc) + 1]] <- item[[2]]
-                         vals[[hashVal]] <- acc
-                       } else {
-                         vals[[hashVal]] <- list(item[[2]])
-                         keys[[hashVal]] <- item[[1]]
-                       }
+                       item$hash <- as.character(hashCode(item[[1]]))
+                       updateOrCreatePair(item, keys, vals, pred,
+                                          appendList, makeList)
                      })
+              # extract out data field
+              vals <- eapply(vals,
+                             function(x) {
+                               length(x$data) <- x$counter
+                               x$data
+                             })
               # Every key in the environment contains a list
               # Convert that to list(K, Seq[V])
-              grouped <- lapply(ls(vals),
-                                function(name) {
-                                  list(keys[[name]], vals[[name]])
-                                })
-              grouped
+              convertEnvsToList(keys, vals)
             }
             lapplyPartition(shuffled, groupVals)
           })
@@ -1448,28 +1478,78 @@ setMethod("reduceByKey",
             reduceVals <- function(part) {
               vals <- new.env()
               keys <- new.env()
+              pred <- function(item) exists(item$hash, keys)
               lapply(part,
                      function(item) {
-                       hashVal <- as.character(hashCode(item[[1]]))
-                       if (exists(hashVal, vals)) {
-                         vals[[hashVal]] <- do.call(
-                           combineFunc, list(vals[[hashVal]], item[[2]]))
-                       } else {
-                         vals[[hashVal]] <- item[[2]]
-                         keys[[hashVal]] <- item[[1]]
-                       }
+                       item$hash <- as.character(hashCode(item[[1]]))
+                       updateOrCreatePair(item, keys, vals, pred, combineFunc, identity)
                      })
-              combined <- lapply(ls(vals),
-                                 function(name) {
-                                   list(keys[[name]], vals[[name]])
-                                 })
-              combined
+              convertEnvsToList(keys, vals)
             }
             locallyReduced <- lapplyPartition(rdd, reduceVals)
             shuffled <- partitionBy(locallyReduced, numPartitions)
             lapplyPartition(shuffled, reduceVals)
           })

+#' Merge values by key locally
+#'
+#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
+#' and merges the values for each key using an associative reduce function, but returns the
+#' results immediately to the driver as an R list.
+#'
+#' @param rdd The RDD to reduce by key. Should be an RDD where each element is
+#'             list(K, V) or c(K, V).
+#' @param combineFunc The associative reduce function to use.
+#' @return A list of elements of type list(K, V') where V' is the merged value for each key
+#' @rdname reduceByKeyLocally
+#' @seealso reduceByKey
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' reduced <- reduceByKeyLocally(rdd, "+")
+#' reduced # list(list(1, 6), list(1.1, 3))
+#'}
+setGeneric("reduceByKeyLocally",
+           function(rdd, combineFunc) {
+             standardGeneric("reduceByKeyLocally")
+           })
+
+#' @rdname reduceByKeyLocally
+#' @aliases reduceByKeyLocally,RDD,integer-method
+setMethod("reduceByKeyLocally",
+          signature(rdd = "RDD", combineFunc = "ANY"),
+          function(rdd, combineFunc) {
+            reducePart <- function(part) {
+              vals <- new.env()
+              keys <- new.env()
+              pred <- function(item) exists(item$hash, keys)
+              lapply(part,
+                     function(item) {
+                       item$hash <- as.character(hashCode(item[[1]]))
+                       updateOrCreatePair(item, keys, vals, pred, combineFunc, identity)
+                     })
+              list(list(keys, vals)) # return hash to avoid re-compute in merge
+            }
+            mergeParts <- function(accum, x) {
+              pred <- function(item) {
+                exists(item$hash, accum[[1]])
+              }
+              lapply(ls(x[[1]]),
+                     function(name) {
+                       item <- list(x[[1]][[name]], x[[2]][[name]])
+                       item$hash <- name
+                       updateOrCreatePair(item, accum[[1]], accum[[2]], pred, combineFunc, identity)
+                     })
+              accum
+            }
+            reduced <- mapPartitions(rdd, reducePart)
+            merged <- reduce(reduced, mergeParts)
+            convertEnvsToList(merged[[1]], merged[[2]])
+          })
+
 #' Combine values by key
 #'
 #' Generic function to combine the elements for each key using a custom set of
@@ -1519,46 +1599,28 @@ setMethod("combineByKey",
             combineLocally <- function(part) {
               combiners <- new.env()
               keys <- new.env()
+              pred <- function(item) exists(item$hash, keys)
               lapply(part,
                      function(item) {
-                       k <- as.character(item[[1]])
-                       if (!exists(k, keys)) {
-                         combiners[[k]] <- do.call(createCombiner,
-                                                   list(item[[2]]))
-                         keys[[k]] <- item[[1]]
-                       } else {
-                         combiners[[k]] <- do.call(mergeValue,
-                                                   list(combiners[[k]],
-                                                        item[[2]]))
-                       }
-                     })
-              lapply(ls(keys), function(k) {
-                list(keys[[k]], combiners[[k]])
+                       item$hash <- as.character(item[[1]])
+                       updateOrCreatePair(item, keys, combiners, pred, mergeValue, createCombiner)
                      })
+              convertEnvsToList(keys, combiners)
             }
             locallyCombined <- lapplyPartition(rdd, combineLocally)
             shuffled <- partitionBy(locallyCombined, numPartitions)
             mergeAfterShuffle <- function(part) {
               combiners <- new.env()
               keys <- new.env()
+              pred <- function(item) exists(item$hash, keys)
               lapply(part,
                      function(item) {
-                       k <- as.character(item[[1]])
-                       if (!exists(k, combiners)) {
-                         combiners[[k]] <- item[[2]]
-                         keys[[k]] <- item[[1]]
-                       } else {
-                         combiners[[k]] <- do.call(mergeCombiners,
-                                                   list(combiners[[k]],
-                                                        item[[2]]))
-                       }
-                     })
-              lapply(ls(keys), function(k) {
-                list(keys[[k]], combiners[[k]])
+                       item$hash <- as.character(item[[1]])
+                       updateOrCreatePair(item, keys, combiners, pred, mergeCombiners, identity)
                      })
+              convertEnvsToList(keys, combiners)
             }
-            combined <-lapplyPartition(shuffled, mergeAfterShuffle)
-            combined
+            lapplyPartition(shuffled, mergeAfterShuffle)
           })

 ############ Binary Functions #############
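The refactored bodies of groupByKey, reduceByKey, reduceByKeyLocally, and combineByKey above all delegate the per-key bookkeeping to updateOrCreatePair, convertEnvsToList, initAccumulator, and addItemToAccumulator, none of which appear in this excerpt (they presumably live in one of the other changed files, such as a shared utilities file). The sketch below is only a plausible reconstruction inferred from the call sites above, not the commit's actual definitions:

    # Hypothetical sketches, inferred from how RDD.R calls these helpers.

    # Insert or update a (key, value) pair held in two parallel environments.
    # pred(item) reports whether item$hash is already present; updateFn merges a
    # new value into an existing one, and createFn wraps the first value for a key.
    updateOrCreatePair <- function(item, keys, vals, pred, updateFn, createFn) {
      hashVal <- item$hash
      if (pred(item)) {
        assign(hashVal, do.call(updateFn, list(get(hashVal, envir = vals), item[[2]])),
               envir = vals)
      } else {
        assign(hashVal, do.call(createFn, list(item[[2]])), envir = vals)
        assign(hashVal, item[[1]], envir = keys)
      }
    }

    # Turn the parallel keys/vals environments back into a list of list(K, V) pairs.
    convertEnvsToList <- function(keys, vals) {
      lapply(ls(keys),
             function(name) {
               list(get(name, envir = keys), get(name, envir = vals))
             })
    }

    # Accumulator used by groupByKey: a pre-grown list plus a counter, so that
    # appending a value is amortized O(1) instead of copying the list each time.
    initAccumulator <- function() {
      acc <- new.env()
      acc$counter <- 0
      acc$size <- 1
      acc$data <- vector("list", 1)
      acc
    }

    addItemToAccumulator <- function(acc, item) {
      if (acc$counter == acc$size) {
        acc$size <- acc$size * 2
        length(acc$data) <- acc$size
      }
      acc$counter <- acc$counter + 1
      acc$data[[acc$counter]] <- item
    }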

pkg/R/context.R

Lines changed: 10 additions & 5 deletions

@@ -85,10 +85,16 @@ parallelize <- function(sc, coll, numSlices = 1) {
   # TODO: bound/safeguard numSlices
   # TODO: unit tests for if the split works for all primitives
   # TODO: support matrix, data frame, etc
-  if (!is.list(coll)) {
-    if (!is.vector(coll)) {
-      message(paste("context.R: parallelize() currently only supports lists and vectors.",
-                    "Calling as.list() to coerce coll into a list."))
+  if ((!is.list(coll) && !is.vector(coll)) || is.data.frame(coll)) {
+    if (is.data.frame(coll)) {
+      message(paste("context.R: A data frame is parallelized by columns."))
+    } else {
+      if (is.matrix(coll)) {
+        message(paste("context.R: A matrix is parallelized by elements."))
+      } else {
+        message(paste("context.R: parallelize() currently only supports lists and vectors.",
+                      "Calling as.list() to coerce coll into a list."))
+      }
     }
     coll <- as.list(coll)
   }
@@ -109,7 +115,6 @@ parallelize <- function(sc, coll, numSlices = 1) {
   RDD(jrdd, TRUE)
 }

-
 #' Include this specified package on all workers
 #'
 #' This function can be used to include a package on all workers before the
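With the broadened check above, parallelize() now accepts data frames and matrices and coerces them with as.list(): a data frame becomes one element per column, a matrix one element per cell in column-major order. A small sketch of the resulting behavior (assuming a local SparkR session; the collected shapes are indicative):

    sc <- sparkR.init(master="local")

    # One RDD element per column; prints "A data frame is parallelized by columns."
    df <- data.frame(a = 1:2, b = c(3, 4))
    collect(parallelize(sc, df, 2L))   # roughly list(1:2, c(3, 4))

    # One RDD element per matrix cell; prints "A matrix is parallelized by elements."
    m <- matrix(1:4, nrow = 2)
    collect(parallelize(sc, m, 2L))    # roughly list(1, 2, 3, 4)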

pkg/R/sparkR.R

Lines changed: 28 additions & 19 deletions

@@ -23,30 +23,39 @@ connExists <- function(env) {

 # Stop the Spark context.
 # Also terminates the backend this R session is connected to
-sparkR.stop <- function(env) {
-  cat("Stopping SparkR\n")
+sparkR.stop <- function(env = .sparkREnv) {

   if (!connExists(env)) {
     # When the workspace is saved in R, the connections are closed
     # *before* the finalizer is run. In these cases, we reconnect
     # to the backend, so we can shut it down.
-    connectBackend("localhost", .sparkREnv$sparkRBackendPort)
-  }
-
-  if (exists(".sparkRjsc", envir = env)) {
-    sc <- get(".sparkRjsc", envir = env)
-    callJMethod(sc, "stop")
+    tryCatch({
+      connectBackend("localhost", .sparkREnv$sparkRBackendPort)
+    }, error = function(err) {
+      cat("Error in Connection: Use sparkR.init() to restart SparkR\n")
+    }, warning = function(war) {
+      cat("No Connection Found: Use sparkR.init() to restart SparkR\n")
+    })
+  }
+
+  if (exists(".sparkRCon", envir = env)) {
+    cat("Stopping SparkR\n")
+    if (exists(".sparkRjsc", envir = env)) {
+      sc <- get(".sparkRjsc", envir = env)
+      callJMethod(sc, "stop")
+      rm(".sparkRjsc", envir = env)
+    }
+
+    callJStatic("SparkRHandler", "stopBackend")
+    # Also close the connection and remove it from our env
+    conn <- get(".sparkRCon", env)
+    close(conn)
+    rm(".sparkRCon", envir = env)
+    # Finally, sleep for 1 sec to let backend finish exiting.
+    # Without this we get port conflicts in RStudio when we try to 'Restart R'.
+    Sys.sleep(1)
   }
-
-  callJStatic("SparkRHandler", "stopBackend")
-  # Also close the connection and remove it from our env
-  conn <- get(".sparkRCon", env)
-  close(conn)
-  rm(".sparkRCon", envir = env)
-
-  # Finally, sleep for 1 sec to let backend finish exiting.
-  # Without this we get port conflicts in RStudio when we try to 'Restart R'.
-  Sys.sleep(1)
+
 }

 #' Initialize a new Spark Context.
@@ -84,7 +93,7 @@ sparkR.init <- function(
   sparkRBackendPort = 12345) {

   if (exists(".sparkRjsc", envir = .sparkREnv)) {
-    cat("Re-using existing Spark Context. Please restart R to create a new Spark Context\n")
+    cat("Re-using existing Spark Context. Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n")
     return(get(".sparkRjsc", envir = .sparkREnv))
   }

pkg/R/sparkRClient.R

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
# if one doesn't already exist
55
connectBackend <- function(hostname, port, timeout = 6000) {
66
if (exists(".sparkRcon", envir = .sparkREnv)) {
7-
cat("SparkRBackend client connection already exists\n")
8-
return(get(".sparkRcon", envir = .sparkREnv))
7+
if (isOpen(env[[".sparkRCon"]])) {
8+
cat("SparkRBackend client connection already exists\n")
9+
return(get(".sparkRcon", envir = .sparkREnv))
10+
}
911
}
1012

1113
con <- socketConnection(host = hostname, port = port, server = FALSE,
