
Commit 57600e2

Merge remote-tracking branch 'origin/master' into json-time-parser

2 parents: 55f2eac + 35f9163

73 files changed (+1240, -485 lines); only a subset of the changed files is shown below.


bin/docker-image-tool.sh

Lines changed: 1 addition & 0 deletions

@@ -107,6 +107,7 @@ function create_dev_build_context {(
     "$PYSPARK_CTX/kubernetes/dockerfiles"
   mkdir "$PYSPARK_CTX/python"
   cp -r "python/lib" "$PYSPARK_CTX/python/lib"
+  cp -r "python/pyspark" "$PYSPARK_CTX/python/pyspark"

   local R_CTX="$CTX_DIR/sparkr"
   mkdir -p "$R_CTX/kubernetes"

core/src/main/resources/org/apache/spark/ui/static/stagepage.js

Lines changed: 8 additions & 2 deletions

@@ -221,7 +221,10 @@ function createDataTableForTaskSummaryMetricsTable(taskSummaryMetricsTable) {
     "searching": false,
     "order": [[0, "asc"]],
     "bSort": false,
-    "bAutoWidth": false
+    "bAutoWidth": false,
+    "oLanguage": {
+      "sEmptyTable": "No tasks have reported metrics yet"
+    }
   };
   taskSummaryMetricsDataTable = $(taskMetricsTable).DataTable(taskConf);
 }

@@ -426,7 +429,10 @@ $(document).ready(function () {
       }
     ],
     "order": [[0, "asc"]],
-    "bAutoWidth": false
+    "bAutoWidth": false,
+    "oLanguage": {
+      "sEmptyTable": "No data to show yet"
+    }
   }
   var executorSummaryTableSelector =
     $("#summary-executor-table").DataTable(executorSummaryConf);

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 6 additions & 2 deletions

@@ -813,14 +813,14 @@ private[spark] class SparkSubmit extends Logging {
       mainClass = Utils.classForName(childMainClass)
     } catch {
       case e: ClassNotFoundException =>
-        logWarning(s"Failed to load $childMainClass.", e)
+        logError(s"Failed to load class $childMainClass.")
         if (childMainClass.contains("thriftserver")) {
           logInfo(s"Failed to load main class $childMainClass.")
           logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
         }
         throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
       case e: NoClassDefFoundError =>
-        logWarning(s"Failed to load $childMainClass: ${e.getMessage()}")
+        logError(s"Failed to load $childMainClass: ${e.getMessage()}")
         if (e.getMessage.contains("org/apache/hadoop/hive")) {
           logInfo(s"Failed to load hive class.")
           logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")

@@ -915,13 +915,17 @@ object SparkSubmit extends CommandLineUtils with Logging {
           override protected def logInfo(msg: => String): Unit = self.logInfo(msg)

           override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
+
+          override protected def logError(msg: => String): Unit = self.logError(msg)
         }
       }

       override protected def logInfo(msg: => String): Unit = printMessage(msg)

       override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")

+      override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")
+
       override def doSubmit(args: Array[String]): Unit = {
         try {
           super.doSubmit(args)
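The second hunk follows from the first: the command-line entry point reroutes its log methods to printMessage, and once the class above starts reporting class-load failures at error level, that level needs the same rerouting or the message would miss that sink. A minimal standalone sketch of the pattern, using hypothetical names rather than Spark's actual Logging trait:

    // Stand-in for a logging mixin: each level has a default sink.
    trait MiniLogging {
      protected def logInfo(msg: => String): Unit = println(msg)
      protected def logWarning(msg: => String): Unit = println(msg)
      protected def logError(msg: => String): Unit = println(msg)
    }

    class Submit extends MiniLogging {
      private def printMessage(str: String): Unit = Console.err.println(str)

      override protected def logInfo(msg: => String): Unit = printMessage(msg)
      override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
      // Without this override, anything logged at error level would bypass
      // printMessage and fall through to the default sink.
      override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")
    }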

core/src/main/scala/org/apache/spark/status/AppStatusStore.scala

Lines changed: 51 additions & 22 deletions

@@ -148,11 +148,20 @@ private[spark] class AppStatusStore(
     // cheaper for disk stores (avoids deserialization).
     val count = {
       Utils.tryWithResource(
-        store.view(classOf[TaskDataWrapper])
-          .parent(stageKey)
-          .index(TaskIndexNames.EXEC_RUN_TIME)
-          .first(0L)
-          .closeableIterator()
+        if (store.isInstanceOf[InMemoryStore]) {
+          store.view(classOf[TaskDataWrapper])
+            .parent(stageKey)
+            .index(TaskIndexNames.STATUS)
+            .first("SUCCESS")
+            .last("SUCCESS")
+            .closeableIterator()
+        } else {
+          store.view(classOf[TaskDataWrapper])
+            .parent(stageKey)
+            .index(TaskIndexNames.EXEC_RUN_TIME)
+            .first(0L)
+            .closeableIterator()
+        }
       ) { it =>
         var _count = 0L
         while (it.hasNext()) {

@@ -221,30 +230,50 @@ private[spark] class AppStatusStore(
     // stabilize once the stage finishes. It's also slow, especially with disk stores.
     val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }

+    // TODO: Summary metrics needs to display all the successful tasks' metrics (SPARK-26119).
+    // For InMemory case, it is efficient to find using the following code. But for diskStore case
+    // we need an efficient solution to avoid deserialization time overhead. For that, we need to
+    // rework on the way indexing works, so that we can index by specific metrics for successful
+    // and failed tasks differently (would be tricky). Also would require changing the disk store
+    // version (to invalidate old stores).
     def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
-      Utils.tryWithResource(
-        store.view(classOf[TaskDataWrapper])
+      if (store.isInstanceOf[InMemoryStore]) {
+        val quantileTasks = store.view(classOf[TaskDataWrapper])
           .parent(stageKey)
           .index(index)
           .first(0L)
-          .closeableIterator()
-      ) { it =>
-        var last = Double.NaN
-        var currentIdx = -1L
-        indices.map { idx =>
-          if (idx == currentIdx) {
-            last
-          } else {
-            val diff = idx - currentIdx
-            currentIdx = idx
-            if (it.skip(diff - 1)) {
-              last = fn(it.next()).toDouble
+          .asScala
+          .filter { _.status == "SUCCESS"} // Filter "SUCCESS" tasks
+          .toIndexedSeq
+
+        indices.map { index =>
+          fn(quantileTasks(index.toInt)).toDouble
+        }.toIndexedSeq
+      } else {
+        Utils.tryWithResource(
+          store.view(classOf[TaskDataWrapper])
+            .parent(stageKey)
+            .index(index)
+            .first(0L)
+            .closeableIterator()
+        ) { it =>
+          var last = Double.NaN
+          var currentIdx = -1L
+          indices.map { idx =>
+            if (idx == currentIdx) {
              last
            } else {
-              Double.NaN
+              val diff = idx - currentIdx
+              currentIdx = idx
+              if (it.skip(diff - 1)) {
+                last = fn(it.next()).toDouble
+                last
+              } else {
+                Double.NaN
+              }
            }
-          }
-        }.toIndexedSeq
+          }.toIndexedSeq
+        }
      }
    }
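For intuition on the in-memory path this hunk adds: tasks are ordered by the indexed metric, failed tasks are filtered out, and each requested quantile is turned into a position in what remains. A rough sketch with plain collections, using a hypothetical Task case class as a stand-in for TaskDataWrapper:

    object QuantileSketch extends App {
      // Stand-in type: only the fields the summary needs.
      case class Task(status: String, execRunTime: Long)

      val tasks = IndexedSeq(
        Task("SUCCESS", 10), Task("FAILED", 999), Task("SUCCESS", 20), Task("SUCCESS", 30))

      // Only successful tasks participate, mirroring the filter added in the diff,
      // ordered by the metric being summarised (what the KVStore index provides).
      val successful = tasks.filter(_.status == "SUCCESS").sortBy(_.execRunTime)
      val count = successful.size                                   // 3

      // Same index arithmetic as AppStatusStore: quantile -> position in the sorted list.
      val quantiles = Array(0.0, 0.25, 0.5, 0.75, 1.0)
      val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
      // indices == Array(0, 0, 1, 2, 2)

      val summary = indices.map(i => successful(i.toInt).execRunTime.toDouble)
      println(summary.mkString(", "))                               // 10.0, 10.0, 20.0, 30.0, 30.0
    }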

core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala

Lines changed: 30 additions & 3 deletions

@@ -77,6 +77,34 @@ class AppStatusStoreSuite extends SparkFunSuite {
     assert(store.count(classOf[CachedQuantile]) === 2)
   }

+  test("only successfull task have taskSummary") {
+    val store = new InMemoryStore()
+    (0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) }
+    val appStore = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles)
+    assert(appStore.size === 0)
+  }
+
+  test("summary should contain task metrics of only successfull tasks") {
+    val store = new InMemoryStore()
+
+    for (i <- 0 to 5) {
+      if (i % 2 == 1) {
+        store.write(newTaskData(i, status = "FAILED"))
+      } else {
+        store.write(newTaskData(i))
+      }
+    }
+
+    val summary = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles).get
+
+    val values = Array(0.0, 2.0, 4.0)
+
+    val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted)
+    dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
+      assert(expected === actual)
+    }
+  }
+
   private def compareQuantiles(count: Int, quantiles: Array[Double]): Unit = {
     val store = new InMemoryStore()
     val values = (0 until count).map { i =>

@@ -93,12 +121,11 @@ class AppStatusStoreSuite extends SparkFunSuite {
     }
   }

-  private def newTaskData(i: Int): TaskDataWrapper = {
+  private def newTaskData(i: Int, status: String = "SUCCESS"): TaskDataWrapper = {
     new TaskDataWrapper(
-      i, i, i, i, i, i, i.toString, i.toString, i.toString, i.toString, false, Nil, None,
+      i, i, i, i, i, i, i.toString, i.toString, status, i.toString, false, Nil, None,
       i, i, i, i, i, i, i, i, i, i,
       i, i, i, i, i, i, i, i, i, i,
       i, i, i, i, stageId, attemptId)
   }
-
 }
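The expected values in the second test follow from newTaskData(i) filling every metric field with i: tasks 0 through 5 are written, the odd ones as FAILED, so the successful tasks' executor run times are exactly 0, 2 and 4, and the Distribution over Array(0.0, 2.0, 4.0) yields the quantiles the summary is expected to report.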

dev/.scalafmt.conf

Lines changed: 4 additions & 0 deletions

@@ -19,6 +19,10 @@ align = none
 align.openParenDefnSite = false
 align.openParenCallSite = false
 align.tokens = []
+optIn = {
+  configStyleArguments = false
+}
+danglingParentheses = false
 docstrings = JavaDoc
 maxColumn = 98
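These are scalafmt settings; as I read them (check the scalafmt docs for the pinned version), optIn.configStyleArguments = false stops the formatter from opting call sites into the one-argument-per-line "config style", and danglingParentheses = false keeps a closing parenthesis on the same line as the last argument rather than on a line of its own.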

docs/running-on-kubernetes.md

Lines changed: 7 additions & 0 deletions

@@ -944,6 +944,13 @@ specific to Spark on Kubernetes.
   <code>spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml`</code>
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.executor.deleteOnTermination</code></td>
+  <td>true</td>
+  <td>
+  Specify whether executor pods should be deleted in case of failure or normal termination.
+  </td>
+</tr>
 </table>

 #### Pod template properties
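A hedged example of how the new property would typically be set, for instance to keep executor pods around for post-mortem inspection; the master URL and image name below are placeholders, not values from this commit:

    import org.apache.spark.sql.SparkSession

    // Keep executor pods after they terminate so their logs and status remain inspectable.
    val spark = SparkSession.builder()
      .master("k8s://https://kubernetes.example.com:6443")
      .config("spark.kubernetes.container.image", "spark:testing")
      .config("spark.kubernetes.executor.deleteOnTermination", "false")
      .getOrCreate()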

docs/sql-data-sources-jdbc.md

Lines changed: 3 additions & 3 deletions

@@ -64,9 +64,9 @@ the following case-insensitive options:
        Example:<br>
        <code>
           spark.read.format("jdbc")<br>
-            &nbsp&nbsp .option("dbtable", "(select c1, c2 from t1) as subq")<br>
-            &nbsp&nbsp .option("partitionColumn", "subq.c1"<br>
-            &nbsp&nbsp .load()
+            .option("url", jdbcUrl)<br>
+            .option("query", "select c1, c2 from t1")<br>
+            .load()
        </code></li>
     </ol>
  </td>
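The corrected example reads through the query option (and adds the url the old snippet omitted) instead of wrapping a subquery in dbtable. A minimal sketch of the documented call, assuming a spark-shell style session where spark is already defined and a placeholder JDBC URL:

    val jdbcUrl = "jdbc:postgresql://db.example.com:5432/sales"  // placeholder

    val df = spark.read.format("jdbc")
      .option("url", jdbcUrl)
      .option("query", "select c1, c2 from t1")
      .load()

    df.printSchema()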

docs/sql-migration-guide-upgrade.md

Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@ displayTitle: Spark SQL Upgrading Guide

   - Spark applications which are built with Spark version 2.4 and prior, and call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, as they are not binary compatible with Spark 3.0.

-  - Since Spark 3.0, JSON datasource uses java.time API for parsing and generating JSON content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
+  - Since Spark 3.0, CSV/JSON datasources use java.time API for parsing and generating CSV/JSON content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.

 ## Upgrading From Spark SQL 2.3 to 2.4
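A small sketch of the opt-out the guide describes, again assuming a spark-shell style session; the input path and timestamp pattern are placeholders:

    // Fall back to the Spark 2.4 date/timestamp parsing for CSV/JSON sources.
    spark.conf.set("spark.sql.legacy.timeParser.enabled", "true")

    val events = spark.read
      .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
      .json("/data/events.json")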

project/SparkBuild.scala

Lines changed: 7 additions & 1 deletion

@@ -494,7 +494,13 @@ object KubernetesIntegrationTests {
     dockerBuild := {
       if (shouldBuildImage) {
         val dockerTool = s"$sparkHome/bin/docker-image-tool.sh"
-        val cmd = Seq(dockerTool, "-m", "-t", imageTag.value, "build")
+        val bindingsDir = s"$sparkHome/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings"
+        val cmd = Seq(dockerTool, "-m",
+          "-t", imageTag.value,
+          "-p", s"$bindingsDir/python/Dockerfile",
+          "-R", s"$bindingsDir/R/Dockerfile",
+          "build"
+        )
         val ec = Process(cmd).!
         if (ec != 0) {
           throw new IllegalStateException(s"Process '${cmd.mkString(" ")}' exited with $ec.")
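With the extra -p and -R arguments, the integration-test build asks docker-image-tool.sh to also build the Python and R binding images from their Dockerfiles. A standalone sketch of the same invocation pattern outside sbt; the image tag and SPARK_HOME fallback are placeholders:

    import scala.sys.process._

    val sparkHome = sys.env.getOrElse("SPARK_HOME", ".")
    val dockerTool = s"$sparkHome/bin/docker-image-tool.sh"
    val bindingsDir =
      s"$sparkHome/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings"

    // Build the command as a Seq so each argument is passed as-is (no shell quoting),
    // run it, and fail on a non-zero exit code, mirroring the sbt task above.
    val cmd = Seq(dockerTool, "-m",
      "-t", "dev",
      "-p", s"$bindingsDir/python/Dockerfile",
      "-R", s"$bindingsDir/R/Dockerfile",
      "build")

    val exitCode = Process(cmd).!
    if (exitCode != 0) {
      sys.error(s"Process '${cmd.mkString(" ")}' exited with $exitCode.")
    }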
