Skip to content

Commit 227ee42

Browse files
Merge pull request apache#141 from shivaram/SPARKR-140
[SPARKR-140] Fix serialization tracking in pipelined RDDs
2 parents 7428a7e + ac5ceb1 commit 227ee42

File tree

2 files changed

+32
-2
lines changed

2 files changed

+32
-2
lines changed

pkg/R/RDD.R

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,11 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
4646
.Object@env$isCached <- FALSE
4747
.Object@env$isCheckpointed <- FALSE
4848
.Object@env$jrdd_val <- jrdd_val
49+
# This tracks if jrdd_val is serialized
4950
.Object@env$serialized <- prev@env$serialized
51+
52+
# NOTE: We use prev_serialized to track if prev_jrdd is serialized
53+
# prev_serialized is used during the delayed computation of JRDD in getJRDD
5054
.Object@prev <- prev
5155

5256
isPipelinable <- function(rdd) {
@@ -58,12 +62,17 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
5862
# This transformation is the first in its stage:
5963
.Object@func <- func
6064
.Object@prev_jrdd <- getJRDD(prev)
65+
# Since this is the first step in the pipeline, the prev_serialized
66+
# is the same as serialized here.
67+
.Object@env$prev_serialized <- .Object@env$serialized
6168
} else {
6269
pipelinedFunc <- function(split, iterator) {
6370
func(split, prev@func(split, iterator))
6471
}
6572
.Object@func <- pipelinedFunc
6673
.Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
74+
# Inherit from the parent RDD whether prev_jrdd was serialized
75+
.Object@env$prev_serialized <- prev@env$prev_serialized
6776
}
6877

6978
.Object
@@ -116,7 +125,7 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
116125
rddRef <- newJObject("edu.berkeley.cs.amplab.sparkr.RRDD",
117126
callJMethod(prev_jrdd, "rdd"),
118127
serializedFuncArr,
119-
rdd@env$serialized,
128+
rdd@env$prev_serialized,
120129
depsBin,
121130
packageNamesArr,
122131
as.character(.sparkREnv[["libname"]]),
@@ -126,7 +135,7 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
126135
rddRef <- newJObject("edu.berkeley.cs.amplab.sparkr.StringRRDD",
127136
callJMethod(prev_jrdd, "rdd"),
128137
serializedFuncArr,
129-
rdd@env$serialized,
138+
rdd@env$prev_serialized,
130139
depsBin,
131140
packageNamesArr,
132141
as.character(.sparkREnv[["libname"]]),

pkg/inst/tests/test_textFile.R

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,24 @@ test_that("textFile() on multiple paths", {
122122
unlink(fileName2)
123123
})
124124

125+
test_that("Pipelined operations on RDDs created using textFile", {
  # Materialize the mock lines in a scratch file and load it as a text RDD.
  tmpPath <- tempfile(pattern = "spark-test", fileext = ".tmp")
  writeLines(mockFile, tmpPath)
  textRdd <- textFile(sc, tmpPath)

  # First transformation applied directly on the textFile-backed RDD.
  perLine <- lapply(textRdd, function(line) length(line))
  expect_equal(collect(perLine), list(1, 1))

  # Second transformation, pipelined on top of the first one.
  plusTen <- lapply(perLine, function(n) n + 10)
  expect_equal(collect(plusTen), list(11, 11))

  # Third transformation, extending the same pipeline one step further.
  plusThirty <- lapply(plusTen, function(n) n + 20)
  expect_equal(collect(plusThirty), list(31, 31))

  # A separate branch that starts again from the first transformation,
  # after that RDD has already been collected above.
  plusTwenty <- lapply(perLine, function(n) n + 20)
  expect_equal(collect(plusTwenty), list(21, 21))

  # Remove the scratch file.
  unlink(tmpPath)
})
145+

0 commit comments

Comments
 (0)