
Commit f03fe7f

Merge pull request apache#12 from apache/master
merge lastest spark
2 parents: f12fa50 + f98773a

File tree


380 files changed: 13,450 additions, 3,668 deletions

CONTRIBUTING.md

Lines changed: 13 additions & 9 deletions

@@ -1,12 +1,16 @@
 ## Contributing to Spark
 
-Contributions via GitHub pull requests are gladly accepted from their original
-author. Along with any pull requests, please state that the contribution is
-your original work and that you license the work to the project under the
-project's open source license. Whether or not you state this explicitly, by
-submitting any copyrighted material via pull request, email, or other means
-you agree to license the material under the project's open source license and
-warrant that you have the legal authority to do so.
+*Before opening a pull request*, review the
+[Contributing to Spark wiki](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark).
+It lists steps that are required before creating a PR. In particular, consider:
+
+- Is the change important and ready enough to ask the community to spend time reviewing?
+- Have you searched for existing, related JIRAs and pull requests?
+- Is this a new feature that can stand alone as a package on http://spark-packages.org ?
+- Is the change being proposed clearly explained and motivated?
 
-Please see the [Contributing to Spark wiki page](https://cwiki.apache.org/SPARK/Contributing+to+Spark)
-for more information.
+When you contribute code, you affirm that the contribution is your original work and that you
+license the work to the project under the project's open source license. Whether or not you
+state this explicitly, by submitting any copyrighted material via pull request, email, or
+other means you agree to license the material under the project's open source license and
+warrant that you have the legal authority to do so.

R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions

@@ -71,6 +71,7 @@ exportMethods(
               "unpersist",
               "value",
               "values",
+              "zipPartitions",
               "zipRDD",
               "zipWithIndex",
               "zipWithUniqueId"

R/pkg/R/DataFrame.R

Lines changed: 7 additions & 1 deletion

@@ -790,9 +790,12 @@ setMethod("$", signature(x = "DataFrame"),
 
 setMethod("$<-", signature(x = "DataFrame"),
           function(x, name, value) {
-            stopifnot(class(value) == "Column")
+            stopifnot(class(value) == "Column" || is.null(value))
             cols <- columns(x)
             if (name %in% cols) {
+              if (is.null(value)) {
+                cols <- Filter(function(c) { c != name }, cols)
+              }
               cols <- lapply(cols, function(c) {
                 if (c == name) {
                   alias(value, name)
@@ -802,6 +805,9 @@ setMethod("$<-", signature(x = "DataFrame"),
               })
               nx <- select(x, cols)
             } else {
+              if (is.null(value)) {
+                return(x)
+              }
               nx <- withColumn(x, name, value)
             }
             x@sdf <- nx@sdf
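
Note (editorial): a quick usage sketch of the behavior this hunk introduces. Assigning NULL through `$<-` now drops an existing column and is a no-op for a missing one. The session below is illustrative only, not part of the commit; it assumes a running SparkR context and a hypothetical sqlCtx.

# Illustrative SparkR session (hypothetical context objects).
sc <- sparkR.init()
sqlCtx <- sparkRSQL.init(sc)
df <- createDataFrame(sqlCtx, faithful)   # columns: eruptions, waiting
df$waiting <- NULL                        # existing column: removed via select()
columns(df)                               # "eruptions"
df$not_there <- NULL                      # absent column: returns df unchanged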

R/pkg/R/RDD.R

Lines changed: 70 additions & 19 deletions

@@ -66,6 +66,11 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
   .Object
 })
 
+setMethod("show", "RDD",
+          function(.Object) {
+            cat(paste(callJMethod(.Object@jrdd, "toString"), "\n", sep=""))
+          })
+
 setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
   .Object@env <- new.env()
   .Object@env$isCached <- FALSE
@@ -91,8 +96,8 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
     # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
     # prev_serializedMode is used during the delayed computation of JRDD in getJRDD
   } else {
-    pipelinedFunc <- function(split, iterator) {
-      func(split, prev@func(split, iterator))
+    pipelinedFunc <- function(partIndex, part) {
+      func(partIndex, prev@func(partIndex, part))
     }
     .Object@func <- cleanClosure(pipelinedFunc)
     .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
@@ -306,7 +311,7 @@ setMethod("numPartitions",
           signature(x = "RDD"),
           function(x) {
             jrdd <- getJRDD(x)
-            partitions <- callJMethod(jrdd, "splits")
+            partitions <- callJMethod(jrdd, "partitions")
             callJMethod(partitions, "size")
           })
 
@@ -452,8 +457,8 @@ setMethod("countByValue",
 setMethod("lapply",
           signature(X = "RDD", FUN = "function"),
           function(X, FUN) {
-            func <- function(split, iterator) {
-              lapply(iterator, FUN)
+            func <- function(partIndex, part) {
+              lapply(part, FUN)
             }
             lapplyPartitionsWithIndex(X, func)
           })
@@ -538,8 +543,8 @@ setMethod("mapPartitions",
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd <- parallelize(sc, 1:10, 5L)
-#' prod <- lapplyPartitionsWithIndex(rdd, function(split, part) {
-#'                                          split * Reduce("+", part) })
+#' prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
+#'                                          partIndex * Reduce("+", part) })
 #' collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76
 #'}
 #' @rdname lapplyPartitionsWithIndex
@@ -813,7 +818,7 @@ setMethod("distinct",
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10) # ensure each num is in its own split
+#' rdd <- parallelize(sc, 1:10)
 #' collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
 #' collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
 #'}
@@ -825,14 +830,14 @@ setMethod("sampleRDD",
           function(x, withReplacement, fraction, seed) {
 
             # The sampler: takes a partition and returns its sampled version.
-            samplingFunc <- function(split, part) {
+            samplingFunc <- function(partIndex, part) {
               set.seed(seed)
               res <- vector("list", length(part))
               len <- 0
 
               # Discards some random values to ensure each partition has a
               # different random seed.
-              runif(split)
+              runif(partIndex)
 
               for (elem in part) {
                 if (withReplacement) {
@@ -967,7 +972,7 @@ setMethod("keyBy",
 setMethod("repartition",
           signature(x = "RDD", numPartitions = "numeric"),
           function(x, numPartitions) {
-            coalesce(x, numToInt(numPartitions), TRUE)
+            coalesce(x, numPartitions, TRUE)
           })
 
 #' Return a new RDD that is reduced into numPartitions partitions.
@@ -989,8 +994,8 @@ setMethod("coalesce",
           function(x, numPartitions, shuffle = FALSE) {
             numPartitions <- numToInt(numPartitions)
             if (shuffle || numPartitions > SparkR::numPartitions(x)) {
-              func <- function(s, part) {
-                set.seed(s)  # split as seed
+              func <- function(partIndex, part) {
+                set.seed(partIndex)  # partIndex as seed
                 start <- as.integer(sample(numPartitions, 1) - 1)
                 lapply(seq_along(part),
                        function(i) {
@@ -1035,7 +1040,7 @@ setMethod("saveAsObjectFile",
 #' Save this RDD as a text file, using string representations of elements.
 #'
 #' @param x The RDD to save
-#' @param path The directory where the splits of the text file are saved
+#' @param path The directory where the partitions of the text file are saved
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
@@ -1335,10 +1340,10 @@ setMethod("zipWithUniqueId",
           function(x) {
             n <- numPartitions(x)
 
-            partitionFunc <- function(split, part) {
+            partitionFunc <- function(partIndex, part) {
               mapply(
                 function(item, index) {
-                  list(item, (index - 1) * n + split)
+                  list(item, (index - 1) * n + partIndex)
                 },
                 part,
                 seq_along(part),
@@ -1382,11 +1387,11 @@ setMethod("zipWithIndex",
               startIndices <- Reduce("+", nums, accumulate = TRUE)
             }
 
-            partitionFunc <- function(split, part) {
-              if (split == 0) {
+            partitionFunc <- function(partIndex, part) {
+              if (partIndex == 0) {
                 startIndex <- 0
               } else {
-                startIndex <- startIndices[[split]]
+                startIndex <- startIndices[[partIndex]]
               }
 
               mapply(
@@ -1590,3 +1595,49 @@ setMethod("intersection",
 
             keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
           })
+
+#' Zips an RDD's partitions with one (or more) RDD(s).
+#' Same as zipPartitions in Spark.
+#'
+#' @param ... RDDs to be zipped.
+#' @param func A function to transform zipped partitions.
+#' @return A new RDD by applying a function to the zipped partitions.
+#'         Assumes that all the RDDs have the *same number of partitions*, but
+#'         does *not* require them to have the same number of elements in each partition.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, 1:2, 2L)  # 1, 2
+#' rdd2 <- parallelize(sc, 1:4, 2L)  # 1:2, 3:4
+#' rdd3 <- parallelize(sc, 1:6, 2L)  # 1:3, 4:6
+#' collect(zipPartitions(rdd1, rdd2, rdd3,
+#'                       func = function(x, y, z) { list(list(x, y, z)) }))
+#' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
+#'}
+#' @rdname zipRDD
+#' @aliases zipPartitions,RDD
+setMethod("zipPartitions",
+          "RDD",
+          function(..., func) {
+            rrdds <- list(...)
+            if (length(rrdds) == 1) {
+              return(rrdds[[1]])
+            }
+            nPart <- sapply(rrdds, numPartitions)
+            if (length(unique(nPart)) != 1) {
+              stop("Can only zipPartitions RDDs which have the same number of partitions.")
+            }
+
+            rrdds <- lapply(rrdds, function(rdd) {
+              mapPartitionsWithIndex(rdd, function(partIndex, part) {
+                print(length(part))
+                list(list(partIndex, part))
+              })
+            })
+            union.rdd <- Reduce(unionRDD, rrdds)
+            zipped.rdd <- values(groupByKey(union.rdd, numPartitions = nPart[1]))
+            res <- mapPartitions(zipped.rdd, function(plist) {
+              do.call(func, plist[[1]])
+            })
+            res
+          })
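
Note (editorial): the new zipPartitions method does not zip element-wise. It tags each partition with its index (mapPartitionsWithIndex), unions the tagged RDDs, and groups by that index so the i-th partitions of every input meet before func is applied. Below is a minimal, Spark-free sketch of the same grouping idea over plain lists; all names are illustrative, not SparkR API.

# Each "RDD" is modeled as a list of partitions (lists of elements).
zipPartitionsLocal <- function(..., func) {
  inputs <- list(...)
  nPart <- sapply(inputs, length)
  stopifnot(length(unique(nPart)) == 1)   # same partition count required
  lapply(seq_len(nPart[1]), function(partIndex) {
    # Gather the partIndex-th partition of every input and apply func,
    # mirroring the groupByKey-on-partIndex step in the real method.
    do.call(func, lapply(inputs, `[[`, partIndex))
  })
}

rdd1 <- list(list(1), list(2))            # partitions: 1 | 2
rdd2 <- list(list(1, 2), list(3, 4))      # partitions: 1,2 | 3,4
zipPartitionsLocal(rdd1, rdd2, func = function(x, y) list(x, y))
# list(list(list(1), list(1, 2)), list(list(2), list(3, 4)))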

R/pkg/R/context.R

Lines changed: 10 additions & 10 deletions

@@ -17,12 +17,12 @@
 
 # context.R: SparkContext driven functions
 
-getMinSplits <- function(sc, minSplits) {
-  if (is.null(minSplits)) {
+getMinPartitions <- function(sc, minPartitions) {
+  if (is.null(minPartitions)) {
     defaultParallelism <- callJMethod(sc, "defaultParallelism")
-    minSplits <- min(defaultParallelism, 2)
+    minPartitions <- min(defaultParallelism, 2)
   }
-  as.integer(minSplits)
+  as.integer(minPartitions)
 }
 
 #' Create an RDD from a text file.
@@ -33,7 +33,7 @@ getMinSplits <- function(sc, minSplits) {
 #'
 #' @param sc SparkContext to use
 #' @param path Path of file to read. A vector of multiple paths is allowed.
-#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
 #'        value is chosen based on available parallelism.
 #' @return RDD where each item is of type \code{character}
 #' @export
@@ -42,13 +42,13 @@ getMinSplits <- function(sc, minSplits) {
 #' sc <- sparkR.init()
 #' lines <- textFile(sc, "myfile.txt")
 #'}
-textFile <- function(sc, path, minSplits = NULL) {
+textFile <- function(sc, path, minPartitions = NULL) {
   # Allow the user to have a more flexible definiton of the text file path
   path <- suppressWarnings(normalizePath(path))
   #' Convert a string vector of paths to a string containing comma separated paths
   path <- paste(path, collapse = ",")
 
-  jrdd <- callJMethod(sc, "textFile", path, getMinSplits(sc, minSplits))
+  jrdd <- callJMethod(sc, "textFile", path, getMinPartitions(sc, minPartitions))
   # jrdd is of type JavaRDD[String]
   RDD(jrdd, "string")
 }
@@ -60,7 +60,7 @@ textFile <- function(sc, path, minSplits = NULL) {
 #'
 #' @param sc SparkContext to use
 #' @param path Path of file to read. A vector of multiple paths is allowed.
-#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
 #'        value is chosen based on available parallelism.
 #' @return RDD containing serialized R objects.
 #' @seealso saveAsObjectFile
@@ -70,13 +70,13 @@ textFile <- function(sc, path, minSplits = NULL) {
 #' sc <- sparkR.init()
 #' rdd <- objectFile(sc, "myfile")
 #'}
-objectFile <- function(sc, path, minSplits = NULL) {
+objectFile <- function(sc, path, minPartitions = NULL) {
  # Allow the user to have a more flexible definiton of the text file path
  path <- suppressWarnings(normalizePath(path))
  #' Convert a string vector of paths to a string containing comma separated paths
  path <- paste(path, collapse = ",")
 
-  jrdd <- callJMethod(sc, "objectFile", path, getMinSplits(sc, minSplits))
+  jrdd <- callJMethod(sc, "objectFile", path, getMinPartitions(sc, minPartitions))
  # Assume the RDD contains serialized R objects.
  RDD(jrdd, "byte")
 }
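
Note (editorial): a hedged usage sketch of the renamed argument; the path is hypothetical. minPartitions is a lower bound passed through to the JVM textFile, so the actual partition count depends on the input splits.

# Hypothetical session; requires a running SparkR context.
sc <- sparkR.init()
lines <- textFile(sc, "hdfs:///tmp/myfile.txt", minPartitions = 4L)
numPartitions(lines)   # typically >= 4
# With minPartitions = NULL, getMinPartitions() falls back to
# min(defaultParallelism, 2).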

R/pkg/R/generics.R

Lines changed: 11 additions & 6 deletions

@@ -60,7 +60,7 @@ setGeneric("countByValue", function(x) { standardGeneric("countByValue") })
 
 #' @rdname distinct
 #' @export
-setGeneric("distinct", function(x, numPartitions = 1L) { standardGeneric("distinct") })
+setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") })
 
 #' @rdname filterRDD
 #' @export
@@ -182,7 +182,7 @@ setGeneric("setName", function(x, name) { standardGeneric("setName") })
 #' @rdname sortBy
 #' @export
 setGeneric("sortBy",
-           function(x, func, ascending = TRUE, numPartitions = 1L) {
+           function(x, func, ascending = TRUE, numPartitions = 1) {
             standardGeneric("sortBy")
           })
 
@@ -217,6 +217,11 @@ setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
 #' @export
 setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") })
 
+#' @rdname zipRDD
+#' @export
+setGeneric("zipPartitions", function(..., func) { standardGeneric("zipPartitions") },
+           signature = "...")
+
 #' @rdname zipWithIndex
 #' @seealso zipWithUniqueId
 #' @export
@@ -244,7 +249,7 @@ setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues")
 
 #' @rdname intersection
 #' @export
-setGeneric("intersection", function(x, other, numPartitions = 1L) {
+setGeneric("intersection", function(x, other, numPartitions = 1) {
   standardGeneric("intersection") })
 
 #' @rdname keys
@@ -346,21 +351,21 @@ setGeneric("rightOuterJoin", function(x, y, numPartitions) { standardGeneric("ri
 #' @rdname sortByKey
 #' @export
 setGeneric("sortByKey",
-           function(x, ascending = TRUE, numPartitions = 1L) {
+           function(x, ascending = TRUE, numPartitions = 1) {
             standardGeneric("sortByKey")
           })
 
 #' @rdname subtract
 #' @export
 setGeneric("subtract",
-           function(x, other, numPartitions = 1L) {
+           function(x, other, numPartitions = 1) {
             standardGeneric("subtract")
           })
 
 #' @rdname subtractByKey
 #' @export
 setGeneric("subtractByKey",
-           function(x, other, numPartitions = 1L) {
+           function(x, other, numPartitions = 1) {
             standardGeneric("subtractByKey")
           })