@@ -25,80 +25,80 @@ getMinPartitions <- function(sc, minPartitions) {
  as.integer(minPartitions)
}

- #' Create an RDD from a text file.
- #'
- #' This function reads a text file from HDFS, a local file system (available on all
- #' nodes), or any Hadoop-supported file system URI, and creates an
- #' RDD of strings from it.
- #'
- #' @param sc SparkContext to use
- #' @param path Path of file to read. A vector of multiple paths is allowed.
- #' @param minPartitions Minimum number of partitions to be created. If NULL, the default
- #' value is chosen based on available parallelism.
- #' @return RDD where each item is of type \code{character}
- #' @export
- #' @examples
- #' \dontrun{
- #' sc <- sparkR.init()
- #' lines <- textFile(sc, "myfile.txt")
- #' }
+ # Create an RDD from a text file.
+ #
+ # This function reads a text file from HDFS, a local file system (available on all
+ # nodes), or any Hadoop-supported file system URI, and creates an
+ # RDD of strings from it.
+ #
+ # @param sc SparkContext to use
+ # @param path Path of file to read. A vector of multiple paths is allowed.
+ # @param minPartitions Minimum number of partitions to be created. If NULL, the default
+ # value is chosen based on available parallelism.
+ # @return RDD where each item is of type \code{character}
+ # @export
+ # @examples
+ # \dontrun{
+ # sc <- sparkR.init()
+ # lines <- textFile(sc, "myfile.txt")
+ # }
textFile <- function(sc, path, minPartitions = NULL) {
  # Allow the user to have a more flexible definition of the text file path
  path <- suppressWarnings(normalizePath(path))
- #' Convert a string vector of paths to a string containing comma separated paths
+ # Convert a string vector of paths to a string containing comma separated paths
  path <- paste(path, collapse = ",")

  jrdd <- callJMethod(sc, "textFile", path, getMinPartitions(sc, minPartitions))
  # jrdd is of type JavaRDD[String]
  RDD(jrdd, "string")
}
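
A minimal usage sketch for textFile, assuming a context from sparkR.init(); the path and partition count below are illustrative:

sc <- sparkR.init()
lines <- textFile(sc, "hdfs:///data/input.txt", minPartitions = 4L)
# Each RDD element is one line of the file, as a character string
count(lines)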

- #' Load an RDD saved as a SequenceFile containing serialized objects.
- #'
- #' The file to be loaded should be one that was previously generated by calling
- #' saveAsObjectFile() of the RDD class.
- #'
- #' @param sc SparkContext to use
- #' @param path Path of file to read. A vector of multiple paths is allowed.
- #' @param minPartitions Minimum number of partitions to be created. If NULL, the default
- #' value is chosen based on available parallelism.
- #' @return RDD containing serialized R objects.
- #' @seealso saveAsObjectFile
- #' @export
- #' @examples
- #' \dontrun{
- #' sc <- sparkR.init()
- #' rdd <- objectFile(sc, "myfile")
- #' }
+ # Load an RDD saved as a SequenceFile containing serialized objects.
+ #
+ # The file to be loaded should be one that was previously generated by calling
+ # saveAsObjectFile() of the RDD class.
+ #
+ # @param sc SparkContext to use
+ # @param path Path of file to read. A vector of multiple paths is allowed.
+ # @param minPartitions Minimum number of partitions to be created. If NULL, the default
+ # value is chosen based on available parallelism.
+ # @return RDD containing serialized R objects.
+ # @seealso saveAsObjectFile
+ # @export
+ # @examples
+ # \dontrun{
+ # sc <- sparkR.init()
+ # rdd <- objectFile(sc, "myfile")
+ # }
objectFile <- function(sc, path, minPartitions = NULL) {
  # Allow the user to have a more flexible definition of the text file path
  path <- suppressWarnings(normalizePath(path))
- #' Convert a string vector of paths to a string containing comma separated paths
+ # Convert a string vector of paths to a string containing comma separated paths
  path <- paste(path, collapse = ",")

  jrdd <- callJMethod(sc, "objectFile", path, getMinPartitions(sc, minPartitions))
  # Assume the RDD contains serialized R objects.
  RDD(jrdd, "byte")
}
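
A minimal round-trip sketch for objectFile, assuming an RDD previously written with saveAsObjectFile; the output path is illustrative:

sc <- sparkR.init()
rdd <- parallelize(sc, list(1, 2, 3), 2L)
saveAsObjectFile(rdd, "/tmp/rdd-objects")
restored <- objectFile(sc, "/tmp/rdd-objects")
collect(restored)  # the original three elements, deserialized back into R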

- #' Create an RDD from a homogeneous list or vector.
- #'
- #' This function creates an RDD from a local homogeneous list in R. The elements
- #' in the list are split into \code{numSlices} slices and distributed to nodes
- #' in the cluster.
- #'
- #' @param sc SparkContext to use
- #' @param coll collection to parallelize
- #' @param numSlices number of partitions to create in the RDD
- #' @return an RDD created from this collection
- #' @export
- #' @examples
- #' \dontrun{
- #' sc <- sparkR.init()
- #' rdd <- parallelize(sc, 1:10, 2)
- #' # The RDD should contain 10 elements
- #' length(rdd)
- #' }
+ # Create an RDD from a homogeneous list or vector.
+ #
+ # This function creates an RDD from a local homogeneous list in R. The elements
+ # in the list are split into \code{numSlices} slices and distributed to nodes
+ # in the cluster.
+ #
+ # @param sc SparkContext to use
+ # @param coll collection to parallelize
+ # @param numSlices number of partitions to create in the RDD
+ # @return an RDD created from this collection
+ # @export
+ # @examples
+ # \dontrun{
+ # sc <- sparkR.init()
+ # rdd <- parallelize(sc, 1:10, 2)
+ # # The RDD should contain 10 elements
+ # length(rdd)
+ # }
parallelize <- function(sc, coll, numSlices = 1) {
  # TODO: bound/safeguard numSlices
  # TODO: unit tests for if the split works for all primitives
@@ -133,33 +133,33 @@ parallelize <- function(sc, coll, numSlices = 1) {
  RDD(jrdd, "byte")
}
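
A minimal usage sketch for parallelize, extending the documented example; the element and slice counts are illustrative:

sc <- sparkR.init()
rdd <- parallelize(sc, 1:100, numSlices = 4L)
# The 100 elements are split into 4 partitions and distributed to the workers
count(rdd)  # 100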

- #' Include this specified package on all workers
- #'
- #' This function can be used to include a package on all workers before the
- #' user's code is executed. This is useful in scenarios where other R package
- #' functions are used in a function passed to functions like \code{lapply}.
- #' NOTE: The package is assumed to be installed on every node in the Spark
- #' cluster.
- #'
- #' @param sc SparkContext to use
- #' @param pkg Package name
- #'
- #' @export
- #' @examples
- #' \dontrun{
- #' library(Matrix)
- #'
- #' sc <- sparkR.init()
- #' # Include the matrix library we will be using
- #' includePackage(sc, Matrix)
- #'
- #' generateSparse <- function(x) {
- #' sparseMatrix(i=c(1, 2, 3), j=c(1, 2, 3), x=c(1, 2, 3))
- #' }
- #'
- #' rdd <- lapplyPartition(parallelize(sc, 1:2, 2L), generateSparse)
- #' collect(rdd)
- #' }
+ # Include this specified package on all workers
+ #
+ # This function can be used to include a package on all workers before the
+ # user's code is executed. This is useful in scenarios where other R package
+ # functions are used in a function passed to functions like \code{lapply}.
+ # NOTE: The package is assumed to be installed on every node in the Spark
+ # cluster.
+ #
+ # @param sc SparkContext to use
+ # @param pkg Package name
+ #
+ # @export
+ # @examples
+ # \dontrun{
+ # library(Matrix)
+ #
+ # sc <- sparkR.init()
+ # # Include the matrix library we will be using
+ # includePackage(sc, Matrix)
+ #
+ # generateSparse <- function(x) {
+ # sparseMatrix(i=c(1, 2, 3), j=c(1, 2, 3), x=c(1, 2, 3))
+ # }
+ #
+ # rdd <- lapplyPartition(parallelize(sc, 1:2, 2L), generateSparse)
+ # collect(rdd)
+ # }
includePackage <- function(sc, pkg) {
  pkg <- as.character(substitute(pkg))
  if (exists(".packages", .sparkREnv)) {
@@ -171,30 +171,30 @@ includePackage <- function(sc, pkg) {
  .sparkREnv$.packages <- packages
}
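
A minimal sketch of the pattern includePackage enables, assuming the jsonlite package (an illustrative choice) is already installed on every worker node:

sc <- sparkR.init()
includePackage(sc, jsonlite)  # attached on each worker before user code runs
rdd <- parallelize(sc, list('{"a":1}', '{"a":2}'), 2L)
# fromJSON() resolves on the workers because jsonlite was included above
parsed <- lapply(rdd, function(s) { fromJSON(s)$a })
collect(parsed)  # list(1, 2)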

- #' @title Broadcast a variable to all workers
- #'
- #' @description
- #' Broadcast a read-only variable to the cluster, returning a \code{Broadcast}
- #' object for reading it in distributed functions.
- #'
- #' @param sc Spark Context to use
- #' @param object Object to be broadcast
- #' @export
- #' @examples
- #' \dontrun{
- #' sc <- sparkR.init()
- #' rdd <- parallelize(sc, 1:2, 2L)
- #'
- #' # Large Matrix object that we want to broadcast
- #' randomMat <- matrix(nrow=100, ncol=10, data=rnorm(1000))
- #' randomMatBr <- broadcast(sc, randomMat)
- #'
- #' # Use the broadcast variable inside the function
- #' useBroadcast <- function(x) {
- #' sum(value(randomMatBr) * x)
- #' }
- #' sumRDD <- lapply(rdd, useBroadcast)
- #' }
+ # @title Broadcast a variable to all workers
+ #
+ # @description
+ # Broadcast a read-only variable to the cluster, returning a \code{Broadcast}
+ # object for reading it in distributed functions.
+ #
+ # @param sc Spark Context to use
+ # @param object Object to be broadcast
+ # @export
+ # @examples
+ # \dontrun{
+ # sc <- sparkR.init()
+ # rdd <- parallelize(sc, 1:2, 2L)
+ #
+ # # Large Matrix object that we want to broadcast
+ # randomMat <- matrix(nrow=100, ncol=10, data=rnorm(1000))
+ # randomMatBr <- broadcast(sc, randomMat)
+ #
+ # # Use the broadcast variable inside the function
+ # useBroadcast <- function(x) {
+ # sum(value(randomMatBr) * x)
+ # }
+ # sumRDD <- lapply(rdd, useBroadcast)
+ # }
broadcast <- function(sc, object) {
  objName <- as.character(substitute(object))
  serializedObj <- serialize(object, connection = NULL)
@@ -205,21 +205,21 @@ broadcast <- function(sc, object) {
  Broadcast(id, object, jBroadcast, objName)
}
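
A minimal usage sketch for broadcast and value, following the documented pattern; the lookup table here is illustrative:

sc <- sparkR.init()
lookup <- c(a = 1, b = 2, c = 3)
lookupBr <- broadcast(sc, lookup)  # ship the read-only table to the workers once
rdd <- parallelize(sc, c("a", "c", "b"), 2L)
resolved <- lapply(rdd, function(k) { value(lookupBr)[[k]] })
collect(resolved)  # list(1, 3, 2)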

- #' @title Set the checkpoint directory
- #'
- #' Set the directory under which RDDs are going to be checkpointed. The
- #' directory must be a HDFS path if running on a cluster.
- #'
- #' @param sc Spark Context to use
- #' @param dirName Directory path
- #' @export
- #' @examples
- #' \dontrun{
- #' sc <- sparkR.init()
- #' setCheckpointDir(sc, "~/checkpoint")
- #' rdd <- parallelize(sc, 1:2, 2L)
- #' checkpoint(rdd)
- #' }
+ # @title Set the checkpoint directory
+ #
+ # Set the directory under which RDDs are going to be checkpointed. The
+ # directory must be a HDFS path if running on a cluster.
+ #
+ # @param sc Spark Context to use
+ # @param dirName Directory path
+ # @export
+ # @examples
+ # \dontrun{
+ # sc <- sparkR.init()
+ # setCheckpointDir(sc, "~/checkpoint")
+ # rdd <- parallelize(sc, 1:2, 2L)
+ # checkpoint(rdd)
+ # }
setCheckpointDir <- function(sc, dirName) {
  invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
}
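
A minimal usage sketch for setCheckpointDir together with checkpoint, following the documented example; the directory below is illustrative and must be an HDFS path when running on a cluster:

sc <- sparkR.init()
setCheckpointDir(sc, "/tmp/spark-checkpoints")
rdd <- parallelize(sc, 1:100, 4L)
checkpoint(rdd)  # mark the RDD for checkpointing
count(rdd)       # materializes the RDD and writes the checkpoint data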