Commit fe7816f

Author: Andrew Or (committed)
Merge branch 'master' of github.com:apache/spark into viz
2 parents: 205f838 + 998aac2

200 files changed: +5851 −1994 lines changed

CONTRIBUTING.md

Lines changed: 13 additions & 9 deletions
```diff
@@ -1,12 +1,16 @@
 ## Contributing to Spark
 
-Contributions via GitHub pull requests are gladly accepted from their original
-author. Along with any pull requests, please state that the contribution is
-your original work and that you license the work to the project under the
-project's open source license. Whether or not you state this explicitly, by
-submitting any copyrighted material via pull request, email, or other means
-you agree to license the material under the project's open source license and
-warrant that you have the legal authority to do so.
+*Before opening a pull request*, review the
+[Contributing to Spark wiki](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark).
+It lists steps that are required before creating a PR. In particular, consider:
+
+- Is the change important and ready enough to ask the community to spend time reviewing?
+- Have you searched for existing, related JIRAs and pull requests?
+- Is this a new feature that can stand alone as a package on http://spark-packages.org ?
+- Is the change being proposed clearly explained and motivated?
 
-Please see the [Contributing to Spark wiki page](https://cwiki.apache.org/SPARK/Contributing+to+Spark)
-for more information.
+When you contribute code, you affirm that the contribution is your original work and that you
+license the work to the project under the project's open source license. Whether or not you
+state this explicitly, by submitting any copyrighted material via pull request, email, or
+other means you agree to license the material under the project's open source license and
+warrant that you have the legal authority to do so.
```

R/pkg/R/DataFrame.R

Lines changed: 7 additions & 1 deletion
```diff
@@ -790,9 +790,12 @@ setMethod("$", signature(x = "DataFrame"),
 
 setMethod("$<-", signature(x = "DataFrame"),
           function(x, name, value) {
-            stopifnot(class(value) == "Column")
+            stopifnot(class(value) == "Column" || is.null(value))
             cols <- columns(x)
             if (name %in% cols) {
+              if (is.null(value)) {
+                cols <- Filter(function(c) { c != name }, cols)
+              }
               cols <- lapply(cols, function(c) {
                 if (c == name) {
                   alias(value, name)
@@ -802,6 +805,9 @@ setMethod("$<-", signature(x = "DataFrame"),
               })
               nx <- select(x, cols)
             } else {
+              if (is.null(value)) {
+                return(x)
+              }
               nx <- withColumn(x, name, value)
             }
             x@sdf <- nx@sdf
```
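The relaxed `stopifnot` and the two `is.null` branches let `$<-` drop a column when NULL is assigned, mirroring base R data.frame semantics. A minimal usage sketch (initialization is illustrative; assumes a SparkR SQL context is already set up):

```r
# Sketch: drop a DataFrame column by assigning NULL
# (assumes sc/sqlContext are already initialized; faithful is a base R dataset)
df <- createDataFrame(sqlContext, faithful)
columns(df)          # "eruptions" "waiting"
df$waiting <- NULL   # existing column: filtered out, then select() rebuilds df
columns(df)          # "eruptions"
df$absent <- NULL    # nonexistent column: the method returns x unchanged
```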

R/pkg/R/RDD.R

Lines changed: 19 additions & 19 deletions
```diff
@@ -91,8 +91,8 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
     # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
     # prev_serializedMode is used during the delayed computation of JRDD in getJRDD
   } else {
-    pipelinedFunc <- function(split, iterator) {
-      func(split, prev@func(split, iterator))
+    pipelinedFunc <- function(partIndex, part) {
+      func(partIndex, prev@func(partIndex, part))
     }
     .Object@func <- cleanClosure(pipelinedFunc)
     .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
@@ -306,7 +306,7 @@ setMethod("numPartitions",
           signature(x = "RDD"),
           function(x) {
             jrdd <- getJRDD(x)
-            partitions <- callJMethod(jrdd, "splits")
+            partitions <- callJMethod(jrdd, "partitions")
             callJMethod(partitions, "size")
           })
 
@@ -452,8 +452,8 @@ setMethod("countByValue",
 setMethod("lapply",
           signature(X = "RDD", FUN = "function"),
           function(X, FUN) {
-            func <- function(split, iterator) {
-              lapply(iterator, FUN)
+            func <- function(partIndex, part) {
+              lapply(part, FUN)
             }
             lapplyPartitionsWithIndex(X, func)
           })
@@ -538,8 +538,8 @@ setMethod("mapPartitions",
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd <- parallelize(sc, 1:10, 5L)
-#' prod <- lapplyPartitionsWithIndex(rdd, function(split, part) {
-#'                                    split * Reduce("+", part) })
+#' prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
+#'                                    partIndex * Reduce("+", part) })
 #' collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76
 #'}
 #' @rdname lapplyPartitionsWithIndex
@@ -813,7 +813,7 @@ setMethod("distinct",
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10) # ensure each num is in its own split
+#' rdd <- parallelize(sc, 1:10)
 #' collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
 #' collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
 #'}
@@ -825,14 +825,14 @@ setMethod("sampleRDD",
           function(x, withReplacement, fraction, seed) {
 
             # The sampler: takes a partition and returns its sampled version.
-            samplingFunc <- function(split, part) {
+            samplingFunc <- function(partIndex, part) {
               set.seed(seed)
               res <- vector("list", length(part))
               len <- 0
 
               # Discards some random values to ensure each partition has a
               # different random seed.
-              runif(split)
+              runif(partIndex)
 
               for (elem in part) {
                 if (withReplacement) {
@@ -967,7 +967,7 @@ setMethod("keyBy",
 setMethod("repartition",
           signature(x = "RDD", numPartitions = "numeric"),
           function(x, numPartitions) {
-            coalesce(x, numToInt(numPartitions), TRUE)
+            coalesce(x, numPartitions, TRUE)
           })
 
 #' Return a new RDD that is reduced into numPartitions partitions.
@@ -989,8 +989,8 @@ setMethod("coalesce",
           function(x, numPartitions, shuffle = FALSE) {
             numPartitions <- numToInt(numPartitions)
             if (shuffle || numPartitions > SparkR::numPartitions(x)) {
-              func <- function(s, part) {
-                set.seed(s)  # split as seed
+              func <- function(partIndex, part) {
+                set.seed(partIndex)  # partIndex as seed
                 start <- as.integer(sample(numPartitions, 1) - 1)
                 lapply(seq_along(part),
                        function(i) {
@@ -1035,7 +1035,7 @@ setMethod("saveAsObjectFile",
 #' Save this RDD as a text file, using string representations of elements.
 #'
 #' @param x The RDD to save
-#' @param path The directory where the splits of the text file are saved
+#' @param path The directory where the partitions of the text file are saved
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
@@ -1335,10 +1335,10 @@ setMethod("zipWithUniqueId",
           function(x) {
             n <- numPartitions(x)
 
-            partitionFunc <- function(split, part) {
+            partitionFunc <- function(partIndex, part) {
               mapply(
                 function(item, index) {
-                  list(item, (index - 1) * n + split)
+                  list(item, (index - 1) * n + partIndex)
                 },
                 part,
                 seq_along(part),
@@ -1382,11 +1382,11 @@ setMethod("zipWithIndex",
               startIndices <- Reduce("+", nums, accumulate = TRUE)
             }
 
-            partitionFunc <- function(split, part) {
-              if (split == 0) {
+            partitionFunc <- function(partIndex, part) {
+              if (partIndex == 0) {
                 startIndex <- 0
               } else {
-                startIndex <- startIndices[[split]]
+                startIndex <- startIndices[[partIndex]]
               }
 
               mapply(
```
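Across RDD.R this commit standardizes the partition-function parameter names from `split`/`iterator` to `partIndex`/`part`: `partIndex` is the zero-based index of the partition and `part` is the list of its elements. A small sketch of the convention, using `lapplyPartitionsWithIndex` as shown in the diff above (assumes a running SparkR context):

```r
# Sketch: tag every element with its zero-based partition index
# (assumes sc from sparkR.init())
rdd <- parallelize(sc, 1:6, 3L)
tagged <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
  lapply(part, function(elem) list(partIndex, elem))
})
collect(tagged)  # list(0, 1), list(0, 2), list(1, 3), list(1, 4), ...
```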

R/pkg/R/context.R

Lines changed: 10 additions & 10 deletions
```diff
@@ -17,12 +17,12 @@
 
 # context.R: SparkContext driven functions
 
-getMinSplits <- function(sc, minSplits) {
-  if (is.null(minSplits)) {
+getMinPartitions <- function(sc, minPartitions) {
+  if (is.null(minPartitions)) {
     defaultParallelism <- callJMethod(sc, "defaultParallelism")
-    minSplits <- min(defaultParallelism, 2)
+    minPartitions <- min(defaultParallelism, 2)
   }
-  as.integer(minSplits)
+  as.integer(minPartitions)
 }
 
 #' Create an RDD from a text file.
@@ -33,7 +33,7 @@ getMinSplits <- function(sc, minSplits) {
 #'
 #' @param sc SparkContext to use
 #' @param path Path of file to read. A vector of multiple paths is allowed.
-#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
 #'        value is chosen based on available parallelism.
 #' @return RDD where each item is of type \code{character}
 #' @export
@@ -42,13 +42,13 @@ getMinSplits <- function(sc, minSplits) {
 #' sc <- sparkR.init()
 #' lines <- textFile(sc, "myfile.txt")
 #'}
-textFile <- function(sc, path, minSplits = NULL) {
+textFile <- function(sc, path, minPartitions = NULL) {
   # Allow the user to have a more flexible definiton of the text file path
   path <- suppressWarnings(normalizePath(path))
   #' Convert a string vector of paths to a string containing comma separated paths
   path <- paste(path, collapse = ",")
 
-  jrdd <- callJMethod(sc, "textFile", path, getMinSplits(sc, minSplits))
+  jrdd <- callJMethod(sc, "textFile", path, getMinPartitions(sc, minPartitions))
   # jrdd is of type JavaRDD[String]
   RDD(jrdd, "string")
 }
@@ -60,7 +60,7 @@ textFile <- function(sc, path, minSplits = NULL) {
 #'
 #' @param sc SparkContext to use
 #' @param path Path of file to read. A vector of multiple paths is allowed.
-#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
 #'        value is chosen based on available parallelism.
 #' @return RDD containing serialized R objects.
 #' @seealso saveAsObjectFile
@@ -70,13 +70,13 @@ textFile <- function(sc, path, minSplits = NULL) {
 #' sc <- sparkR.init()
 #' rdd <- objectFile(sc, "myfile")
 #'}
-objectFile <- function(sc, path, minSplits = NULL) {
+objectFile <- function(sc, path, minPartitions = NULL) {
   # Allow the user to have a more flexible definiton of the text file path
   path <- suppressWarnings(normalizePath(path))
   #' Convert a string vector of paths to a string containing comma separated paths
   path <- paste(path, collapse = ",")
 
-  jrdd <- callJMethod(sc, "objectFile", path, getMinSplits(sc, minSplits))
+  jrdd <- callJMethod(sc, "objectFile", path, getMinPartitions(sc, minPartitions))
   # Assume the RDD contains serialized R objects.
   RDD(jrdd, "byte")
 }
```
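The `minSplits` parameter is renamed to `minPartitions`, matching the naming used elsewhere in Spark's API. A hedged usage sketch (the path is a placeholder):

```r
# Sketch: request a lower bound on input partitions when reading a text file
# (assumes a local file "myfile.txt" exists)
sc <- sparkR.init()
lines <- textFile(sc, "myfile.txt", minPartitions = 4)
numPartitions(lines)  # typically >= 4; the value is a hint to the input format
```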

R/pkg/R/generics.R

Lines changed: 6 additions & 6 deletions
```diff
@@ -60,7 +60,7 @@ setGeneric("countByValue", function(x) { standardGeneric("countByValue") })
 
 #' @rdname distinct
 #' @export
-setGeneric("distinct", function(x, numPartitions = 1L) { standardGeneric("distinct") })
+setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") })
 
 #' @rdname filterRDD
 #' @export
@@ -182,7 +182,7 @@ setGeneric("setName", function(x, name) { standardGeneric("setName") })
 #' @rdname sortBy
 #' @export
 setGeneric("sortBy",
-           function(x, func, ascending = TRUE, numPartitions = 1L) {
+           function(x, func, ascending = TRUE, numPartitions = 1) {
              standardGeneric("sortBy")
            })
 
@@ -244,7 +244,7 @@ setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues")
 
 #' @rdname intersection
 #' @export
-setGeneric("intersection", function(x, other, numPartitions = 1L) {
+setGeneric("intersection", function(x, other, numPartitions = 1) {
   standardGeneric("intersection") })
 
 #' @rdname keys
@@ -346,21 +346,21 @@ setGeneric("rightOuterJoin", function(x, y, numPartitions) { standardGeneric("ri
 #' @rdname sortByKey
 #' @export
 setGeneric("sortByKey",
-           function(x, ascending = TRUE, numPartitions = 1L) {
+           function(x, ascending = TRUE, numPartitions = 1) {
              standardGeneric("sortByKey")
            })
 
 #' @rdname subtract
 #' @export
 setGeneric("subtract",
-           function(x, other, numPartitions = 1L) {
+           function(x, other, numPartitions = 1) {
              standardGeneric("subtract")
            })
 
 #' @rdname subtractByKey
 #' @export
 setGeneric("subtractByKey",
-           function(x, other, numPartitions = 1L) {
+           function(x, other, numPartitions = 1) {
              standardGeneric("subtractByKey")
            })
 
```
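The generics' defaults change from the integer literal `1L` to the double `1`. In R, plain numeric literals are doubles, so this keeps the declared default in the same class that callers naturally pass; methods such as `coalesce` in RDD.R above coerce with `numToInt` before calling into the JVM. A one-liner illustrating the distinction:

```r
is.integer(1L)  # TRUE  - the L suffix makes an integer
is.integer(1)   # FALSE - plain numeric literals are doubles
as.integer(4.0) # 4L    - counts are coerced before crossing into the JVM
```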
