
Commit 1c3d98c

Merge remote-tracking branch 'upstream/master' into SPARK-30724

# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

2 parents: 88ca4c2 + c198620

222 files changed: +4275 −2970 lines


R/pkg/DESCRIPTION

Lines changed: 1 addition & 0 deletions
@@ -62,3 +62,4 @@ Collate:
 RoxygenNote: 5.0.1
 VignetteBuilder: knitr
 NeedsCompilation: no
+Encoding: UTF-8

R/pkg/tests/fulltests/test_context.R

Lines changed: 2 additions & 1 deletion
@@ -25,7 +25,8 @@ test_that("Check masked functions", {
   namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
                      "colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
                      "summary", "transform", "drop", "window", "as.data.frame", "union", "not")
-  if (as.numeric(R.version$major) >= 3 && as.numeric(R.version$minor) >= 3) {
+  version <- packageVersion("base")
+  if (as.numeric(version$major) >= 3 && as.numeric(version$minor) >= 3) {
     namesOfMasked <- c("endsWith", "startsWith", namesOfMasked)
   }
   masked <- conflicts(detail = TRUE)$`package:SparkR`

R/pkg/tests/fulltests/test_includePackage.R

Lines changed: 4 additions & 4 deletions
@@ -27,8 +27,8 @@ rdd <- parallelize(sc, nums, 2L)

 test_that("include inside function", {
   # Only run the test if plyr is installed.
-  if ("plyr" %in% rownames(installed.packages())) {
-    suppressPackageStartupMessages(library(plyr))
+  if ("plyr" %in% rownames(installed.packages()) &&
+      suppressPackageStartupMessages(suppressWarnings(library(plyr, logical.return = TRUE)))) {
     generateData <- function(x) {
       suppressPackageStartupMessages(library(plyr))
       attach(airquality)
@@ -44,8 +44,8 @@ test_that("include inside function", {

 test_that("use include package", {
   # Only run the test if plyr is installed.
-  if ("plyr" %in% rownames(installed.packages())) {
-    suppressPackageStartupMessages(library(plyr))
+  if ("plyr" %in% rownames(installed.packages()) &&
+      suppressPackageStartupMessages(suppressWarnings(library(plyr, logical.return = TRUE)))) {
     generateData <- function(x) {
       attach(airquality)
       result <- transform(Ozone, logOzone = log(Ozone))

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 2 additions & 1 deletion
@@ -1810,7 +1810,8 @@ test_that("string operators", {
   expect_true(first(select(df, endsWith(df$name, "el")))[[1]])
   expect_equal(first(select(df, substr(df$name, 1, 2)))[[1]], "Mi")
   expect_equal(first(select(df, substr(df$name, 4, 6)))[[1]], "hae")
-  if (as.numeric(R.version$major) >= 3 && as.numeric(R.version$minor) >= 3) {
+  version <- packageVersion("base")
+  if (as.numeric(version$major) >= 3 && as.numeric(version$minor) >= 3) {
     expect_true(startsWith("Hello World", "Hello"))
     expect_false(endsWith("Hello World", "a"))
   }

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 2 additions & 1 deletion
@@ -684,7 +684,8 @@ private[spark] object SparkConf extends Logging {
     "spark.yarn.jars" -> Seq(
       AlternateConfig("spark.yarn.jar", "2.0")),
     MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq(
-      AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")),
+      AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3"),
+      AlternateConfig("spark.maxRemoteBlockSizeFetchToMem", "3.0")),
     LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq(
       AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3")),
     DRIVER_MEMORY_OVERHEAD.key -> Seq(

core/src/main/scala/org/apache/spark/internal/Logging.scala

Lines changed: 1 addition & 1 deletion
@@ -117,7 +117,7 @@ trait Logging {
   }

   // For testing
-  def initializeForcefully(isInterpreter: Boolean, silent: Boolean): Unit = {
+  private[spark] def initializeForcefully(isInterpreter: Boolean, silent: Boolean): Unit = {
     initializeLogging(isInterpreter, silent)
   }
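
The only change here is visibility: initializeForcefully is narrowed from public to private[spark]. As a toy illustration (not Spark code) of Scala's package-qualified modifier, a private[spark] member stays callable anywhere under org.apache.spark but is hidden from application code outside that package:

    package org.apache.spark.internal

    trait VisibilityDemo {
      private[spark] def internalHelper(): Unit = ()   // visible to org.apache.spark.* only
      def publicApi(): Unit = internalHelper()         // public API may still delegate to it
    }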

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -895,7 +895,7 @@ package object config {
895895
.createWithDefault(Int.MaxValue)
896896

897897
private[spark] val MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM =
898-
ConfigBuilder("spark.maxRemoteBlockSizeFetchToMem")
898+
ConfigBuilder("spark.network.maxRemoteBlockSizeFetchToMem")
899899
.doc("Remote block will be fetched to disk when size of the block is above this threshold " +
900900
"in bytes. This is to avoid a giant request takes too much memory. Note this " +
901901
"configuration will affect both shuffle fetch and block manager remote block fetch. " +

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 1 addition & 1 deletion
@@ -430,7 +430,7 @@ private[spark] class TaskSchedulerImpl(
     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
     val availableResources = shuffledOffers.map(_.resources).toArray
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
-    val sortedTaskSets = rootPool.getSortedTaskSetQueue
+    val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
         taskSet.parent.name, taskSet.name, taskSet.runningTasks))

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 4 additions & 0 deletions
@@ -229,6 +229,8 @@ private[spark] class TaskSetManager(
       index: Int,
       resolveRacks: Boolean = true,
       speculatable: Boolean = false): Unit = {
+    // A zombie TaskSetManager may reach here while handling failed task.
+    if (isZombie) return
     val pendingTaskSetToAddTo = if (speculatable) pendingSpeculatableTasks else pendingTasks
     for (loc <- tasks(index).preferredLocations) {
       loc match {
@@ -1082,6 +1084,8 @@ private[spark] class TaskSetManager(
   }

   def recomputeLocality(): Unit = {
+    // A zombie TaskSetManager may reach here while executorLost happens
+    if (isZombie) return
     val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
     myLocalityLevels = computeValidLocalityLevels()
     localityWaits = myLocalityLevels.map(getLocalityWait)
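
Both scheduler changes key off the isZombie flag: a TaskSetManager goes zombie once it no longer needs to launch new tasks (for example, every task has a successful attempt or the task set was aborted), after which it should neither receive resource offers nor re-enqueue pending tasks. A toy sketch of the pattern, not Spark code:

    // Zombie task-set managers are skipped when offers are handed out,
    // and their own bookkeeping methods return early.
    final class ToyTaskSetManager(val name: String) {
      @volatile var isZombie: Boolean = false
      private var pending: List[Int] = Nil

      def addPendingTask(index: Int): Unit = {
        if (isZombie) return          // mirrors the guards added above
        pending = index :: pending
      }
    }

    val queue = Seq(new ToyTaskSetManager("stage-1.0"), new ToyTaskSetManager("stage-2.0"))
    queue.head.isZombie = true
    val schedulable = queue.filterNot(_.isZombie)   // mirrors the resourceOffers change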

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 0 additions & 1 deletion
@@ -502,7 +502,6 @@ private[serializer] object KryoSerializer {
     "org.apache.spark.ml.attribute.NumericAttribute",

     "org.apache.spark.ml.feature.Instance",
-    "org.apache.spark.ml.feature.InstanceBlock",
     "org.apache.spark.ml.feature.LabeledPoint",
     "org.apache.spark.ml.feature.OffsetInstance",
     "org.apache.spark.ml.linalg.DenseMatrix",
