Skip to content

Commit be82dcc

Browse files
committed
Merge pull request apache#93 from hlin09/hlin09
[SPARKR-15] Adds function Filter() to extract elements that satisfy a predicate.
2 parents 579db58 + 488ac47 commit be82dcc

File tree

6 files changed

+149
-0
lines changed

6 files changed

+149
-0
lines changed

pkg/NAMESPACE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ exportMethods(
1010
"combineByKey",
1111
"count",
1212
"distinct",
13+
"Filter",
14+
"filter",
1315
"flatMap",
1416
"groupByKey",
1517
"length",

pkg/R/RDD.R

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,44 @@ setMethod("mapPartitionsWithIndex",
520520
lapplyPartitionsWithIndex(X, FUN)
521521
})
522522

523+
#' This function returns a new RDD containing only the elements that satisfy
524+
#' a predicate (i.e. returning TRUE in a given logical function).
525+
#' The same as `filter()' in Spark.
526+
#'
527+
#' @param f A unary predicate function.
528+
#' @param x The RDD to be filtered.
529+
#' @rdname Filter
530+
#' @export
531+
#' @examples
532+
#'\dontrun{
533+
#' sc <- sparkR.init()
534+
#' rdd <- parallelize(sc, 1:10)
535+
#' unlist(collect(Filter(function (x) { x < 3 }, rdd))) # c(1, 2)
536+
#'}
537+
#setGeneric("Filter", function(f, x) { standardGeneric("Filter") })
538+
539+
#' @rdname Filter
540+
#' @aliases Filter,function,RDD-method filter,function,RDD-method
541+
setMethod("Filter",
542+
signature(f = "function", x = "RDD"),
543+
function(f, x) {
544+
filter.func <- function(part) {
545+
Filter(f, part)
546+
}
547+
lapplyPartition(x, filter.func)
548+
})
549+
550+
#' @rdname Filter
551+
#' @export
552+
setGeneric("filter", function(f, x) { standardGeneric("filter") })
553+
554+
#' @rdname Filter
555+
#' @aliases filter,function,RDD-method
556+
setMethod("filter",
557+
signature(f = "function", x = "RDD"),
558+
function(f, x) {
559+
Filter(f, x)
560+
})
523561

524562
#' Reduce across elements of an RDD.
525563
#'

pkg/inst/tests/test_rdd.R

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,21 @@ test_that("mapPartitions on RDD", {
3333
expect_equal(actual, list(15, 40))
3434
})
3535

36+
test_that("Filter on RDD", {
37+
filtered.rdd <- Filter(function(x) { x %% 2 == 0 }, rdd)
38+
actual <- collect(filtered.rdd)
39+
expect_equal(actual, list(2, 4, 6, 8, 10))
40+
41+
filtered.rdd <- Filter(function(x) { x[[2]] < 0 }, intRdd)
42+
actual <- collect(filtered.rdd)
43+
expect_equal(actual, list(list(1L, -1)))
44+
45+
# Filter out all elements.
46+
filtered.rdd <- Filter(function(x) { x > 10 }, rdd)
47+
actual <- collect(filtered.rdd)
48+
expect_equal(actual, list())
49+
})
50+
3651
test_that("lookup on RDD", {
3752
vals <- lookup(intRdd, 1L)
3853
expect_equal(vals, list(-1, 200))

pkg/man/Filter.Rd

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{Filter,function,RDD-method}
4+
\alias{Filter,function,RDD-method}
5+
\alias{filter}
6+
\alias{filter,function,RDD-method}
7+
\title{This function returns a new RDD containing only the elements that satisfy
8+
a predicate (i.e. returning TRUE in a given logical function).
9+
The same as `filter()' in Spark.}
10+
\usage{
11+
\S4method{Filter}{`function`,RDD}(f, x)
12+
13+
filter(f, x)
14+
15+
\S4method{filter}{`function`,RDD}(f, x)
16+
}
17+
\arguments{
18+
\item{f}{A unary predicate function.}
19+
20+
\item{x}{The RDD to be filtered.}
21+
}
22+
\description{
23+
This function returns a new RDD containing only the elements that satisfy
24+
a predicate (i.e. returning TRUE in a given logical function).
25+
The same as `filter()' in Spark.
26+
}
27+
\examples{
28+
\dontrun{
29+
sc <- sparkR.init()
30+
rdd <- parallelize(sc, 1:10)
31+
unlist(collect(Filter(function (x) { x < 3 }, rdd))) # c(1, 2)
32+
}
33+
}
34+

pkg/man/distinct.Rd

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{distinct}
4+
\alias{distinct}
5+
\alias{distinct,RDD,missingOrInteger-method}
6+
\title{Removes the duplicates from RDD.}
7+
\usage{
8+
distinct(rdd, numPartitions)
9+
10+
\S4method{distinct}{RDD,missingOrInteger}(rdd, numPartitions)
11+
}
12+
\arguments{
13+
\item{rdd}{The RDD to remove duplicates from.}
14+
15+
\item{numPartitions}{Number of partitions to create.}
16+
}
17+
\description{
18+
This function returns a new RDD containing the distinct elements in the
19+
given RDD. The same as `distinct()' in Spark.
20+
}
21+
\examples{
22+
\dontrun{
23+
sc <- sparkR.init()
24+
rdd <- parallelize(sc, c(1,2,2,3,3,3))
25+
sort(unlist(collect(distinct(rdd)))) # c(1, 2, 3)
26+
}
27+
}
28+

pkg/man/mapValues.Rd

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{mapValues}
4+
\alias{mapValues}
5+
\alias{mapValues,RDD,function-method}
6+
\title{Applies a function to all values of the elements, without modifying the keys.}
7+
\usage{
8+
mapValues(X, FUN)
9+
10+
\S4method{mapValues}{RDD,`function`}(X, FUN)
11+
}
12+
\arguments{
13+
\item{X}{The RDD to apply the transformation.}
14+
15+
\item{FUN}{the transformation to apply on the value of each element.}
16+
}
17+
\value{
18+
a new RDD created by the transformation.
19+
}
20+
\description{
21+
The same as `mapValues()' in Spark.
22+
}
23+
\examples{
24+
\dontrun{
25+
sc <- sparkR.init()
26+
rdd <- parallelize(sc, 1:10)
27+
makePairs <- lapply(rdd, function(x) { list(x, x) })
28+
collect(mapValues(makePairs, function(x) { x * 2) })
29+
Output: list(list(1,2), list(2,4), list(3,6), ...)
30+
}
31+
}
32+

0 commit comments

Comments
 (0)