Skip to content

Commit 5bc3cba

Browse files
committed
[SPARK-3984] [SPARK-3983] Improve UI task metrics.
This commit fixes the scheduler delay shown in the UI, which previously included time that is not scheduler delay (such as the time to deserialize the task and to serialize the result). It also adds finer-grained information about task launch overhead to the summary table for each stage, which is useful for debugging the performance of short jobs, where that overhead is not insignificant.
1 parent 55ab777 commit 5bc3cba

File tree

7 files changed

+56
-3
lines changed

7 files changed

+56
-3
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,15 @@ private[spark] class Executor(
110110
// Maintains the list of running tasks.
111111
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
112112

113+
// Time when the task arrived on the executor. Used to track the overhead of getting a thread for
114+
// the task to run in.
115+
private val taskStartTimes = new ConcurrentHashMap[Long, Long]
116+
113117
startDriverHeartbeater()
114118

115119
def launchTask(
116120
context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
121+
taskStartTimes.put(taskId, System.currentTimeMillis)
117122
val tr = new TaskRunner(context, taskId, taskName, serializedTask)
118123
runningTasks.put(taskId, tr)
119124
threadPool.execute(tr)
@@ -152,7 +157,7 @@ private[spark] class Executor(
152157
}
153158

154159
override def run() {
155-
val startTime = System.currentTimeMillis()
160+
val deserializeStartTime = System.currentTimeMillis()
156161
Thread.currentThread.setContextClassLoader(replClassLoader)
157162
val ser = SparkEnv.get.closureSerializer.newInstance()
158163
logInfo(s"Running $taskName (TID $taskId)")
@@ -197,7 +202,8 @@ private[spark] class Executor(
197202
val afterSerialization = System.currentTimeMillis()
198203

199204
for (m <- task.metrics) {
200-
m.executorDeserializeTime = taskStart - startTime
205+
m.executorLaunchTime = deserializeStartTime - taskStartTimes.get(taskId)
206+
m.executorDeserializeTime = taskStart - deserializeStartTime
201207
m.executorRunTime = taskFinish - taskStart
202208
m.jvmGCTime = gcTime - startGCTime
203209
m.resultSerializationTime = afterSerialization - beforeSerialization
@@ -267,6 +273,7 @@ private[spark] class Executor(
267273
// Release memory used by this thread for unrolling blocks
268274
env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
269275
runningTasks.remove(taskId)
276+
taskStartTimes.remove(taskId)
270277
}
271278
}
272279
}

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ class TaskMetrics extends Serializable {
4141
*/
4242
var hostname: String = _
4343

44+
/**
45+
* Time taken on the executor to launch the task in its own thread.
46+
*/
47+
var executorLaunchTime: Long = _
48+
4449
/**
4550
* Time taken on the executor to deserialize this task
4651
*/

core/src/main/scala/org/apache/spark/ui/ToolTips.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
package org.apache.spark.ui
1919

2020
private[spark] object ToolTips {
21+
val EXECUTOR_LAUNCH_TIME =
22+
"""Overhead associated with launching the task in its own thread on the executor."""
23+
2124
val SCHEDULER_DELAY =
2225
"""Scheduler delay includes time to ship the task from the scheduler to
2326
the executor, and time to send the task result from the executor to the scheduler. If

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,20 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
179179
}
180180
}
181181

182+
val executorLaunchTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
183+
metrics.get.executorLaunchTime.toDouble
184+
}
185+
val executorLaunchTitle = <td><span data-toggle="tooltip"
186+
title={ToolTips.EXECUTOR_LAUNCH_TIME} data-placement="right">Launch time</span></td>
187+
val executorLaunchQuantiles =
188+
executorLaunchTitle +: getFormattedTimeQuantiles(executorLaunchTimes)
189+
190+
val deserializationTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
191+
metrics.get.executorDeserializeTime.toDouble
192+
}
193+
val deserializationQuantiles =
194+
<td>Task deserialization time</td> +: getFormattedTimeQuantiles(deserializationTimes)
195+
182196
val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
183197
metrics.get.executorRunTime.toDouble
184198
}
@@ -248,6 +262,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
248262
val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
249263
metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
250264
}
265+
251266
val shuffleWriteQuantiles = <td>Shuffle Write</td> +:
252267
getFormattedSizeQuantiles(shuffleWriteSizes)
253268

@@ -266,6 +281,10 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
266281
val listings: Seq[Seq[Node]] = Seq(
267282
<tr>{serviceQuantiles}</tr>,
268283
<tr class={TaskDetailsClassNames.SCHEDULER_DELAY}>{schedulerDelayQuantiles}</tr>,
284+
<tr class={TaskDetailsClassNames.EXECUTOR_LAUNCH_TIME}>{executorLaunchQuantiles}</tr>
285+
<tr class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
286+
{deserializationQuantiles}
287+
</tr>
269288
<tr class={TaskDetailsClassNames.GC_TIME}>{gcQuantiles}</tr>,
270289
<tr class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
271290
{serializationQuantiles}
@@ -424,6 +443,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
424443
(info.finishTime - info.launchTime)
425444
}
426445
}
427-
totalExecutionTime - metrics.executorRunTime
446+
val executorOverhead = (metrics.executorLaunchTime +
447+
metrics.executorDeserializeTime +
448+
metrics.resultSerializationTime)
449+
totalExecutionTime - metrics.executorRunTime - executorOverhead
428450
}
429451
}

core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ package org.apache.spark.ui.jobs
2424
private object TaskDetailsClassNames {
2525
val SCHEDULER_DELAY = "scheduler_delay"
2626
val GC_TIME = "gc_time"
27+
val EXECUTOR_LAUNCH_TIME = "launch_time"
28+
val TASK_DESERIALIZATION_TIME = "deserialization_time"
2729
val RESULT_SERIALIZATION_TIME = "serialization_time"
2830
val GETTING_RESULT_TIME = "getting_result_time"
2931
}

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ private[spark] object JsonProtocol {
240240
})
241241
}.getOrElse(JNothing)
242242
("Host Name" -> taskMetrics.hostname) ~
243+
("Executor Launch Time" -> taskMetrics.executorLaunchTime) ~
243244
("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
244245
("Executor Run Time" -> taskMetrics.executorRunTime) ~
245246
("Result Size" -> taskMetrics.resultSize) ~
@@ -562,6 +563,7 @@ private[spark] object JsonProtocol {
562563
}
563564
val metrics = new TaskMetrics
564565
metrics.hostname = (json \ "Host Name").extract[String]
566+
metrics.executorLaunchTime = (json \ "Executor Launch Time").extractOpt[Long].getOrElse(0)
565567
metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long]
566568
metrics.executorRunTime = (json \ "Executor Run Time").extract[Long]
567569
metrics.resultSize = (json \ "Result Size").extract[Long]

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,15 @@ class JsonProtocolSuite extends FunSuite {
154154
assert(newMetrics.inputMetrics.isEmpty)
155155
}
156156

157+
test("TaskMetrics.executorLaunchTime backward compatibility") {
158+
// executorLaunchTime was added after 1.1.
159+
val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true)
160+
val newJson = JsonProtocol.taskMetricsToJson(metrics)
161+
val oldJson = newJson.removeField { case (field, _) => field == "Executor Launch Time" }
162+
val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
163+
assert(newMetrics.executorLaunchTime === 0L)
164+
}
165+
157166
test("BlockManager events backward compatibility") {
158167
// SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a "time" property.
159168
val blockManagerAdded = SparkListenerBlockManagerAdded(1L,
@@ -554,6 +563,7 @@ class JsonProtocolSuite extends FunSuite {
554563
val t = new TaskMetrics
555564
val sw = new ShuffleWriteMetrics
556565
t.hostname = "localhost"
566+
t.executorLaunchTime = c + d
557567
t.executorDeserializeTime = a
558568
t.executorRunTime = b
559569
t.resultSize = c
@@ -796,6 +806,7 @@ class JsonProtocolSuite extends FunSuite {
796806
| },
797807
| "Task Metrics": {
798808
| "Host Name": "localhost",
809+
| "Executor Launch Time": 1100,
799810
| "Executor Deserialize Time": 300,
800811
| "Executor Run Time": 400,
801812
| "Result Size": 500,
@@ -879,6 +890,7 @@ class JsonProtocolSuite extends FunSuite {
879890
| },
880891
| "Task Metrics": {
881892
| "Host Name": "localhost",
893+
| "Executor Launch Time": 1100,
882894
| "Executor Deserialize Time": 300,
883895
| "Executor Run Time": 400,
884896
| "Result Size": 500,

0 commit comments

Comments
 (0)