
Commit 2d19f0a

Merge remote-tracking branch 'upstream/master' into pyspark-update-cloudpickle-42-SPARK-23159
2 parents: 5458702 + 4e0fb01


426 files changed: +10973 −3521 lines


R/pkg/R/DataFrame.R

Lines changed: 2 additions & 1 deletion
@@ -2090,7 +2090,8 @@ setMethod("selectExpr",
 #'
 #' @param x a SparkDataFrame.
 #' @param colName a column name.
-#' @param col a Column expression, or an atomic vector in the length of 1 as literal value.
+#' @param col a Column expression (which must refer only to this SparkDataFrame), or an atomic
+#' vector in the length of 1 as literal value.
 #' @return A SparkDataFrame with the new column added or the existing column replaced.
 #' @family SparkDataFrame functions
 #' @aliases withColumn,SparkDataFrame,character-method

R/pkg/R/functions.R

Lines changed: 3 additions & 1 deletion
@@ -1026,7 +1026,9 @@ setMethod("last_day",
 })

 #' @details
-#' \code{length}: Computes the length of a given string or binary column.
+#' \code{length}: Computes the character length of a string data or number of bytes
+#' of a binary data. The length of string data includes the trailing spaces.
+#' The length of binary data includes binary zeros.
 #'
 #' @rdname column_string_functions
 #' @aliases length length,Column-method

R/pkg/R/mllib_classification.R

Lines changed: 14 additions & 1 deletion
@@ -279,11 +279,24 @@ function(object, path, overwrite = FALSE) {
 #' savedModel <- read.ml(path)
 #' summary(savedModel)
 #'
-#' # multinomial logistic regression
+#' # binary logistic regression against two classes with
+#' # upperBoundsOnCoefficients and upperBoundsOnIntercepts
+#' ubc <- matrix(c(1.0, 0.0, 1.0, 0.0), nrow = 1, ncol = 4)
+#' model <- spark.logit(training, Species ~ .,
+#'                      upperBoundsOnCoefficients = ubc,
+#'                      upperBoundsOnIntercepts = 1.0)
 #'
+#' # multinomial logistic regression
 #' model <- spark.logit(training, Class ~ ., regParam = 0.5)
 #' summary <- summary(model)
 #'
+#' # multinomial logistic regression with
+#' # lowerBoundsOnCoefficients and lowerBoundsOnIntercepts
+#' lbc <- matrix(c(0.0, -1.0, 0.0, -1.0, 0.0, -1.0, 0.0, -1.0), nrow = 2, ncol = 4)
+#' lbi <- as.array(c(0.0, 0.0))
+#' model <- spark.logit(training, Species ~ ., family = "multinomial",
+#'                      lowerBoundsOnCoefficients = lbc,
+#'                      lowerBoundsOnIntercepts = lbi)
 #' }
 #' @note spark.logit since 2.1.0
 setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"),

R/pkg/R/serialize.R

Lines changed: 6 additions & 5 deletions
@@ -30,14 +30,17 @@
 # POSIXct,POSIXlt -> Time
 #
 # list[T] -> Array[T], where T is one of above mentioned types
+# Multi-element vector of any of the above (except raw) -> Array[T]
 # environment -> Map[String, T], where T is a native type
 # jobj -> Object, where jobj is an object created in the backend
 # nolint end

 getSerdeType <- function(object) {
   type <- class(object)[[1]]
-  if (type != "list") {
-    type
+  if (is.atomic(object) & !is.raw(object) & length(object) > 1) {
+    "array"
+  } else if (type != "list") {
+    type
   } else {
     # Check if all elements are of same type
     elemType <- unique(sapply(object, function(elem) { getSerdeType(elem) }))
@@ -50,9 +53,7 @@ getSerdeType <- function(object) {
 }

 writeObject <- function(con, object, writeType = TRUE) {
-  # NOTE: In R vectors have same type as objects. So we don't support
-  # passing in vectors as arrays and instead require arrays to be passed
-  # as lists.
+  # NOTE: In R vectors have same type as objects
   type <- class(object)[[1]] # class of POSIXlt is c("POSIXlt", "POSIXt")
   # Checking types is needed here, since 'is.na' only handles atomic vectors,
   # lists and pairlists

R/pkg/tests/fulltests/test_Serde.R

Lines changed: 47 additions & 0 deletions
@@ -37,6 +37,53 @@ test_that("SerDe of primitive types", {
   expect_equal(class(x), "character")
 })

+test_that("SerDe of multi-element primitive vectors inside R data.frame", {
+  # vector of integers embedded in R data.frame
+  indices <- 1L:3L
+  myDf <- data.frame(indices)
+  myDf$data <- list(rep(0L, 3L))
+  mySparkDf <- as.DataFrame(myDf)
+  myResultingDf <- collect(mySparkDf)
+  myDfListedData <- data.frame(indices)
+  myDfListedData$data <- list(as.list(rep(0L, 3L)))
+  expect_equal(myResultingDf, myDfListedData)
+  expect_equal(class(myResultingDf[["data"]][[1]]), "list")
+  expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "integer")
+
+  # vector of numeric embedded in R data.frame
+  myDf <- data.frame(indices)
+  myDf$data <- list(rep(0, 3L))
+  mySparkDf <- as.DataFrame(myDf)
+  myResultingDf <- collect(mySparkDf)
+  myDfListedData <- data.frame(indices)
+  myDfListedData$data <- list(as.list(rep(0, 3L)))
+  expect_equal(myResultingDf, myDfListedData)
+  expect_equal(class(myResultingDf[["data"]][[1]]), "list")
+  expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "numeric")
+
+  # vector of logical embedded in R data.frame
+  myDf <- data.frame(indices)
+  myDf$data <- list(rep(TRUE, 3L))
+  mySparkDf <- as.DataFrame(myDf)
+  myResultingDf <- collect(mySparkDf)
+  myDfListedData <- data.frame(indices)
+  myDfListedData$data <- list(as.list(rep(TRUE, 3L)))
+  expect_equal(myResultingDf, myDfListedData)
+  expect_equal(class(myResultingDf[["data"]][[1]]), "list")
+  expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "logical")
+
+  # vector of character embedded in R data.frame
+  myDf <- data.frame(indices)
+  myDf$data <- list(rep("abc", 3L))
+  mySparkDf <- as.DataFrame(myDf)
+  myResultingDf <- collect(mySparkDf)
+  myDfListedData <- data.frame(indices)
+  myDfListedData$data <- list(as.list(rep("abc", 3L)))
+  expect_equal(myResultingDf, myDfListedData)
+  expect_equal(class(myResultingDf[["data"]][[1]]), "list")
+  expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "character")
+})
+
 test_that("SerDe of list of primitive types", {
   x <- list(1L, 2L, 3L)
   y <- callJStatic("SparkRHandler", "echo", x)

R/pkg/tests/fulltests/test_mllib_classification.R

Lines changed: 5 additions & 5 deletions
@@ -124,7 +124,7 @@ test_that("spark.logit", {
   # Petal.Width 0.42122607
   # nolint end

-  # Test multinomial logistic regression againt three classes
+  # Test multinomial logistic regression against three classes
   df <- suppressWarnings(createDataFrame(iris))
   model <- spark.logit(df, Species ~ ., regParam = 0.5)
   summary <- summary(model)
@@ -196,7 +196,7 @@ test_that("spark.logit", {
   #
   # nolint end

-  # Test multinomial logistic regression againt two classes
+  # Test multinomial logistic regression against two classes
   df <- suppressWarnings(createDataFrame(iris))
   training <- df[df$Species %in% c("versicolor", "virginica"), ]
   model <- spark.logit(training, Species ~ ., regParam = 0.5, family = "multinomial")
@@ -208,7 +208,7 @@ test_that("spark.logit", {
   expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1))
   expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1))

-  # Test binomial logistic regression againt two classes
+  # Test binomial logistic regression against two classes
   model <- spark.logit(training, Species ~ ., regParam = 0.5)
   summary <- summary(model)
   coefsR <- c(-6.08, 0.25, 0.16, 0.48, 1.04)
@@ -239,7 +239,7 @@ test_that("spark.logit", {
   prediction2 <- collect(select(predict(model2, df2), "prediction"))
   expect_equal(sort(prediction2$prediction), c("0.0", "0.0", "0.0", "0.0", "0.0"))

-  # Test binomial logistic regression againt two classes with upperBoundsOnCoefficients
+  # Test binomial logistic regression against two classes with upperBoundsOnCoefficients
   # and upperBoundsOnIntercepts
   u <- matrix(c(1.0, 0.0, 1.0, 0.0), nrow = 1, ncol = 4)
   model <- spark.logit(training, Species ~ ., upperBoundsOnCoefficients = u,
@@ -252,7 +252,7 @@ test_that("spark.logit", {
   expect_error(spark.logit(training, Species ~ ., upperBoundsOnCoefficients = as.array(c(1, 2)),
                            upperBoundsOnIntercepts = 1.0))

-  # Test binomial logistic regression againt two classes with lowerBoundsOnCoefficients
+  # Test binomial logistic regression against two classes with lowerBoundsOnCoefficients
   # and lowerBoundsOnIntercepts
   l <- matrix(c(0.0, -1.0, 0.0, -1.0), nrow = 1, ncol = 4)
   model <- spark.logit(training, Species ~ ., lowerBoundsOnCoefficients = l,

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

Lines changed: 3 additions & 1 deletion
@@ -171,7 +171,9 @@ private class DownloadCallback implements StreamCallback {

     @Override
     public void onData(String streamId, ByteBuffer buf) throws IOException {
-      channel.write(buf);
+      while (buf.hasRemaining()) {
+        channel.write(buf);
+      }
     }

     @Override
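
For context, and not part of the commit itself: java.nio.channels.WritableByteChannel.write(ByteBuffer) is only guaranteed to write some of the remaining bytes, so a single call can leave data in the buffer and silently truncate the downloaded block. A minimal sketch of the pattern the fix adopts, with a hypothetical writeFully helper name:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Illustration only: a single WritableByteChannel.write() call may consume
// only part of the buffer, so keep writing until nothing remains.
final class ChannelWrites {
  static void writeFully(WritableByteChannel channel, ByteBuffer buf) throws IOException {
    while (buf.hasRemaining()) {
      channel.write(buf);
    }
  }
}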

common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java

Lines changed: 11 additions & 7 deletions
@@ -46,9 +46,12 @@ private boolean shouldPool(long size) {

   @Override
   public MemoryBlock allocate(long size) throws OutOfMemoryError {
-    if (shouldPool(size)) {
+    int numWords = (int) ((size + 7) / 8);
+    long alignedSize = numWords * 8L;
+    assert (alignedSize >= size);
+    if (shouldPool(alignedSize)) {
       synchronized (this) {
-        final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
+        final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
         if (pool != null) {
           while (!pool.isEmpty()) {
             final WeakReference<long[]> arrayReference = pool.pop();
@@ -62,11 +65,11 @@ public MemoryBlock allocate(long size) throws OutOfMemoryError {
             return memory;
           }
         }
-        bufferPoolsBySize.remove(size);
+        bufferPoolsBySize.remove(alignedSize);
       }
     }
-    long[] array = new long[(int) ((size + 7) / 8)];
+    long[] array = new long[numWords];
     MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
     if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
       memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
@@ -98,12 +101,13 @@ public void free(MemoryBlock memory) {
     long[] array = (long[]) memory.obj;
     memory.setObjAndOffset(null, 0);

-    if (shouldPool(size)) {
+    long alignedSize = ((size + 7) / 8) * 8;
+    if (shouldPool(alignedSize)) {
       synchronized (this) {
-        LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
+        LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
         if (pool == null) {
           pool = new LinkedList<>();
-          bufferPoolsBySize.put(size, pool);
+          bufferPoolsBySize.put(alignedSize, pool);
         }
         pool.add(new WeakReference<>(array));
       }
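
Illustration, not part of the commit: each block is backed by a long[], so the requested size is rounded up to a multiple of 8 bytes before being used as a pool key. That way free() and a later allocate() of a slightly different size map to the same pool entry and the backing array can be reused. A minimal sketch of the rounding arithmetic, under the class name WordAlignment chosen here for illustration:

// Rounding a requested byte size up to the 8-byte word granularity of the
// backing long[] array. Requests of 1024*1024 + 1 and 1024*1024 + 7 bytes
// round to the same aligned size, so the second allocation can reuse the
// array pooled when the first block was freed.
final class WordAlignment {
  static long alignedSize(long size) {
    long numWords = (size + 7) / 8;   // number of 64-bit words needed
    return numWords * 8L;             // size actually backed by the long[]
  }

  public static void main(String[] args) {
    System.out.println(alignedSize(513));             // 520
    System.out.println(alignedSize(1024 * 1024 + 1)); // 1048584
    System.out.println(alignedSize(1024 * 1024 + 7)); // 1048584 (same pool key)
  }
}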

common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java

Lines changed: 23 additions & 0 deletions
@@ -17,6 +17,7 @@

 package org.apache.spark.unsafe;

+import org.apache.spark.unsafe.memory.HeapMemoryAllocator;
 import org.apache.spark.unsafe.memory.MemoryAllocator;
 import org.apache.spark.unsafe.memory.MemoryBlock;

@@ -134,4 +135,26 @@ public void memoryDebugFillEnabledInTest() {
       MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
     MemoryAllocator.UNSAFE.free(offheap);
   }
+
+  @Test
+  public void heapMemoryReuse() {
+    MemoryAllocator heapMem = new HeapMemoryAllocator();
+    // The size is less than `HeapMemoryAllocator.POOLING_THRESHOLD_BYTES`,
+    // allocate new memory every time.
+    MemoryBlock onheap1 = heapMem.allocate(513);
+    Object obj1 = onheap1.getBaseObject();
+    heapMem.free(onheap1);
+    MemoryBlock onheap2 = heapMem.allocate(514);
+    Assert.assertNotEquals(obj1, onheap2.getBaseObject());
+
+    // The size is greater than `HeapMemoryAllocator.POOLING_THRESHOLD_BYTES`,
+    // reuse the previous memory which has released.
+    MemoryBlock onheap3 = heapMem.allocate(1024 * 1024 + 1);
+    Assert.assertEquals(onheap3.size(), 1024 * 1024 + 1);
+    Object obj3 = onheap3.getBaseObject();
+    heapMem.free(onheap3);
+    MemoryBlock onheap4 = heapMem.allocate(1024 * 1024 + 7);
+    Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7);
+    Assert.assertEquals(obj3, onheap4.getBaseObject());
+  }
 }

core/src/main/java/org/apache/spark/SparkExecutorInfo.java

Lines changed: 4 additions & 0 deletions
@@ -30,4 +30,8 @@ public interface SparkExecutorInfo extends Serializable {
   int port();
   long cacheSize();
   int numRunningTasks();
+  long usedOnHeapStorageMemory();
+  long usedOffHeapStorageMemory();
+  long totalOnHeapStorageMemory();
+  long totalOffHeapStorageMemory();
 }
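
A hedged usage sketch, not part of the commit: given a SparkExecutorInfo obtained elsewhere (for example from a status tracker, which this snippet does not show), the four new accessors can be read directly. Only methods visible in the interface above are used; the class name is hypothetical.

import org.apache.spark.SparkExecutorInfo;

// Illustration only: format the new storage-memory metrics for display.
final class ExecutorMemoryReport {
  static String format(SparkExecutorInfo info) {
    return String.format("on-heap %d/%d bytes, off-heap %d/%d bytes",
        info.usedOnHeapStorageMemory(), info.totalOnHeapStorageMemory(),
        info.usedOffHeapStorageMemory(), info.totalOffHeapStorageMemory());
  }
}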
