Skip to content

Commit 94066bf

Browse files
author
Sun Rui
committed
[SPARKR-153] phase 1: implement fold() and aggregate().
1 parent 7972858 commit 94066bf

File tree

5 files changed

+163
-0
lines changed

5 files changed

+163
-0
lines changed

pkg/NAMESPACE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
exportClasses("RDD")
33
exportClasses("Broadcast")
44
exportMethods(
5+
"aggregateRDD",
56
"cache",
67
"checkpoint",
78
"cogroup",
@@ -17,6 +18,7 @@ exportMethods(
1718
"filterRDD",
1819
"flatMap",
1920
"flatMapValues",
21+
"fold",
2022
"foreach",
2123
"foreachPartition",
2224
"fullOuterJoin",

pkg/R/RDD.R

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1374,6 +1374,71 @@ setMethod("top",
13741374
takeOrderedElem(rdd, num, FALSE)
13751375
})
13761376

1377+
#' Fold an RDD using a given associative function and a neutral "zero value".
1378+
#'
1379+
#' Aggregate the elements of each partition, and then the results for all the
1380+
#' partitions, using a given associative function and a neutral "zero value".
1381+
#'
1382+
#' @param rdd An RDD.
1383+
#' @param zeroValue A neutral "zero value".
1384+
#' @param op An associative function for the folding operation.
1385+
#' @return The folding result.
1386+
#' @rdname fold
1387+
#' @export
1388+
#' @examples
1389+
#'\dontrun{
1390+
#' sc <- sparkR.init()
1391+
#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5))
1392+
#' fold(rdd, 0, "+") # 15
1393+
#'}
1394+
setGeneric("fold", function(rdd, zeroValue, op) { standardGeneric("fold") })
1395+
1396+
#' @rdname fold
1397+
#' @aliases fold,RDD,RDD-method
1398+
setMethod("fold",
1399+
signature(rdd = "RDD", zeroValue = "ANY", op = "ANY"),
1400+
function(rdd, zeroValue, op) {
1401+
aggregateRDD(rdd, zeroValue, op, op)
1402+
})
1403+
1404+
#' Aggregate an RDD using the given combine functions and a neutral "zero value".
1405+
#'
1406+
#' Aggregate the elements of each partition, and then the results for all the
1407+
#' partitions, using given combine functions and a neutral "zero value".
1408+
#'
1409+
#' @param rdd An RDD.
1410+
#' @param zeroValue A neutral "zero value".
1411+
#' @param seqOp A function to aggregate the RDD elements. It may return a different
1412+
#' result type from the type of the RDD elements.
1413+
#' @param combOp A function to aggregate results of seqOp.
1414+
#' @return The aggregation result.
1415+
#' @rdname aggregateRDD
1416+
#' @export
1417+
#' @examples
1418+
#'\dontrun{
1419+
#' sc <- sparkR.init()
1420+
#' rdd <- parallelize(sc, list(1, 2, 3, 4))
1421+
#' zeroValue <- list(0, 0)
1422+
#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
1423+
#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
1424+
#' aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4)
1425+
#'}
1426+
setGeneric("aggregateRDD", function(rdd, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") })
1427+
1428+
#' @rdname aggregateRDD
1429+
#' @aliases aggregateRDD,RDD,RDD-method
1430+
setMethod("aggregateRDD",
1431+
signature(rdd = "RDD", zeroValue = "ANY", seqOp = "ANY", combOp = "ANY"),
1432+
function(rdd, zeroValue, seqOp, combOp) {
1433+
partitionFunc <- function(part) {
1434+
Reduce(seqOp, part, zeroValue)
1435+
}
1436+
1437+
partitionList <- collect(lapplyPartition(rdd, partitionFunc),
1438+
flatten = FALSE)
1439+
Reduce(combOp, partitionList, zeroValue)
1440+
})
1441+
13771442
############ Shuffle Functions ############
13781443

13791444
#' Partition an RDD by key

pkg/inst/tests/test_rdd.R

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,28 @@ test_that("top() on RDDs", {
302302
expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:3])
303303
})
304304

305+
test_that("fold() on RDDs", {
306+
actual <- fold(rdd, 0, "+")
307+
expect_equal(actual, Reduce("+", nums, 0))
308+
309+
rdd <- parallelize(sc, list())
310+
actual <- fold(rdd, 0, "+")
311+
expect_equal(actual, 0)
312+
})
313+
314+
test_that("aggregate() on RDDs", {
315+
rdd <- parallelize(sc, list(1, 2, 3, 4))
316+
zeroValue <- list(0, 0)
317+
seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
318+
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
319+
actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
320+
expect_equal(actual, list(10, 4))
321+
322+
rdd <- parallelize(sc, list())
323+
actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
324+
expect_equal(actual, list(0, 0))
325+
})
326+
305327
test_that("keys() on RDDs", {
306328
keys <- keys(intRdd)
307329
actual <- collect(keys)

pkg/man/aggregateRDD.Rd

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{aggregateRDD}
4+
\alias{aggregateRDD}
5+
\alias{aggregateRDD,RDD,RDD-method}
6+
\alias{aggregateRDD,RDD-method}
7+
\title{Aggregate an RDD using the given combine functions and a neutral "zero value".}
8+
\usage{
9+
aggregateRDD(rdd, zeroValue, seqOp, combOp)
10+
11+
\S4method{aggregateRDD}{RDD}(rdd, zeroValue, seqOp, combOp)
12+
}
13+
\arguments{
14+
\item{rdd}{An RDD.}
15+
16+
\item{zeroValue}{A neutral "zero value".}
17+
18+
\item{seqOp}{A function to aggregate the RDD elements. It may return a different
19+
result type from the type of the RDD elements.}
20+
21+
\item{combOp}{A function to aggregate results of seqOp.}
22+
}
23+
\value{
24+
The aggregation result.
25+
}
26+
\description{
27+
Aggregate the elements of each partition, and then the results for all the
28+
partitions, using given combine functions and a neutral "zero value".
29+
}
30+
\examples{
31+
\dontrun{
32+
sc <- sparkR.init()
33+
rdd <- parallelize(sc, list(1, 2, 3, 4))
34+
zeroValue <- list(0, 0)
35+
seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
36+
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
37+
aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4)
38+
}
39+
}
40+

pkg/man/fold.Rd

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{fold}
4+
\alias{fold}
5+
\alias{fold,RDD,RDD-method}
6+
\alias{fold,RDD-method}
7+
\title{Fold an RDD using a given associative function and a neutral "zero value".}
8+
\usage{
9+
fold(rdd, zeroValue, op)
10+
11+
\S4method{fold}{RDD}(rdd, zeroValue, op)
12+
}
13+
\arguments{
14+
\item{rdd}{An RDD.}
15+
16+
\item{zeroValue}{A neutral "zero value".}
17+
18+
\item{op}{An associative function for the folding operation.}
19+
}
20+
\value{
21+
The folding result.
22+
}
23+
\description{
24+
Aggregate the elements of each partition, and then the results for all the
25+
partitions, using a given associative function and a neutral "zero value".
26+
}
27+
\examples{
28+
\dontrun{
29+
sc <- sparkR.init()
30+
rdd <- parallelize(sc, list(1, 2, 3, 4, 5))
31+
fold(rdd, 0, "+") # 15
32+
}
33+
}
34+

0 commit comments

Comments
 (0)