
Commit 7c72128

Merge pull request apache#74 from ashangit/criteo-2.2

Bump spark criteo-2.2 to last branch-2.2 commits

2 parents: 76d4968 + ee91762

178 files changed (+2242 / −571 lines)


LICENSE

Lines changed: 1 addition & 1 deletion
@@ -263,7 +263,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
 (New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
 (The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
 (The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
-(The New BSD License) Py4J (net.sf.py4j:py4j:0.10.4 - http://py4j.sourceforge.net/)
+(The New BSD License) Py4J (net.sf.py4j:py4j:0.10.7 - http://py4j.sourceforge.net/)
 (Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
 (BSD licence) sbt and sbt-launch-lib.bash
 (BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)

R/pkg/DESCRIPTION

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 Package: SparkR
 Type: Package
-Version: 2.2.2
+Version: 2.2.3
 Title: R Frontend for Apache Spark
 Description: Provides an R Frontend for Apache Spark.
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),

R/pkg/R/client.R

Lines changed: 2 additions & 2 deletions
@@ -19,7 +19,7 @@

 # Creates a SparkR client connection object
 # if one doesn't already exist
-connectBackend <- function(hostname, port, timeout) {
+connectBackend <- function(hostname, port, timeout, authSecret) {
   if (exists(".sparkRcon", envir = .sparkREnv)) {
     if (isOpen(.sparkREnv[[".sparkRCon"]])) {
       cat("SparkRBackend client connection already exists\n")
@@ -29,7 +29,7 @@ connectBackend <- function(hostname, port, timeout) {

   con <- socketConnection(host = hostname, port = port, server = FALSE,
                           blocking = TRUE, open = "wb", timeout = timeout)
-
+  doServerAuth(con, authSecret)
   assign(".sparkRCon", con, envir = .sparkREnv)
   con
 }

R/pkg/R/deserialize.R

Lines changed: 7 additions & 3 deletions
@@ -60,14 +60,18 @@ readTypedObject <- function(con, type) {
     stop(paste("Unsupported type for deserialization", type)))
 }

-readString <- function(con) {
-  stringLen <- readInt(con)
-  raw <- readBin(con, raw(), stringLen, endian = "big")
+readStringData <- function(con, len) {
+  raw <- readBin(con, raw(), len, endian = "big")
   string <- rawToChar(raw)
   Encoding(string) <- "UTF-8"
   string
 }

+readString <- function(con) {
+  stringLen <- readInt(con)
+  readStringData(con, stringLen)
+}
+
 readInt <- function(con) {
   readBin(con, integer(), n = 1, endian = "big")
 }
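The split lets the auth code in sparkR.R read the secret's length separately and report a version mismatch instead of failing mid-read. For reference, a minimal standalone sketch (not part of the commit) of the wire format these helpers consume: a 4-byte big-endian length prefix followed by that many UTF-8 bytes. A rawConnection stands in for the JVM socket.

    # Sketch only: emulate what readInt()/readStringData() parse.
    payload <- charToRaw("hello")                      # UTF-8 bytes of the string
    out <- rawConnection(raw(0), open = "wb")
    writeBin(length(payload), out, endian = "big")     # 4-byte big-endian length prefix
    writeBin(payload, out)                             # the string bytes themselves
    bytes <- rawConnectionValue(out)
    close(out)

    con <- rawConnection(bytes, open = "rb")
    stringLen <- readBin(con, integer(), n = 1, endian = "big")  # what readInt() does
    raw <- readBin(con, raw(), stringLen)                        # what readStringData() does
    string <- rawToChar(raw)
    Encoding(string) <- "UTF-8"
    close(con)
    string                                             # "hello"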

R/pkg/R/sparkR.R

Lines changed: 34 additions & 5 deletions
@@ -161,6 +161,10 @@ sparkR.sparkContext <- function(
                     " please use the --packages commandline instead", sep = ","))
     }
     backendPort <- existingPort
+    authSecret <- Sys.getenv("SPARKR_BACKEND_AUTH_SECRET")
+    if (nchar(authSecret) == 0) {
+      stop("Auth secret not provided in environment.")
+    }
   } else {
     path <- tempfile(pattern = "backend_port")
     submitOps <- getClientModeSparkSubmitOpts(
@@ -189,16 +193,27 @@ sparkR.sparkContext <- function(
     monitorPort <- readInt(f)
     rLibPath <- readString(f)
     connectionTimeout <- readInt(f)
+
+    # Don't use readString() so that we can provide a useful
+    # error message if the R and Java versions are mismatched.
+    authSecretLen <- readInt(f)
+    if (length(authSecretLen) == 0 || authSecretLen == 0) {
+      stop("Unexpected EOF in JVM connection data. Mismatched versions?")
+    }
+    authSecret <- readStringData(f, authSecretLen)
     close(f)
     file.remove(path)
     if (length(backendPort) == 0 || backendPort == 0 ||
         length(monitorPort) == 0 || monitorPort == 0 ||
-        length(rLibPath) != 1) {
+        length(rLibPath) != 1 || length(authSecret) == 0) {
       stop("JVM failed to launch")
     }
-    assign(".monitorConn",
-           socketConnection(port = monitorPort, timeout = connectionTimeout),
-           envir = .sparkREnv)
+
+    monitorConn <- socketConnection(port = monitorPort, blocking = TRUE,
+                                    timeout = connectionTimeout, open = "wb")
+    doServerAuth(monitorConn, authSecret)
+
+    assign(".monitorConn", monitorConn, envir = .sparkREnv)
     assign(".backendLaunched", 1, envir = .sparkREnv)
     if (rLibPath != "") {
       assign(".libPath", rLibPath, envir = .sparkREnv)
@@ -208,7 +223,7 @@ sparkR.sparkContext <- function(

   .sparkREnv$backendPort <- backendPort
   tryCatch({
-    connectBackend("localhost", backendPort, timeout = connectionTimeout)
+    connectBackend("localhost", backendPort, timeout = connectionTimeout, authSecret = authSecret)
   },
   error = function(err) {
     stop("Failed to connect JVM\n")
@@ -632,3 +647,17 @@ sparkCheckInstall <- function(sparkHome, master, deployMode) {
     NULL
   }
 }
+
+# Utility function for sending auth data over a socket and checking the server's reply.
+doServerAuth <- function(con, authSecret) {
+  if (nchar(authSecret) == 0) {
+    stop("Auth secret not provided.")
+  }
+  writeString(con, authSecret)
+  flush(con)
+  reply <- readString(con)
+  if (reply != "ok") {
+    close(con)
+    stop("Unexpected reply from server.")
+  }
+}
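Taken together, doServerAuth() defines the client side of the handshake: write the secret as a length-prefixed string, flush, and require the literal reply "ok" before the connection is used. A hedged usage sketch follows; the host, port, and timeout are illustrative placeholders, not values from the commit (the real ones come from the backend_port file or the SPARKR_BACKEND_AUTH_SECRET path shown above).

    # Sketch only: exercising the handshake against an already-running backend.
    authSecret <- Sys.getenv("SPARKR_BACKEND_AUTH_SECRET")
    con <- socketConnection(host = "localhost", port = 8880, server = FALSE,
                            blocking = TRUE, open = "wb", timeout = 6000)
    doServerAuth(con, authSecret)   # stops unless the server replies "ok"
    # con is now an authenticated backend connection, as in connectBackend().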

R/pkg/inst/worker/daemon.R

Lines changed: 3 additions & 1 deletion
@@ -28,7 +28,9 @@ suppressPackageStartupMessages(library(SparkR))

 port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
-    port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
+    port = port, open = "wb", blocking = TRUE, timeout = connectionTimeout)
+
+SparkR:::doServerAuth(inputCon, Sys.getenv("SPARKR_WORKER_SECRET"))

 while (TRUE) {
   ready <- socketSelect(list(inputCon))

R/pkg/inst/worker/worker.R

Lines changed: 4 additions & 1 deletion
@@ -100,9 +100,12 @@ suppressPackageStartupMessages(library(SparkR))

 port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
-    port = port, blocking = TRUE, open = "rb", timeout = connectionTimeout)
+    port = port, blocking = TRUE, open = "wb", timeout = connectionTimeout)
+SparkR:::doServerAuth(inputCon, Sys.getenv("SPARKR_WORKER_SECRET"))
+
 outputCon <- socketConnection(
     port = port, blocking = TRUE, open = "wb", timeout = connectionTimeout)
+SparkR:::doServerAuth(outputCon, Sys.getenv("SPARKR_WORKER_SECRET"))

 # read the index of the current partition inside the RDD
 partition <- SparkR:::readInt(inputCon)
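Each worker socket performs the handshake independently before any partition data flows. A small variation, not in the commit, that wraps the call to surface a clearer worker-side error when the secret is wrong or unset:

    # Sketch only: fail fast with context when the handshake is rejected
    # (doServerAuth() stops with "Unexpected reply from server." on a bad secret).
    authenticate <- function(con) {
      tryCatch(
        SparkR:::doServerAuth(con, Sys.getenv("SPARKR_WORKER_SECRET")),
        error = function(err) {
          stop(paste("Worker auth handshake failed:", conditionMessage(err)))
        })
    }
    authenticate(inputCon)
    authenticate(outputCon)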

R/pkg/tests/run-all.R

Lines changed: 4 additions & 1 deletion
@@ -27,7 +27,10 @@ if (.Platform$OS.type == "windows") {

 # Setup global test environment
 # Install Spark first to set SPARK_HOME
-install.spark()
+
+# NOTE(shivaram): We set overwrite to handle any old tar.gz files or directories left behind on
+# CRAN machines. For Jenkins we should already have SPARK_HOME set.
+install.spark(overwrite = TRUE)

 sparkRDir <- file.path(Sys.getenv("SPARK_HOME"), "R")
 sparkRWhitelistSQLDirs <- c("spark-warehouse", "metastore_db")

appveyor.yml

Lines changed: 3 additions & 1 deletion
@@ -42,7 +42,9 @@ install:
   # Install maven and dependencies
   - ps: .\dev\appveyor-install-dependencies.ps1
   # Required package for R unit tests
-  - cmd: R -e "install.packages(c('knitr', 'rmarkdown', 'testthat', 'e1071', 'survival'), repos='http://cran.us.r-project.org')"
+  - cmd: R -e "install.packages(c('knitr', 'rmarkdown', 'devtools', 'e1071', 'survival'), repos='http://cran.us.r-project.org')"
+  # Here, we use the fixed version of testthat. For more details, please see SPARK-22817.
+  - cmd: R -e "devtools::install_version('testthat', version = '1.0.2', repos='http://cran.us.r-project.org')"
   - cmd: R -e "packageVersion('knitr'); packageVersion('rmarkdown'); packageVersion('testthat'); packageVersion('e1071'); packageVersion('survival')"

 build_script:

assembly/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.2-criteo-SNAPSHOT</version>
+    <version>2.2.3-criteo-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
