
Commit ce756ec

cloud-fan authored and beliefer committed
[SPARK-40834][SQL][FOLLOWUP] Take care of legacy query end events
### What changes were proposed in this pull request?

This is a follow-up of apache#38302. For events generated by old versions of Spark, which do not have the new `errorMessage` field, we should use the old way to detect query execution status (failed or not). This PR also adds a UI test for the expected behavior.

### Why are the changes needed?

Backward compatibility.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes apache#38747 from cloud-fan/ui.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 01b4f38 commit ce756ec

File tree: 6 files changed, +72 -10 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala (5 additions, 1 deletion)

@@ -121,7 +121,11 @@ object SQLExecution {
         SparkThrowableHelper.getMessage(e)
       }
       val event = SparkListenerSQLExecutionEnd(
-        executionId, System.currentTimeMillis(), errorMessage)
+        executionId,
+        System.currentTimeMillis(),
+        // Use empty string to indicate no error, as None may mean events generated by old
+        // versions of Spark.
+        errorMessage.orElse(Some("")))
       // Currently only `Dataset.withAction` and `DataFrameWriter.runCommand` specify the `name`
       // parameter. The `ExecutionListenerManager` only watches SQL executions with name. We
       // can specify the execution name in more places in the future, so that
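The `orElse(Some(""))` establishes a three-state convention for `errorMessage`: `Some("")` means the query finished cleanly, a non-empty `Some` carries the failure message, and `None` can now only come from an event log written by an older Spark. A minimal sketch of that convention (the helper names are illustrative, not part of Spark's API):

```scala
// Hypothetical helpers illustrating the errorMessage convention; not Spark code.
object ErrorMessageConvention {
  // Writer side (Spark 3.4+): never emit None, so None stays reserved for legacy events.
  def encode(error: Option[String]): Option[String] =
    error.orElse(Some("")) // Some("") == "completed without error"

  // Reader side: the three states a consumer may see.
  def describe(errorMessage: Option[String]): String = errorMessage match {
    case None      => "legacy event: status unknown, fall back to job statuses"
    case Some("")  => "completed successfully"
    case Some(msg) => s"failed: $msg"
  }
}
```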

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala (17 additions, 4 deletions)

@@ -42,11 +42,24 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L

     sqlStore.executionsList().foreach { e =>
       if (e.errorMessage.isDefined) {
-        failed += e
-      } else if (e.completionTime.nonEmpty) {
-        completed += e
-      } else {
+        if (e.errorMessage.get.isEmpty) {
+          completed += e
+        } else {
+          failed += e
+        }
+      } else if (e.completionTime.isEmpty) {
         running += e
+      } else {
+        // When `completionTime` is present, it means the query execution is completed and
+        // `errorMessage` should be present as well. However, events generated by old versions of
+        // Spark do not have the `errorMessage` field. We have to check the status of this query
+        // execution's jobs.
+        val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
+        if (isFailed) {
+          failed += e
+        } else {
+          completed += e
+        }
       }
     }
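The branch order matters here: a defined `errorMessage` is trusted first, and the job-status scan is only a fallback for legacy events. The same decision, restated as a standalone function for clarity (a sketch under the assumption that only these three fields matter; `QueryStatus` and `classify` are hypothetical names, not Spark's):

```scala
import java.util.Date
import org.apache.spark.JobExecutionStatus

sealed trait QueryStatus
case object Running extends QueryStatus
case object Completed extends QueryStatus
case object Failed extends QueryStatus

def classify(
    errorMessage: Option[String],
    completionTime: Option[Date],
    jobs: Map[Int, JobExecutionStatus]): QueryStatus = errorMessage match {
  // Spark 3.4+ events: the field is authoritative; empty string means success.
  case Some(msg) => if (msg.isEmpty) Completed else Failed
  // No error message and no completion time: the query is still running.
  case None if completionTime.isEmpty => Running
  // Legacy event: completed, but the status must be inferred from its jobs.
  case None =>
    if (jobs.values.exists(_ == JobExecutionStatus.FAILED)) Failed else Completed
}
```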

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala (3 additions, 3 deletions)

@@ -489,13 +489,13 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
   var metrics = Seq[SQLPlanMetric]()
   var submissionTime = -1L
   var completionTime: Option[Date] = None
+  var errorMessage: Option[String] = None

   var jobs = Map[Int, JobExecutionStatus]()
   var stages = Set[Int]()
   var driverAccumUpdates = Seq[(Long, Long)]()

   @volatile var metricsValues: Map[Long, String] = null
-  var errorMessage: Option[String] = None

   // Just in case job end and execution end arrive out of order, keep track of how many
   // end events arrived so that the listener can stop tracking the execution.
@@ -511,10 +511,10 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
       metrics,
       submissionTime,
       completionTime,
+      errorMessage,
       jobs,
       stages,
-      metricsValues,
-      errorMessage)
+      metricsValues)
   }

 }

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala (2 additions, 2 deletions)

@@ -90,6 +90,7 @@ class SQLExecutionUIData(
     val metrics: Seq[SQLPlanMetric],
     val submissionTime: Long,
     val completionTime: Option[Date],
+    val errorMessage: Option[String],
     @JsonDeserialize(keyAs = classOf[Integer])
     val jobs: Map[Int, JobExecutionStatus],
     @JsonDeserialize(contentAs = classOf[Integer])
@@ -100,8 +101,7 @@ class SQLExecutionUIData(
      * from the SQL listener instance.
      */
     @JsonDeserialize(keyAs = classOf[JLong])
-    val metricValues: Map[Long, String],
-    val errorMessage: Option[String]) {
+    val metricValues: Map[Long, String]) {

   @JsonIgnore @KVIndex("completionTime")
   private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L)

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala (3 additions, 0 deletions)

@@ -55,6 +55,9 @@ case class SparkListenerSQLExecutionStart(
 case class SparkListenerSQLExecutionEnd(
     executionId: Long,
     time: Long,
+    // For backward compatibility, the `errorMessage` will be None when we parse event logs
+    // generated by old versions of Spark. It should always be Some in Spark 3.4+ and empty string
+    // means there is no error during execution.
     errorMessage: Option[String] = None)
   extends SparkListenerEvent {
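Given the constructor in this diff, the three shapes a replayed `SparkListenerSQLExecutionEnd` can take look like this (the timestamps are placeholders):

```scala
// Old event log, written before the field existed: errorMessage defaults to None.
val legacy = SparkListenerSQLExecutionEnd(0L, 1668000000000L)
// Spark 3.4+, successful query: empty string, never None.
val success = SparkListenerSQLExecutionEnd(1L, 1668000000000L, Some(""))
// Spark 3.4+, failed query: the failure message rides along in the event.
val failure = SparkListenerSQLExecutionEnd(2L, 1668000000000L, Some("Oops"))
```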

sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala (42 additions, 0 deletions)

@@ -26,6 +26,7 @@ import scala.xml.Node
 import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
 import org.scalatest.BeforeAndAfter

+import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.{JobFailed, SparkListenerJobEnd, SparkListenerJobStart}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
@@ -35,6 +36,11 @@ import org.apache.spark.util.kvstore.InMemoryStore

 class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {

+  override def sparkConf: SparkConf = {
+    // Disable async kv store write in the UI, to make tests more stable here.
+    super.sparkConf.set(org.apache.spark.internal.config.Status.ASYNC_TRACKING_ENABLED, false)
+  }
+
   import testImplicits._

   var kvstore: ElementTrackingStore = _
@@ -60,6 +66,42 @@ class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {
     assert(!html.contains("1970/01/01"))
   }

+  test("SPARK-40834: prioritize `errorMessage` over job failures") {
+    val statusStore = createStatusStore
+    val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS)
+    when(tab.sqlStore).thenReturn(statusStore)
+
+    val request = mock(classOf[HttpServletRequest])
+    when(tab.appName).thenReturn("testing")
+    when(tab.headerTabs).thenReturn(Seq.empty)
+
+    Seq(Some(""), Some("testErrorMsg"), None).foreach { msg =>
+      val listener = statusStore.listener.get
+      val page = new AllExecutionsPage(tab)
+      val df = createTestDataFrame
+      listener.onOtherEvent(SparkListenerSQLExecutionStart(
+        0,
+        "test",
+        "test",
+        df.queryExecution.toString,
+        SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+        System.currentTimeMillis()))
+      listener.onJobStart(SparkListenerJobStart(
+        jobId = 0,
+        time = System.currentTimeMillis(),
+        stageInfos = Nil,
+        createProperties(0)))
+      listener.onJobEnd(SparkListenerJobEnd(
+        jobId = 0,
+        time = System.currentTimeMillis(),
+        JobFailed(new RuntimeException("Oops"))))
+      listener.onOtherEvent(SparkListenerSQLExecutionEnd(0, System.currentTimeMillis(), msg))
+      val html = page.render(request).toString().toLowerCase(Locale.ROOT)
+
+      assert(html.contains("failed queries") == !msg.contains(""))
+    }
+  }
+
   test("sorting should be successful") {
     val statusStore = createStatusStore
     val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS)
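The test's assertion hinges on `Option.contains` being an equality check (true only when the option holds exactly that value), not a substring test on the message:

```scala
Some("").contains("")             // true  -> no "failed queries" section: empty string means success
Some("testErrorMsg").contains("") // false -> "failed queries" expected: non-empty error message
Option.empty[String].contains("") // false -> "failed queries" expected: legacy path, the job failed
```

So even though every iteration ends a job with `JobFailed`, the `Some("")` case must land in the completed table, which is exactly the "prioritize `errorMessage` over job failures" behavior the test name describes.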
