Skip to content

Commit ba6f044

Browse files
committed
fixes for reduceByKeyLocally
1 parent b082a35 commit ba6f044

File tree

3 files changed

+37
-27
lines changed

3 files changed

+37
-27
lines changed

pkg/R/RDD.R

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1388,9 +1388,22 @@ setMethod("groupByKey",
13881388
function(item) {
13891389
item$hash <- as.character(hashCode(item[[1]]))
13901390
updateOrCreatePair(item, keys, vals, pred,
1391-
function(vs, v) c(vs, list(v)),
1392-
function(x) list(x))
1391+
function(acc, x) {
1392+
addItemToAccumulator(acc, x)
1393+
acc
1394+
},
1395+
function(x) {
1396+
acc <- initAccumulator()
1397+
addItemToAccumulator(acc, x)
1398+
acc
1399+
})
13931400
})
1401+
# extract out data field
1402+
vals <- eapply(vals,
1403+
function(x) {
1404+
length(x$data) <- x$counter
1405+
x$data
1406+
})
13941407
# Every key in the environment contains a list
13951408
# Convert that to list(K, Seq[V])
13961409
convertEnvsToList(keys, vals)
@@ -1438,7 +1451,7 @@ setMethod("reduceByKey",
14381451
lapply(part,
14391452
function(item) {
14401453
item$hash <- as.character(hashCode(item[[1]]))
1441-
updateOrCreatePair(item, keys, vals, pred, combineFunc, function(x) x)
1454+
updateOrCreatePair(item, keys, vals, pred, combineFunc, identity)
14421455
})
14431456
convertEnvsToList(keys, vals)
14441457
}
@@ -1451,13 +1464,12 @@ setMethod("reduceByKey",
14511464
#'
14521465
#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V)
14531466
#' and merges the values for each key using an associative reduce function, but returns the
1454-
#' results immediately to master as R list.
1467+
#' results immediately to the driver as an R list.
14551468
#'
14561469
#' @param rdd The RDD to reduce by key. Should be an RDD where each element is
14571470
#' list(K, V) or c(K, V).
14581471
#' @param combineFunc The associative reduce function to use.
1459-
#' @return An list where each element is list(K, V') where V' is the merged
1460-
#' value
1472+
#' @return A list of elements of type list(K, V') where V' is the merged value for each key
14611473
#' @rdname reduceByKeyLocally
14621474
#' @seealso reduceByKey
14631475
#' @export
@@ -1467,7 +1479,7 @@ setMethod("reduceByKey",
14671479
#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
14681480
#' rdd <- parallelize(sc, pairs)
14691481
#' reduced <- reduceByKeyLocally(rdd, "+")
1470-
#' reduced[[1]] # Should be a list(1, 6)
1482+
#' reduced # list(list(1, 6), list(1.1, 3))
14711483
#'}
14721484
setGeneric("reduceByKeyLocally",
14731485
function(rdd, combineFunc) {
@@ -1486,7 +1498,7 @@ setMethod("reduceByKeyLocally",
14861498
lapply(part,
14871499
function(item) {
14881500
item$hash <- as.character(hashCode(item[[1]]))
1489-
updateOrCreatePair(item, keys, vals, pred, combineFunc, function(x) x)
1501+
updateOrCreatePair(item, keys, vals, pred, combineFunc, identity)
14901502
})
14911503
list(list(keys, vals)) # return hash to avoid re-compute in merge
14921504
}
@@ -1498,7 +1510,7 @@ setMethod("reduceByKeyLocally",
14981510
function(name) {
14991511
item <- list(x[[1]][[name]], x[[2]][[name]])
15001512
item$hash <- name
1501-
updateOrCreatePair(item, accum[[1]], accum[[2]], pred, combineFunc, function(x) x)
1513+
updateOrCreatePair(item, accum[[1]], accum[[2]], pred, combineFunc, identity)
15021514
})
15031515
accum
15041516
}
@@ -1573,8 +1585,7 @@ setMethod("combineByKey",
15731585
lapply(part,
15741586
function(item) {
15751587
item$hash <- as.character(item[[1]])
1576-
updateOrCreatePair(item, keys, combiners, pred, mergeCombiners,
1577-
function(x) x)
1588+
updateOrCreatePair(item, keys, combiners, pred, mergeCombiners, identity)
15781589
})
15791590
convertEnvsToList(keys, combiners)
15801591
}

pkg/R/utils.R

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -263,20 +263,20 @@ joinTaggedList <- function(tagged_list, cnull) {
263263
# Utility function to reduce a key-value list with predicate
264264
# Used in *ByKey functions
265265
# param
266-
# item key-val pair
266+
# pair key-value pair
267267
# keys/vals env of key/value with hashes
268-
# pred predicate function
269-
# update_fn update or merge function for existing pair, similar with `mergeVal` @combineByKey
270-
# create_fn create function for new pair, similar with `createCombiner` @combinebykey
271-
updateOrCreatePair <- function(item, keys, vals, pred, update_fn, create_fn) {
272-
# assum hashval bind to `$hash`, key/val with index 1/2
273-
hashVal <- item$hash
274-
key <- item[[1]]
275-
val <- item[[2]]
276-
if (pred(item)) {
277-
assign(hashVal, do.call(update_fn, list(get(hashVal, envir=vals), val)), envir=vals)
268+
# updateOrCreatePred predicate function
269+
# updateFn update or merge function for existing pair, similar to `mergeVal` @combineByKey
270+
# createFn create function for new pair, similar to `createCombiner` @combineByKey
271+
updateOrCreatePair <- function(pair, keys, vals, updateOrCreatePred, updateFn, createFn) {
272+
# assume hashVal is bound to `$hash`, key/val at index 1/2
273+
hashVal <- pair$hash
274+
key <- pair[[1]]
275+
val <- pair[[2]]
276+
if (updateOrCreatePred(pair)) {
277+
assign(hashVal, do.call(updateFn, list(get(hashVal, envir = vals), val)), envir = vals)
278278
} else {
279-
assign(hashVal, do.call(create_fn, list(val)), envir=vals)
279+
assign(hashVal, do.call(createFn, list(val)), envir = vals)
280280
assign(hashVal, key, envir=keys)
281281
}
282282
}

pkg/man/reduceByKeyLocally.Rd

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,20 @@ list(K, V) or c(K, V).}
1818
\item{combineFunc}{The associative reduce function to use.}
1919
}
2020
\value{
21-
An list where each element is list(K, V') where V' is the merged
22-
value
21+
A list of elements of type list(K, V') where V' is the merged value for each key
2322
}
2423
\description{
2524
This function operates on RDDs where every element is of the form list(K, V) or c(K, V)
2625
and merges the values for each key using an associative reduce function, but returns the
27-
results immediately to master as R list.
26+
results immediately to the driver as an R list.
2827
}
2928
\examples{
3029
\dontrun{
3130
sc <- sparkR.init()
3231
pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
3332
rdd <- parallelize(sc, pairs)
3433
reduced <- reduceByKeyLocally(rdd, "+")
35-
reduced[[1]] # Should be a list(1, 6)
34+
reduced # list(list(1, 6), list(1.1, 3))
3635
}
3736
}
3837
\seealso{

0 commit comments

Comments
 (0)