Skip to content

Commit 2106f12

Browse files
author
Andrew Or
committed
Merge branch 'master' of github.com:apache/spark into closure-cleaner
2 parents 263593d + 438859e commit 2106f12

File tree

19 files changed

+589
-85
lines changed

19 files changed

+589
-85
lines changed

R/pkg/R/RDD.R

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
9191
# NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
9292
# prev_serializedMode is used during the delayed computation of JRDD in getJRDD
9393
} else {
94-
pipelinedFunc <- function(split, iterator) {
95-
func(split, prev@func(split, iterator))
94+
pipelinedFunc <- function(partIndex, part) {
95+
func(partIndex, prev@func(partIndex, part))
9696
}
9797
.Object@func <- cleanClosure(pipelinedFunc)
9898
.Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
@@ -306,7 +306,7 @@ setMethod("numPartitions",
306306
signature(x = "RDD"),
307307
function(x) {
308308
jrdd <- getJRDD(x)
309-
partitions <- callJMethod(jrdd, "splits")
309+
partitions <- callJMethod(jrdd, "partitions")
310310
callJMethod(partitions, "size")
311311
})
312312

@@ -452,8 +452,8 @@ setMethod("countByValue",
452452
setMethod("lapply",
453453
signature(X = "RDD", FUN = "function"),
454454
function(X, FUN) {
455-
func <- function(split, iterator) {
456-
lapply(iterator, FUN)
455+
func <- function(partIndex, part) {
456+
lapply(part, FUN)
457457
}
458458
lapplyPartitionsWithIndex(X, func)
459459
})
@@ -538,8 +538,8 @@ setMethod("mapPartitions",
538538
#'\dontrun{
539539
#' sc <- sparkR.init()
540540
#' rdd <- parallelize(sc, 1:10, 5L)
541-
#' prod <- lapplyPartitionsWithIndex(rdd, function(split, part) {
542-
#' split * Reduce("+", part) })
541+
#' prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
542+
#' partIndex * Reduce("+", part) })
543543
#' collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76
544544
#'}
545545
#' @rdname lapplyPartitionsWithIndex
@@ -813,7 +813,7 @@ setMethod("distinct",
813813
#' @examples
814814
#'\dontrun{
815815
#' sc <- sparkR.init()
816-
#' rdd <- parallelize(sc, 1:10) # ensure each num is in its own split
816+
#' rdd <- parallelize(sc, 1:10)
817817
#' collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
818818
#' collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
819819
#'}
@@ -825,14 +825,14 @@ setMethod("sampleRDD",
825825
function(x, withReplacement, fraction, seed) {
826826

827827
# The sampler: takes a partition and returns its sampled version.
828-
samplingFunc <- function(split, part) {
828+
samplingFunc <- function(partIndex, part) {
829829
set.seed(seed)
830830
res <- vector("list", length(part))
831831
len <- 0
832832

833833
# Discards some random values to ensure each partition has a
834834
# different random seed.
835-
runif(split)
835+
runif(partIndex)
836836

837837
for (elem in part) {
838838
if (withReplacement) {
@@ -967,7 +967,7 @@ setMethod("keyBy",
967967
setMethod("repartition",
968968
signature(x = "RDD", numPartitions = "numeric"),
969969
function(x, numPartitions) {
970-
coalesce(x, numToInt(numPartitions), TRUE)
970+
coalesce(x, numPartitions, TRUE)
971971
})
972972

973973
#' Return a new RDD that is reduced into numPartitions partitions.
@@ -989,8 +989,8 @@ setMethod("coalesce",
989989
function(x, numPartitions, shuffle = FALSE) {
990990
numPartitions <- numToInt(numPartitions)
991991
if (shuffle || numPartitions > SparkR::numPartitions(x)) {
992-
func <- function(s, part) {
993-
set.seed(s) # split as seed
992+
func <- function(partIndex, part) {
993+
set.seed(partIndex) # partIndex as seed
994994
start <- as.integer(sample(numPartitions, 1) - 1)
995995
lapply(seq_along(part),
996996
function(i) {
@@ -1035,7 +1035,7 @@ setMethod("saveAsObjectFile",
10351035
#' Save this RDD as a text file, using string representations of elements.
10361036
#'
10371037
#' @param x The RDD to save
1038-
#' @param path The directory where the splits of the text file are saved
1038+
#' @param path The directory where the partitions of the text file are saved
10391039
#' @examples
10401040
#'\dontrun{
10411041
#' sc <- sparkR.init()
@@ -1335,10 +1335,10 @@ setMethod("zipWithUniqueId",
13351335
function(x) {
13361336
n <- numPartitions(x)
13371337

1338-
partitionFunc <- function(split, part) {
1338+
partitionFunc <- function(partIndex, part) {
13391339
mapply(
13401340
function(item, index) {
1341-
list(item, (index - 1) * n + split)
1341+
list(item, (index - 1) * n + partIndex)
13421342
},
13431343
part,
13441344
seq_along(part),
@@ -1382,11 +1382,11 @@ setMethod("zipWithIndex",
13821382
startIndices <- Reduce("+", nums, accumulate = TRUE)
13831383
}
13841384

1385-
partitionFunc <- function(split, part) {
1386-
if (split == 0) {
1385+
partitionFunc <- function(partIndex, part) {
1386+
if (partIndex == 0) {
13871387
startIndex <- 0
13881388
} else {
1389-
startIndex <- startIndices[[split]]
1389+
startIndex <- startIndices[[partIndex]]
13901390
}
13911391

13921392
mapply(

R/pkg/R/context.R

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717

1818
# context.R: SparkContext driven functions
1919

20-
getMinSplits <- function(sc, minSplits) {
21-
if (is.null(minSplits)) {
20+
getMinPartitions <- function(sc, minPartitions) {
21+
if (is.null(minPartitions)) {
2222
defaultParallelism <- callJMethod(sc, "defaultParallelism")
23-
minSplits <- min(defaultParallelism, 2)
23+
minPartitions <- min(defaultParallelism, 2)
2424
}
25-
as.integer(minSplits)
25+
as.integer(minPartitions)
2626
}
2727

2828
#' Create an RDD from a text file.
@@ -33,7 +33,7 @@ getMinSplits <- function(sc, minSplits) {
3333
#'
3434
#' @param sc SparkContext to use
3535
#' @param path Path of file to read. A vector of multiple paths is allowed.
36-
#' @param minSplits Minimum number of splits to be created. If NULL, the default
36+
#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
3737
#' value is chosen based on available parallelism.
3838
#' @return RDD where each item is of type \code{character}
3939
#' @export
@@ -42,13 +42,13 @@ getMinSplits <- function(sc, minSplits) {
4242
#' sc <- sparkR.init()
4343
#' lines <- textFile(sc, "myfile.txt")
4444
#'}
45-
textFile <- function(sc, path, minSplits = NULL) {
45+
textFile <- function(sc, path, minPartitions = NULL) {
4646
# Allow the user to have a more flexible definiton of the text file path
4747
path <- suppressWarnings(normalizePath(path))
4848
#' Convert a string vector of paths to a string containing comma separated paths
4949
path <- paste(path, collapse = ",")
5050

51-
jrdd <- callJMethod(sc, "textFile", path, getMinSplits(sc, minSplits))
51+
jrdd <- callJMethod(sc, "textFile", path, getMinPartitions(sc, minPartitions))
5252
# jrdd is of type JavaRDD[String]
5353
RDD(jrdd, "string")
5454
}
@@ -60,7 +60,7 @@ textFile <- function(sc, path, minSplits = NULL) {
6060
#'
6161
#' @param sc SparkContext to use
6262
#' @param path Path of file to read. A vector of multiple paths is allowed.
63-
#' @param minSplits Minimum number of splits to be created. If NULL, the default
63+
#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
6464
#' value is chosen based on available parallelism.
6565
#' @return RDD containing serialized R objects.
6666
#' @seealso saveAsObjectFile
@@ -70,13 +70,13 @@ textFile <- function(sc, path, minSplits = NULL) {
7070
#' sc <- sparkR.init()
7171
#' rdd <- objectFile(sc, "myfile")
7272
#'}
73-
objectFile <- function(sc, path, minSplits = NULL) {
73+
objectFile <- function(sc, path, minPartitions = NULL) {
7474
# Allow the user to have a more flexible definiton of the text file path
7575
path <- suppressWarnings(normalizePath(path))
7676
#' Convert a string vector of paths to a string containing comma separated paths
7777
path <- paste(path, collapse = ",")
7878

79-
jrdd <- callJMethod(sc, "objectFile", path, getMinSplits(sc, minSplits))
79+
jrdd <- callJMethod(sc, "objectFile", path, getMinPartitions(sc, minPartitions))
8080
# Assume the RDD contains serialized R objects.
8181
RDD(jrdd, "byte")
8282
}

R/pkg/R/generics.R

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ setGeneric("countByValue", function(x) { standardGeneric("countByValue") })
6060

6161
#' @rdname distinct
6262
#' @export
63-
setGeneric("distinct", function(x, numPartitions = 1L) { standardGeneric("distinct") })
63+
setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") })
6464

6565
#' @rdname filterRDD
6666
#' @export
@@ -182,7 +182,7 @@ setGeneric("setName", function(x, name) { standardGeneric("setName") })
182182
#' @rdname sortBy
183183
#' @export
184184
setGeneric("sortBy",
185-
function(x, func, ascending = TRUE, numPartitions = 1L) {
185+
function(x, func, ascending = TRUE, numPartitions = 1) {
186186
standardGeneric("sortBy")
187187
})
188188

@@ -244,7 +244,7 @@ setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues")
244244

245245
#' @rdname intersection
246246
#' @export
247-
setGeneric("intersection", function(x, other, numPartitions = 1L) {
247+
setGeneric("intersection", function(x, other, numPartitions = 1) {
248248
standardGeneric("intersection") })
249249

250250
#' @rdname keys
@@ -346,21 +346,21 @@ setGeneric("rightOuterJoin", function(x, y, numPartitions) { standardGeneric("ri
346346
#' @rdname sortByKey
347347
#' @export
348348
setGeneric("sortByKey",
349-
function(x, ascending = TRUE, numPartitions = 1L) {
349+
function(x, ascending = TRUE, numPartitions = 1) {
350350
standardGeneric("sortByKey")
351351
})
352352

353353
#' @rdname subtract
354354
#' @export
355355
setGeneric("subtract",
356-
function(x, other, numPartitions = 1L) {
356+
function(x, other, numPartitions = 1) {
357357
standardGeneric("subtract")
358358
})
359359

360360
#' @rdname subtractByKey
361361
#' @export
362362
setGeneric("subtractByKey",
363-
function(x, other, numPartitions = 1L) {
363+
function(x, other, numPartitions = 1) {
364364
standardGeneric("subtractByKey")
365365
})
366366

R/pkg/R/pairRDD.R

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ setMethod("flatMapValues",
190190
#' @rdname partitionBy
191191
#' @aliases partitionBy,RDD,integer-method
192192
setMethod("partitionBy",
193-
signature(x = "RDD", numPartitions = "integer"),
193+
signature(x = "RDD", numPartitions = "numeric"),
194194
function(x, numPartitions, partitionFunc = hashCode) {
195195

196196
#if (missing(partitionFunc)) {
@@ -206,12 +206,12 @@ setMethod("partitionBy",
206206
get(name, .broadcastNames) })
207207
jrdd <- getJRDD(x)
208208

209-
# We create a PairwiseRRDD that extends RDD[(Array[Byte],
210-
# Array[Byte])], where the key is the hashed split, the value is
209+
# We create a PairwiseRRDD that extends RDD[(Int, Array[Byte])],
210+
# where the key is the target partition number, the value is
211211
# the content (key-val pairs).
212212
pairwiseRRDD <- newJObject("org.apache.spark.api.r.PairwiseRRDD",
213213
callJMethod(jrdd, "rdd"),
214-
as.integer(numPartitions),
214+
numToInt(numPartitions),
215215
serializedHashFuncBytes,
216216
getSerializedMode(x),
217217
packageNamesArr,
@@ -221,7 +221,7 @@ setMethod("partitionBy",
221221

222222
# Create a corresponding partitioner.
223223
rPartitioner <- newJObject("org.apache.spark.HashPartitioner",
224-
as.integer(numPartitions))
224+
numToInt(numPartitions))
225225

226226
# Call partitionBy on the obtained PairwiseRDD.
227227
javaPairRDD <- callJMethod(pairwiseRRDD, "asJavaPairRDD")
@@ -256,7 +256,7 @@ setMethod("partitionBy",
256256
#' @rdname groupByKey
257257
#' @aliases groupByKey,RDD,integer-method
258258
setMethod("groupByKey",
259-
signature(x = "RDD", numPartitions = "integer"),
259+
signature(x = "RDD", numPartitions = "numeric"),
260260
function(x, numPartitions) {
261261
shuffled <- partitionBy(x, numPartitions)
262262
groupVals <- function(part) {
@@ -315,7 +315,7 @@ setMethod("groupByKey",
315315
#' @rdname reduceByKey
316316
#' @aliases reduceByKey,RDD,integer-method
317317
setMethod("reduceByKey",
318-
signature(x = "RDD", combineFunc = "ANY", numPartitions = "integer"),
318+
signature(x = "RDD", combineFunc = "ANY", numPartitions = "numeric"),
319319
function(x, combineFunc, numPartitions) {
320320
reduceVals <- function(part) {
321321
vals <- new.env()
@@ -422,7 +422,7 @@ setMethod("reduceByKeyLocally",
422422
#' @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method
423423
setMethod("combineByKey",
424424
signature(x = "RDD", createCombiner = "ANY", mergeValue = "ANY",
425-
mergeCombiners = "ANY", numPartitions = "integer"),
425+
mergeCombiners = "ANY", numPartitions = "numeric"),
426426
function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) {
427427
combineLocally <- function(part) {
428428
combiners <- new.env()
@@ -483,7 +483,7 @@ setMethod("combineByKey",
483483
#' @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method
484484
setMethod("aggregateByKey",
485485
signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY",
486-
combOp = "ANY", numPartitions = "integer"),
486+
combOp = "ANY", numPartitions = "numeric"),
487487
function(x, zeroValue, seqOp, combOp, numPartitions) {
488488
createCombiner <- function(v) {
489489
do.call(seqOp, list(zeroValue, v))
@@ -514,7 +514,7 @@ setMethod("aggregateByKey",
514514
#' @aliases foldByKey,RDD,ANY,ANY,integer-method
515515
setMethod("foldByKey",
516516
signature(x = "RDD", zeroValue = "ANY",
517-
func = "ANY", numPartitions = "integer"),
517+
func = "ANY", numPartitions = "numeric"),
518518
function(x, zeroValue, func, numPartitions) {
519519
aggregateByKey(x, zeroValue, func, func, numPartitions)
520520
})
@@ -553,7 +553,7 @@ setMethod("join",
553553
joinTaggedList(v, list(FALSE, FALSE))
554554
}
555555

556-
joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numToInt(numPartitions)),
556+
joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions),
557557
doJoin)
558558
})
559559

@@ -582,7 +582,7 @@ setMethod("join",
582582
#' @rdname join-methods
583583
#' @aliases leftOuterJoin,RDD,RDD-method
584584
setMethod("leftOuterJoin",
585-
signature(x = "RDD", y = "RDD", numPartitions = "integer"),
585+
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
586586
function(x, y, numPartitions) {
587587
xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
588588
yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
@@ -619,7 +619,7 @@ setMethod("leftOuterJoin",
619619
#' @rdname join-methods
620620
#' @aliases rightOuterJoin,RDD,RDD-method
621621
setMethod("rightOuterJoin",
622-
signature(x = "RDD", y = "RDD", numPartitions = "integer"),
622+
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
623623
function(x, y, numPartitions) {
624624
xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
625625
yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
@@ -659,7 +659,7 @@ setMethod("rightOuterJoin",
659659
#' @rdname join-methods
660660
#' @aliases fullOuterJoin,RDD,RDD-method
661661
setMethod("fullOuterJoin",
662-
signature(x = "RDD", y = "RDD", numPartitions = "integer"),
662+
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
663663
function(x, y, numPartitions) {
664664
xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
665665
yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
@@ -866,8 +866,8 @@ setMethod("sampleByKey",
866866
}
867867

868868
# The sampler: takes a partition and returns its sampled version.
869-
samplingFunc <- function(split, part) {
870-
set.seed(bitwXor(seed, split))
869+
samplingFunc <- function(partIndex, part) {
870+
set.seed(bitwXor(seed, partIndex))
871871
res <- vector("list", length(part))
872872
len <- 0
873873

R/pkg/R/utils.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ appendPartitionLengths <- function(x, other) {
501501
# A result RDD.
502502
mergePartitions <- function(rdd, zip) {
503503
serializerMode <- getSerializedMode(rdd)
504-
partitionFunc <- function(split, part) {
504+
partitionFunc <- function(partIndex, part) {
505505
len <- length(part)
506506
if (len > 0) {
507507
if (serializerMode == "byte") {

0 commit comments

Comments
 (0)