Skip to content

Commit 9cb9567

Browse files
committed
Merge pull request apache#131 from shivaram/rJavaExpt
[SPARKR-177] New R to Java bridge
2 parents bcd4258 + 1fa722e commit 9cb9567

23 files changed

+1243
-333
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,5 @@ work/
1818
*~
1919
.Rproj.user
2020
SparkR-pkg.Rproj
21+
*.o
22+
*.so

pkg/DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ Version: 0.1
55
Date: 2013-09-09
66
Author: Shivaram Venkataraman
77
Maintainer: Shivaram Venkataraman <[email protected]>
8+
Imports: methods
89
Depends:
910
R (>= 3.0),
1011
methods,
11-
rJava
1212
Suggests:
1313
testthat
1414
Description: R frontend for Spark

pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,5 @@ export(
6565
"setCheckpointDir"
6666
)
6767
export("sparkR.init")
68+
export("print.jobj")
6869
useDynLib(SparkR, stringHashCode)

pkg/R/RDD.R

Lines changed: 88 additions & 110 deletions
Large diffs are not rendered by default.

pkg/R/broadcast.R

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ setGeneric("value", function(bcast) { standardGeneric("value") })
4242
setMethod("value",
4343
signature(bcast = "Broadcast"),
4444
function(bcast) {
45-
if (exists(bcast@id, envir=.broadcastValues)) {
46-
get(bcast@id, envir=.broadcastValues)
45+
if (exists(bcast@id, envir = .broadcastValues)) {
46+
get(bcast@id, envir = .broadcastValues)
4747
} else {
4848
NULL
4949
}

pkg/R/context.R

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22

33
getMinSplits <- function(sc, minSplits) {
44
if (is.null(minSplits)) {
5-
ssc <- .jcall(sc, "Lorg/apache/spark/SparkContext;", "sc")
6-
defaultParallelism <- .jcall(ssc, "I", "defaultParallelism")
5+
defaultParallelism <- callJMethod(sc, "defaultParallelism")
76
minSplits <- min(defaultParallelism, 2)
87
}
98
as.integer(minSplits)
@@ -30,10 +29,9 @@ textFile <- function(sc, path, minSplits = NULL) {
3029
# Allow the user to have a more flexible definiton of the text file path
3130
path <- normalizePath(path)
3231
#' Convert a string vector of paths to a string containing comma separated paths
33-
path <- paste(path, collapse=",")
32+
path <- paste(path, collapse = ",")
3433

35-
jrdd <- .jcall(sc, "Lorg/apache/spark/api/java/JavaRDD;", "textFile", path,
36-
getMinSplits(sc, minSplits))
34+
jrdd <- callJMethod(sc, "textFile", path, getMinSplits(sc, minSplits))
3735
RDD(jrdd, FALSE)
3836
}
3937

@@ -58,10 +56,9 @@ objectFile <- function(sc, path, minSplits = NULL) {
5856
# Allow the user to have a more flexible definiton of the text file path
5957
path <- normalizePath(path)
6058
#' Convert a string vector of paths to a string containing comma separated paths
61-
path <- paste(path, collapse=",")
59+
path <- paste(path, collapse = ",")
6260

63-
jrdd <- .jcall(sc, "Lorg/apache/spark/api/java/JavaRDD;", "objectFile", path,
64-
getMinSplits(sc, minSplits))
61+
jrdd <- callJMethod(sc, "objectFile", path, getMinSplits(sc, minSplits))
6562
# Assume the RDD contains serialized R objects.
6663
RDD(jrdd, TRUE)
6764
}
@@ -106,16 +103,8 @@ parallelize <- function(sc, coll, numSlices = 1) {
106103
# 2-tuples of raws
107104
serializedSlices <- lapply(slices, serialize, connection = NULL)
108105

109-
javaSerializedSlices <- .jarray(lapply(serializedSlices, .jarray),
110-
contents.class = "[B")
111-
112-
jrddType = "Lorg/apache/spark/api/java/JavaRDD;"
113-
114-
jrdd <- .jcall("edu/berkeley/cs/amplab/sparkr/RRDD",
115-
jrddType,
116-
"createRDDFromArray",
117-
sc,
118-
javaSerializedSlices)
106+
jrdd <- callJStatic("edu.berkeley.cs.amplab.sparkr.RRDD",
107+
"createRDDFromArray", sc, serializedSlices)
119108

120109
RDD(jrdd, TRUE)
121110
}
@@ -186,12 +175,10 @@ includePackage <- function(sc, pkg) {
186175
broadcast <- function(sc, object) {
187176
objName <- as.character(substitute(object))
188177
serializedObj <- serialize(object, connection = NULL, ascii = TRUE)
189-
serializedObjArr <- .jcast(.jarray(serializedObj),
190-
new.class="java/lang/Object")
191-
jBroadcast <- .jcall(sc, "Lorg/apache/spark/broadcast/Broadcast;",
192-
"broadcast", serializedObjArr)
193178

194-
id <- as.character(.jsimplify(.jcall(jBroadcast, "J", "id")))
179+
jBroadcast <- callJMethod(sc, "broadcast", serializedObj)
180+
id <- as.character(callJMethod(jBroadcast, "id"))
181+
195182
Broadcast(id, object, jBroadcast, objName)
196183
}
197184

@@ -211,8 +198,5 @@ broadcast <- function(sc, object) {
211198
#' checkpoint(rdd)
212199
#'}
213200
setCheckpointDir <- function(sc, dirName) {
214-
ssc <- .jcall(sc, "Lorg/apache/spark/SparkContext;", "sc")
215-
.jcall(ssc, "V", "setCheckpointDir", suppressWarnings(normalizePath(dirName)))
216-
# NOTE: rJava doesn't check for exceptions if the return type is void
217-
.jcheck()
201+
invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
218202
}

pkg/R/deserialize.R

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# Utility functions to deserialize objects from Java.
2+
3+
# Type mapping from Java to R
4+
#
5+
# void -> NULL
6+
# Int -> integer
7+
# String -> character
8+
# Boolean -> logical
9+
# Double -> double
10+
# Long -> double
11+
# Array[Byte] -> raw
12+
#
13+
# Array[T] -> list()
14+
# Object -> jobj
15+
16+
readObject <- function(con) {
17+
# Read type first
18+
type <- readType(con)
19+
readTypedObject(con, type)
20+
}
21+
22+
readTypedObject <- function(con, type) {
23+
switch (type,
24+
"i" = readInt(con),
25+
"c" = readString(con),
26+
"b" = readBoolean(con),
27+
"d" = readDouble(con),
28+
"r" = readRaw(con),
29+
"l" = readList(con),
30+
"n" = NULL,
31+
"j" = getJobj(readString(con)),
32+
stop("Unsupported type for deserialization"))
33+
}
34+
35+
readString <- function(con) {
36+
stringLen <- readInt(con)
37+
string <- readBin(con, raw(), stringLen, endian = "big")
38+
rawToChar(string)
39+
}
40+
41+
readInt <- function(con) {
42+
readBin(con, integer(), n = 1, endian = "big")
43+
}
44+
45+
readDouble <- function(con) {
46+
readBin(con, double(), n = 1, endian = "big")
47+
}
48+
49+
readBoolean <- function(con) {
50+
as.logical(readInt(con))
51+
}
52+
53+
readType <- function(con) {
54+
rawToChar(readBin(con, "raw", n = 1L))
55+
}
56+
57+
# We only support lists where all elements are of same type
58+
readList <- function(con) {
59+
type <- readType(con)
60+
len <- readInt(con)
61+
if (len > 0) {
62+
l <- vector("list", len)
63+
for (i in 1:len) {
64+
l[[i]] <- readTypedObject(con, type)
65+
}
66+
l
67+
} else {
68+
list()
69+
}
70+
}
71+
72+
readRaw <- function(con) {
73+
dataLen <- readInt(con)
74+
data <- readBin(con, raw(), as.integer(dataLen), endian = "big")
75+
}
76+
77+
readRawLen <- function(con, dataLen) {
78+
data <- readBin(con, raw(), as.integer(dataLen), endian = "big")
79+
}
80+
81+
readDeserialize <- function(con) {
82+
# We have two cases that are possible - In one, the entire partition is
83+
# encoded as a byte array, so we have only one value to read. If so just
84+
# return firstData
85+
dataLen <- readInt(con)
86+
firstData <- unserialize(
87+
readBin(con, raw(), as.integer(dataLen), endian = "big"))
88+
89+
# Else, read things into a list
90+
dataLen <- readInt(con)
91+
if (length(dataLen) > 0 && dataLen > 0) {
92+
data <- list(firstData)
93+
while (length(dataLen) > 0 && dataLen > 0) {
94+
data[[length(data) + 1L]] <- unserialize(
95+
readBin(con, raw(), as.integer(dataLen), endian = "big"))
96+
dataLen <- readInt(con)
97+
}
98+
unlist(data, recursive = FALSE)
99+
} else {
100+
firstData
101+
}
102+
}
103+

pkg/R/jobj.R

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# References to objects that exist on the JVM backend
2+
# are maintained using the jobj.
3+
4+
# Maintain a reference count of Java object references
5+
# This allows us to GC the java object when it is safe
6+
.validJobjs <- new.env(parent = emptyenv())
7+
8+
# List of object ids to be removed
9+
.toRemoveJobjs <- new.env(parent = emptyenv())
10+
11+
getJobj <- function(objId) {
12+
newObj <- jobj(objId)
13+
if (exists(objId, .validJobjs)) {
14+
.validJobjs[[objId]] <- .validJobjs[[objId]] + 1
15+
} else {
16+
.validJobjs[[objId]] <- 1
17+
}
18+
newObj
19+
}
20+
21+
# Handler for a java object that exists on the backend.
22+
jobj <- function(objId) {
23+
if (!is.character(objId)) {
24+
stop("object id must be a character")
25+
}
26+
# NOTE: We need a new env for a jobj as we can only register
27+
# finalizers for environments or external references pointers.
28+
obj <- structure(new.env(parent = emptyenv()), class = "jobj")
29+
obj$id <- objId
30+
# Register a finalizer to remove the Java object when this reference
31+
# is garbage collected in R
32+
reg.finalizer(obj, cleanup.jobj)
33+
obj
34+
}
35+
36+
print.jobj <- function(jobj) {
37+
cls <- callJMethod(jobj, "getClass")
38+
name <- callJMethod(cls, "getName")
39+
cat("Java ref type", name, "id", jobj$id, "\n", sep = " ")
40+
}
41+
42+
cleanup.jobj <- function(jobj) {
43+
objId <- jobj$id
44+
.validJobjs[[objId]] <- .validJobjs[[objId]] - 1
45+
46+
if (.validJobjs[[objId]] == 0) {
47+
rm(list = objId, envir = .validJobjs)
48+
# NOTE: We cannot call removeJObject here as the finalizer may be run
49+
# in the middle of another RPC. Thus we queue up this object Id to be removed
50+
# and then run all the removeJObject when the next RPC is called.
51+
.toRemoveJobjs[[objId]] <- 1
52+
}
53+
}

pkg/R/serialize.R

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
# Utility functions to serialize R objects so they can be read in Java.
2+
3+
# Type mapping from R to Java
4+
#
5+
# integer -> Int
6+
# character -> String
7+
# logical -> Boolean
8+
# double, numeric -> Double
9+
# raw -> Array[Byte]
10+
#
11+
# list[T] -> Array[T], where T is one of above mentioned types
12+
# environment -> Map[String, T], where T is a native type
13+
# jobj -> Object, where jobj is an object created in the backend
14+
15+
writeObject <- function(con, object, writeType = TRUE) {
16+
# NOTE: In R vectors have same type as objects. So we don't support
17+
# passing in vectors as arrays and instead require arrays to be passed
18+
# as lists.
19+
if (writeType) {
20+
writeType(con, class(object))
21+
}
22+
switch(class(object),
23+
integer = writeInt(con, object),
24+
character = writeString(con, object),
25+
logical = writeBoolean(con, object),
26+
double = writeDouble(con, object),
27+
numeric = writeDouble(con, object),
28+
raw = writeRaw(con, object),
29+
list = writeList(con, object),
30+
jobj = writeString(con, object$id),
31+
environment = writeEnv(con, object),
32+
stop("Unsupported type for serialization"))
33+
}
34+
35+
writeString <- function(con, value) {
36+
writeInt(con, as.integer(nchar(value) + 1))
37+
writeBin(value, con, endian = "big")
38+
}
39+
40+
writeInt <- function(con, value) {
41+
writeBin(as.integer(value), con, endian = "big")
42+
}
43+
44+
writeDouble <- function(con, value) {
45+
writeBin(value, con, endian = "big")
46+
}
47+
48+
writeBoolean <- function(con, value) {
49+
# TRUE becomes 1, FALSE becomes 0
50+
writeInt(con, as.integer(value))
51+
}
52+
53+
writeRawSerialize <- function(outputCon, batch) {
54+
outputSer <- serialize(batch, ascii = FALSE, conn = NULL)
55+
writeRaw(outputCon, outputSer)
56+
}
57+
58+
writeRaw <- function(con, batch) {
59+
writeInt(con, length(batch))
60+
writeBin(batch, con, endian = "big")
61+
}
62+
63+
writeType <- function(con, class) {
64+
type <- switch(class,
65+
integer = "i",
66+
character = "c",
67+
logical = "b",
68+
double = "d",
69+
numeric = "d",
70+
raw = "r",
71+
list = "l",
72+
jobj = "j",
73+
environment = "e",
74+
stop("Unsupported type for serialization"))
75+
writeBin(charToRaw(type), con)
76+
}
77+
78+
# Used to pass arrays where all the elements are of the same type
79+
writeList <- function(con, arr) {
80+
# All elements should be of same type
81+
elemType <- unique(sapply(arr, function(elem) { class(elem) }))
82+
stopifnot(length(elemType) <= 1)
83+
84+
# TODO: Empty lists are given type "character" right now.
85+
# This may not work if the Java side expects array of any other type.
86+
if (length(elemType) == 0) {
87+
elemType <- class("somestring")
88+
}
89+
90+
writeType(con, elemType)
91+
writeInt(con, length(arr))
92+
93+
if (length(arr) > 0) {
94+
for (a in arr) {
95+
writeObject(con, a, FALSE)
96+
}
97+
}
98+
}
99+
100+
# Used to pass in hash maps required on Java side.
101+
writeEnv <- function(con, env) {
102+
len <- length(env)
103+
104+
writeInt(con, len)
105+
if (len > 0) {
106+
writeList(con, as.list(ls(env)))
107+
vals <- lapply(ls(env), function(x) { env[[x]] })
108+
writeList(con, as.list(vals))
109+
}
110+
}
111+
112+
# Used to serialize in a list of objects where each
113+
# object can be of a different type. Serialization format is
114+
# <object type> <object> for each object
115+
writeArgs <- function(con, args) {
116+
if (length(args) > 0) {
117+
for (a in args) {
118+
writeObject(con, a)
119+
}
120+
}
121+
}
122+
123+
writeStrings <- function(con, stringList) {
124+
writeLines(unlist(stringList), con)
125+
}

0 commit comments

Comments
 (0)