
Commit 03e1901

Author: Robert Kruszewski
Merge branch 'master' into rk/merge-upstream
2 parents 99e6ab2 + 7bd14cf

944 files changed: +66356 -11807 lines changed


.travis.yml

Lines changed: 6 additions & 0 deletions
@@ -55,8 +55,14 @@ notifications:
 # 5. Run maven build before running lints.
 install:
   - export MAVEN_SKIP_RC=1
+<<<<<<< HEAD
   - build/mvn ${PHASE} ${PROFILES} ${MODULES} ${ARGS}
 # 6. Run lints.
+=======
+  - build/mvn -T 4 -q -DskipTests -Pkubernetes -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install
+
+# 6. Run lint-java.
+>>>>>>> master
 script:
   - dev/lint-java
   - dev/lint-scala

NOTICE

Lines changed: 6 additions & 0 deletions
@@ -448,6 +448,12 @@ Copyright (C) 2011 Google Inc.
 Apache Commons Pool
 Copyright 1999-2009 The Apache Software Foundation
 
+This product includes/uses Kubernetes & OpenShift 3 Java Client (https://github.com/fabric8io/kubernetes-client)
+Copyright (C) 2015 Red Hat, Inc.
+
+This product includes/uses OkHttp (https://github.com/square/okhttp)
+Copyright (C) 2012 The Android Open Source Project
+
 =========================================================================
 == NOTICE file corresponding to section 4(d) of the Apache License, ==
 == Version 2.0, in this case for the DataNucleus distribution.      ==

R/pkg/DESCRIPTION

Lines changed: 2 additions & 1 deletion
@@ -1,6 +1,6 @@
 Package: SparkR
 Type: Package
-Version: 2.3.0
+Version: 2.4.0
 Title: R Frontend for Apache Spark
 Description: Provides an R Frontend for Apache Spark.
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
@@ -59,3 +59,4 @@ Collate:
     'window.R'
 RoxygenNote: 6.0.1
 VignetteBuilder: knitr
+NeedsCompilation: no

R/pkg/NAMESPACE

Lines changed: 8 additions & 1 deletion
@@ -76,7 +76,9 @@ exportMethods("glm",
 export("setJobGroup",
        "clearJobGroup",
        "cancelJobGroup",
-       "setJobDescription")
+       "setJobDescription",
+       "setLocalProperty",
+       "getLocalProperty")
 
 # Export Utility methods
 export("setLogLevel")
@@ -133,6 +135,7 @@ exportMethods("arrange",
               "isStreaming",
               "join",
               "limit",
+              "localCheckpoint",
               "merge",
               "mutate",
               "na.omit",
@@ -176,6 +179,7 @@ exportMethods("arrange",
               "with",
               "withColumn",
               "withColumnRenamed",
+              "withWatermark",
               "write.df",
               "write.jdbc",
               "write.json",
@@ -225,11 +229,14 @@ exportMethods("%<=>%",
              "crc32",
              "create_array",
              "create_map",
+             "current_date",
+             "current_timestamp",
              "hash",
              "cume_dist",
              "date_add",
              "date_format",
              "date_sub",
+             "date_trunc",
              "datediff",
              "dayofmonth",
              "dayofweek",

R/pkg/R/DataFrame.R

Lines changed: 129 additions & 8 deletions
@@ -2297,6 +2297,7 @@ setClassUnion("characterOrColumn", c("character", "Column"))
 #' @param ... additional sorting fields
 #' @param decreasing a logical argument indicating sorting order for columns when
 #'                   a character vector is specified for col
+#' @param withinPartitions a logical argument indicating whether to sort only within each partition
 #' @return A SparkDataFrame where all elements are sorted.
 #' @family SparkDataFrame functions
 #' @aliases arrange,SparkDataFrame,Column-method
@@ -2312,16 +2313,21 @@ setClassUnion("characterOrColumn", c("character", "Column"))
 #' arrange(df, asc(df$col1), desc(abs(df$col2)))
 #' arrange(df, "col1", decreasing = TRUE)
 #' arrange(df, "col1", "col2", decreasing = c(TRUE, FALSE))
+#' arrange(df, "col1", "col2", withinPartitions = TRUE)
 #' }
 #' @note arrange(SparkDataFrame, Column) since 1.4.0
 setMethod("arrange",
           signature(x = "SparkDataFrame", col = "Column"),
-          function(x, col, ...) {
+          function(x, col, ..., withinPartitions = FALSE) {
             jcols <- lapply(list(col, ...), function(c) {
               c@jc
             })
 
-            sdf <- callJMethod(x@sdf, "sort", jcols)
+            if (withinPartitions) {
+              sdf <- callJMethod(x@sdf, "sortWithinPartitions", jcols)
+            } else {
+              sdf <- callJMethod(x@sdf, "sort", jcols)
+            }
             dataFrame(sdf)
           })
 
@@ -2332,7 +2338,7 @@ setMethod("arrange",
 #' @note arrange(SparkDataFrame, character) since 1.4.0
 setMethod("arrange",
           signature(x = "SparkDataFrame", col = "character"),
-          function(x, col, ..., decreasing = FALSE) {
+          function(x, col, ..., decreasing = FALSE, withinPartitions = FALSE) {
 
             # all sorting columns
             by <- list(col, ...)
@@ -2356,7 +2362,7 @@ setMethod("arrange",
               }
             })
 
-            do.call("arrange", c(x, jcols))
+            do.call("arrange", c(x, jcols, withinPartitions = withinPartitions))
           })
 
 #' @rdname arrange
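
For illustration, a minimal SparkR sketch of how the new withinPartitions flag might be used; the data set and column name are placeholders, not from the commit:

library(SparkR)
sparkR.session()

df <- createDataFrame(faithful)

# Global sort: shuffles so that rows are ordered across the whole SparkDataFrame.
globallySorted <- arrange(df, "waiting", decreasing = TRUE)

# Partition-local sort: per the new flag above, each partition is ordered
# independently via sortWithinPartitions, avoiding a global shuffle.
locallySorted <- arrange(df, "waiting", withinPartitions = TRUE)

head(locallySorted)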
@@ -3655,7 +3661,8 @@ setMethod("getNumPartitions",
 #' isStreaming
 #'
 #' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data
-#' as it arrives.
+#' as it arrives. A dataset that reads data from a streaming source must be executed as a
+#' \code{StreamingQuery} using \code{write.stream}.
 #'
 #' @param x A SparkDataFrame
 #' @return TRUE if this SparkDataFrame is from a streaming source
@@ -3701,7 +3708,17 @@ setMethod("isStreaming",
 #' @param df a streaming SparkDataFrame.
 #' @param source a name for external data source.
 #' @param outputMode one of 'append', 'complete', 'update'.
-#' @param ... additional argument(s) passed to the method.
+#' @param partitionBy a name or a list of names of columns to partition the output by on the file
+#'        system. If specified, the output is laid out on the file system similar to Hive's
+#'        partitioning scheme.
+#' @param trigger.processingTime a processing time interval as a string, e.g. '5 seconds',
+#'        '1 minute'. This is a trigger that runs a query periodically based on the processing
+#'        time. If value is '0 seconds', the query will run as fast as possible, this is the
+#'        default. Only one trigger can be set.
+#' @param trigger.once a logical, must be set to \code{TRUE}. This is a trigger that processes only
+#'        one batch of data in a streaming query then terminates the query. Only one trigger can be
+#'        set.
+#' @param ... additional external data source specific named options.
 #'
 #' @family SparkDataFrame functions
 #' @seealso \link{read.stream}
@@ -3719,7 +3736,8 @@ setMethod("isStreaming",
 #' # console
 #' q <- write.stream(wordCounts, "console", outputMode = "complete")
 #' # text stream
-#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
+#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp"
+#'                   partitionBy = c("year", "month"), trigger.processingTime = "30 seconds")
 #' # memory stream
 #' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
 #' head(sql("SELECT * from outs"))
@@ -3731,7 +3749,8 @@ setMethod("isStreaming",
 #' @note experimental
 setMethod("write.stream",
           signature(df = "SparkDataFrame"),
-          function(df, source = NULL, outputMode = NULL, ...) {
+          function(df, source = NULL, outputMode = NULL, partitionBy = NULL,
+                   trigger.processingTime = NULL, trigger.once = NULL, ...) {
             if (!is.null(source) && !is.character(source)) {
               stop("source should be character, NULL or omitted. It is the data source specified ",
                    "in 'spark.sql.sources.default' configuration by default.")
@@ -3742,12 +3761,43 @@ setMethod("write.stream",
             if (is.null(source)) {
               source <- getDefaultSqlSource()
             }
+            cols <- NULL
+            if (!is.null(partitionBy)) {
+              if (!all(sapply(partitionBy, function(c) { is.character(c) }))) {
+                stop("All partitionBy column names should be characters.")
+              }
+              cols <- as.list(partitionBy)
+            }
+            jtrigger <- NULL
+            if (!is.null(trigger.processingTime) && !is.na(trigger.processingTime)) {
+              if (!is.null(trigger.once)) {
+                stop("Multiple triggers not allowed.")
+              }
+              interval <- as.character(trigger.processingTime)
+              if (nchar(interval) == 0) {
+                stop("Value for trigger.processingTime must be a non-empty string.")
+              }
+              jtrigger <- handledCallJStatic("org.apache.spark.sql.streaming.Trigger",
+                                             "ProcessingTime",
+                                             interval)
+            } else if (!is.null(trigger.once) && !is.na(trigger.once)) {
+              if (!is.logical(trigger.once) || !trigger.once) {
+                stop("Value for trigger.once must be TRUE.")
+              }
+              jtrigger <- callJStatic("org.apache.spark.sql.streaming.Trigger", "Once")
+            }
             options <- varargsToStrEnv(...)
             write <- handledCallJMethod(df@sdf, "writeStream")
             write <- callJMethod(write, "format", source)
             if (!is.null(outputMode)) {
               write <- callJMethod(write, "outputMode", outputMode)
             }
+            if (!is.null(cols)) {
+              write <- callJMethod(write, "partitionBy", cols)
+            }
+            if (!is.null(jtrigger)) {
+              write <- callJMethod(write, "trigger", jtrigger)
+            }
             write <- callJMethod(write, "options", options)
             ssq <- handledCallJMethod(write, "start")
             streamingQuery(ssq)
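
A hedged sketch of how the new partitionBy and trigger options compose with write.stream; the schema, paths, and interval below are placeholders rather than values from the commit:

library(SparkR)
sparkR.session()

schema <- structType(structField("ts", "timestamp"), structField("value", "double"))
stream <- read.stream("json", path = "/tmp/stream-in", schema = schema)

# Lay the output out Hive-style under day=... directories and poll every 30 seconds.
out <- withColumn(stream, "day", to_date(stream$ts))
query <- write.stream(out, "parquet",
                      path = "/tmp/stream-out",
                      checkpointLocation = "/tmp/stream-cp",
                      partitionBy = "day",
                      trigger.processingTime = "30 seconds")

# Alternatively, process a single batch and stop (only one trigger may be set):
# query <- write.stream(out, "parquet", path = "/tmp/stream-out",
#                       checkpointLocation = "/tmp/stream-cp", trigger.once = TRUE)

stopQuery(query)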
@@ -3782,6 +3832,33 @@ setMethod("checkpoint",
             dataFrame(df)
           })
 
+#' localCheckpoint
+#'
+#' Returns a locally checkpointed version of this SparkDataFrame. Checkpointing can be used to
+#' truncate the logical plan, which is especially useful in iterative algorithms where the plan
+#' may grow exponentially. Local checkpoints are stored in the executors using the caching
+#' subsystem and therefore they are not reliable.
+#'
+#' @param x A SparkDataFrame
+#' @param eager whether to locally checkpoint this SparkDataFrame immediately
+#' @return a new locally checkpointed SparkDataFrame
+#' @family SparkDataFrame functions
+#' @aliases localCheckpoint,SparkDataFrame-method
+#' @rdname localCheckpoint
+#' @name localCheckpoint
+#' @export
+#' @examples
+#'\dontrun{
+#' df <- localCheckpoint(df)
+#' }
+#' @note localCheckpoint since 2.3.0
+setMethod("localCheckpoint",
+          signature(x = "SparkDataFrame"),
+          function(x, eager = TRUE) {
+            df <- callJMethod(x@sdf, "localCheckpoint", as.logical(eager))
+            dataFrame(df)
+          })
+
 #' cube
 #'
 #' Create a multi-dimensional cube for the SparkDataFrame using the specified columns.
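
A short sketch of the new localCheckpoint method, assuming an existing SparkDataFrame; mtcars stands in for real data:

library(SparkR)
sparkR.session()

df <- createDataFrame(mtcars)

# Eagerly checkpoint into executor storage, truncating the lineage now.
# Unlike checkpoint(), no checkpoint directory needs to be configured.
df <- localCheckpoint(df)

# Defer materialization until the data is first computed.
dfLazy <- localCheckpoint(df, eager = FALSE)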
@@ -3934,3 +4011,47 @@ setMethod("broadcast",
             sdf <- callJStatic("org.apache.spark.sql.functions", "broadcast", x@sdf)
             dataFrame(sdf)
           })
+
+#' withWatermark
+#'
+#' Defines an event time watermark for this streaming SparkDataFrame. A watermark tracks a point in
+#' time before which we assume no more late data is going to arrive.
+#'
+#' Spark will use this watermark for several purposes:
+#' \itemize{
+#'  \item{-} To know when a given time window aggregation can be finalized and thus can be emitted
+#' when using output modes that do not allow updates.
+#'  \item{-} To minimize the amount of state that we need to keep for on-going aggregations.
+#' }
+#' The current watermark is computed by looking at the \code{MAX(eventTime)} seen across
+#' all of the partitions in the query minus a user specified \code{delayThreshold}. Due to the cost
+#' of coordinating this value across partitions, the actual watermark used is only guaranteed
+#' to be at least \code{delayThreshold} behind the actual event time. In some cases we may still
+#' process records that arrive more than \code{delayThreshold} late.
+#'
+#' @param x a streaming SparkDataFrame
+#' @param eventTime a string specifying the name of the Column that contains the event time of the
+#'                  row.
+#' @param delayThreshold a string specifying the minimum delay to wait to data to arrive late,
+#'                       relative to the latest record that has been processed in the form of an
+#'                       interval (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative.
+#' @return a SparkDataFrame.
+#' @aliases withWatermark,SparkDataFrame,character,character-method
+#' @family SparkDataFrame functions
+#' @rdname withWatermark
+#' @name withWatermark
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' schema <- structType(structField("time", "timestamp"), structField("value", "double"))
+#' df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
+#' df <- withWatermark(df, "time", "10 minutes")
+#' }
+#' @note withWatermark since 2.3.0
+setMethod("withWatermark",
+          signature(x = "SparkDataFrame", eventTime = "character", delayThreshold = "character"),
+          function(x, eventTime, delayThreshold) {
+            sdf <- callJMethod(x@sdf, "withWatermark", eventTime, delayThreshold)
+            dataFrame(sdf)
+          })
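
Building on the roxygen example above, a hedged sketch of a watermarked, windowed aggregation; the input path and intervals are illustrative:

library(SparkR)
sparkR.session()

schema <- structType(structField("time", "timestamp"), structField("value", "double"))
events <- read.stream("json", path = "/tmp/events", schema = schema, maxFilesPerTrigger = 1)

# State for windows more than 10 minutes behind MAX(time) can be dropped.
events <- withWatermark(events, "time", "10 minutes")

# Count events per 5-minute event-time window; in "append" mode a window is
# only emitted once the watermark passes its end.
counts <- count(groupBy(events, window(events$time, "5 minutes")))
query <- write.stream(counts, "console", outputMode = "append")

stopQuery(query)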

R/pkg/R/SQLContext.R

Lines changed: 3 additions & 1 deletion
@@ -727,7 +727,9 @@ read.jdbc <- function(url, tableName,
 #' @param schema The data schema defined in structType or a DDL-formatted string, this is
 #'               required for file-based streaming data source
 #' @param ... additional external data source specific named options, for instance \code{path} for
-#'        file-based streaming data source
+#'        file-based streaming data source. \code{timeZone} to indicate a timezone to be used to
+#'        parse timestamps in the JSON/CSV data sources or partition values; If it isn't set, it
+#'        uses the default value, session local timezone.
 #' @return SparkDataFrame
 #' @rdname read.stream
 #' @name read.stream
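
A hedged sketch of passing timeZone through read.stream's named options; the path, schema, and zone are illustrative:

library(SparkR)
sparkR.session()

schema <- structType(structField("eventTime", "timestamp"), structField("count", "integer"))

# Timestamps in the CSV input are parsed as UTC instead of the session-local default.
df <- read.stream("csv", path = "/tmp/csv-in", schema = schema,
                  header = "true", timeZone = "UTC")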
