
Commit 3abdb1b

zsxwing authored and Andrew Or committed
[SPARK-4204][Core][WebUI] Change Utils.exceptionString to contain the inner exceptions and make the error information in Web UI more friendly
This PR fixes `Utils.exceptionString` to output the full exception information. However, the stack trace may become very large, so the Web UI now collapses the error information by default: only the first line is displayed, and clicking `+details` reveals the full text. Here are the screenshots:

Stages: ![stages](https://cloud.githubusercontent.com/assets/1000778/4882441/66d8cc68-6356-11e4-8346-6318677d9470.png)

Details for one stage: ![stage](https://cloud.githubusercontent.com/assets/1000778/4882513/1311043c-6357-11e4-8804-ca14240a9145.png)

The full information in the gray text field is:

```java
org.apache.spark.shuffle.FetchFailedException: Connection reset by peer
	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129)
	at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
	at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
	at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
	at org.apache.spark.scheduler.Task.run(Task.scala:56)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:189)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcher.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
	at sun.nio.ch.IOUtil.read(IOUtil.java:166)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
	at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
	... 1 more
```

/cc aarondav

Author: zsxwing <[email protected]>

Closes #3073 from zsxwing/SPARK-4204 and squashes the following commits:

176d1e3 [zsxwing] Add comments to explain the stack trace difference
ca509d3 [zsxwing] Add fullStackTrace to the constructor of ExceptionFailure
a07057b [zsxwing] Core style fix
dfb0032 [zsxwing] Backward compatibility for old history server
1e50f71 [zsxwing] Update as per review and increase the max height of the stack trace details
94f2566 [zsxwing] Change Utils.exceptionString to contain the inner exceptions and make the error information in Web UI more friendly
1 parent 48a19a6 · commit 3abdb1b

File tree

12 files changed: +148 -30 lines changed

core/src/main/resources/org/apache/spark/ui/static/webui.css

14 additions, 0 deletions

```diff
@@ -120,6 +120,20 @@ pre {
   border: none;
 }
 
+.stacktrace-details {
+  max-height: 300px;
+  overflow-y: auto;
+  margin: 0;
+  transition: max-height 0.5s ease-out, padding 0.5s ease-out;
+}
+
+.stacktrace-details.collapsed {
+  max-height: 0;
+  padding-top: 0;
+  padding-bottom: 0;
+  border: none;
+}
+
 span.expand-additional-metrics {
   cursor: pointer;
 }
```

core/src/main/scala/org/apache/spark/TaskEndReason.scala

34 additions, 1 deletion

```diff
@@ -83,15 +83,48 @@ case class FetchFailed(
  * :: DeveloperApi ::
  * Task failed due to a runtime exception. This is the most common failure case and also captures
  * user program exceptions.
+ *
+ * `stackTrace` contains the stack trace of the exception itself. It still exists for backward
+ * compatibility. It's better to use `this(e: Throwable, metrics: Option[TaskMetrics])` to
+ * create `ExceptionFailure` as it will handle the backward compatibility properly.
+ *
+ * `fullStackTrace` is a better representation of the stack trace because it contains the whole
+ * stack trace including the exception and its causes.
  */
 @DeveloperApi
 case class ExceptionFailure(
     className: String,
     description: String,
     stackTrace: Array[StackTraceElement],
+    fullStackTrace: String,
     metrics: Option[TaskMetrics])
   extends TaskFailedReason {
-  override def toErrorString: String = Utils.exceptionString(className, description, stackTrace)
+
+  private[spark] def this(e: Throwable, metrics: Option[TaskMetrics]) {
+    this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e), metrics)
+  }
+
+  override def toErrorString: String =
+    if (fullStackTrace == null) {
+      // fullStackTrace is added in 1.2.0
+      // If fullStackTrace is null, use the old error string for backward compatibility
+      exceptionString(className, description, stackTrace)
+    } else {
+      fullStackTrace
+    }
+
+  /**
+   * Return a nice string representation of the exception, including the stack trace.
+   * Note: It does not include the exception's causes, and is only used for backward compatibility.
+   */
+  private def exceptionString(
+      className: String,
+      description: String,
+      stackTrace: Array[StackTraceElement]): String = {
+    val desc = if (description == null) "" else description
+    val st = if (stackTrace == null) "" else stackTrace.map("        " + _).mkString("\n")
+    s"$className: $desc\n$st"
+  }
 }
 
 /**
```
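
For context, a minimal sketch of the two construction paths (the auxiliary constructor is `private[spark]`, so code like this would live inside Spark itself; the sample exception is ours, not from the patch):

```scala
import org.apache.spark.ExceptionFailure

val e = new RuntimeException("boom", new java.io.IOException("root cause"))

// New path: fullStackTrace is derived from the Throwable via
// Utils.exceptionString(e), so toErrorString keeps the "Caused by:" chain.
val modern = new ExceptionFailure(e, metrics = None)

// Legacy event, e.g. replayed from a pre-1.2.0 history log: fullStackTrace
// is null, and toErrorString falls back to the old single-exception format
// rendered by the private exceptionString helper above.
val legacy = ExceptionFailure(e.getClass.getName, e.getMessage, e.getStackTrace,
  fullStackTrace = null, metrics = None)
```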

core/src/main/scala/org/apache/spark/executor/Executor.scala

1 addition, 1 deletion

```diff
@@ -263,7 +263,7 @@ private[spark] class Executor(
             m.executorRunTime = serviceTime
             m.jvmGCTime = gcTime - startGCTime
           }
-          val reason = ExceptionFailure(t.getClass.getName, t.getMessage, t.getStackTrace, metrics)
+          val reason = new ExceptionFailure(t, metrics)
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
           // Don't forcibly exit unless the exception was inherently fatal, to avoid
```

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

2 additions, 2 deletions

```diff
@@ -1063,7 +1063,7 @@ class DAGScheduler(
         if (runningStages.contains(failedStage)) {
           logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
             s"due to a fetch failure from $mapStage (${mapStage.name})")
-          markStageAsFinished(failedStage, Some("Fetch failure: " + failureMessage))
+          markStageAsFinished(failedStage, Some(failureMessage))
           runningStages -= failedStage
         }
 
@@ -1094,7 +1094,7 @@
           handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
         }
 
-      case ExceptionFailure(className, description, stackTrace, metrics) =>
+      case ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics) =>
         // Do nothing here, left up to the TaskScheduler to decide how to handle user failures
 
       case TaskResultLost =>
```

core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala

14 additions, 3 deletions

```diff
@@ -32,10 +32,21 @@ private[spark] class FetchFailedException(
     shuffleId: Int,
     mapId: Int,
     reduceId: Int,
-    message: String)
-  extends Exception(message) {
+    message: String,
+    cause: Throwable = null)
+  extends Exception(message, cause) {
+
+  def this(
+      bmAddress: BlockManagerId,
+      shuffleId: Int,
+      mapId: Int,
+      reduceId: Int,
+      cause: Throwable) {
+    this(bmAddress, shuffleId, mapId, reduceId, cause.getMessage, cause)
+  }
 
-  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
+  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId,
+    Utils.exceptionString(this))
 }
 
 /**
```
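
A sketch of the new cause-carrying path (this class is `private[spark]`, so such code would sit inside Spark; the `BlockManagerId(execId, host, port)` factory and its arguments here are assumed from the 1.2-era API and are placeholders):

```scala
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.BlockManagerId

// Hypothetical map-output location for illustration only.
val address = BlockManagerId("executor-1", "host-1", 7337)

// The fetcher now hands over the Throwable itself rather than a pre-rendered
// string, so the cause chain survives until the failure is reported.
val cause = new java.io.IOException("Connection reset by peer")
val ffe = new FetchFailedException(address, 2, 7, 0, cause)

// FetchFailed's message is rendered with Utils.exceptionString(ffe), which
// includes the "Caused by: java.io.IOException" section.
val reason = ffe.toTaskEndReason
```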

core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala

2 additions, 3 deletions

```diff
@@ -25,7 +25,7 @@ import org.apache.spark._
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
-import org.apache.spark.util.{CompletionIterator, Utils}
+import org.apache.spark.util.CompletionIterator
 
 private[hash] object BlockStoreShuffleFetcher extends Logging {
   def fetch[T](
@@ -64,8 +64,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
         blockId match {
           case ShuffleBlockId(shufId, mapId, _) =>
             val address = statuses(mapId.toInt)._1
-            throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId,
-              Utils.exceptionString(e))
+            throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
           case _ =>
             throw new SparkException(
               "Failed to get block " + blockId + ", which is not a shuffle block", e)
```

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

29 additions, 3 deletions

```diff
@@ -22,6 +22,8 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.{Node, Unparsed}
 
+import org.apache.commons.lang3.StringEscapeUtils
+
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
 import org.apache.spark.ui.jobs.UIData._
@@ -436,13 +438,37 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
             {diskBytesSpilledReadable}
           </td>
         }}
-        <td>
-          {errorMessage.map { e => <pre>{e}</pre> }.getOrElse("")}
-        </td>
+        {errorMessageCell(errorMessage)}
       </tr>
     }
   }
 
+  private def errorMessageCell(errorMessage: Option[String]): Seq[Node] = {
+    val error = errorMessage.getOrElse("")
+    val isMultiline = error.indexOf('\n') >= 0
+    // Display the first line by default
+    val errorSummary = StringEscapeUtils.escapeHtml4(
+      if (isMultiline) {
+        error.substring(0, error.indexOf('\n'))
+      } else {
+        error
+      })
+    val details = if (isMultiline) {
+      // scalastyle:off
+      <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+            class="expand-details">
+        +details
+      </span> ++
+      <div class="stacktrace-details collapsed">
+        <pre>{error}</pre>
+      </div>
+      // scalastyle:on
+    } else {
+      ""
+    }
+    <td>{errorSummary}{details}</td>
+  }
+
   private def getSchedulerDelay(info: TaskInfo, metrics: TaskMetrics): Long = {
     val totalExecutionTime = {
       if (info.gettingResultTime > 0) {
```
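
The summarisation rule itself is tiny and worth seeing in isolation. A self-contained sketch (the helper name is ours, not Spark's):

```scala
import org.apache.commons.lang3.StringEscapeUtils

// Standalone illustration of the rule errorMessageCell applies: show only the
// HTML-escaped first line of a multi-line error; the full text goes into the
// collapsible <div class="stacktrace-details"> block.
def firstLineSummary(error: String): String = {
  val newline = error.indexOf('\n')
  val firstLine = if (newline >= 0) error.substring(0, newline) else error
  StringEscapeUtils.escapeHtml4(firstLine)
}

val msg = "org.apache.spark.shuffle.FetchFailedException: Connection reset by peer\n" +
  "\tat org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$..."
// Prints just the exception header, with <, >, & etc. escaped for HTML.
println(firstLineSummary(msg))
```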

core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala

26 additions, 2 deletions

```diff
@@ -22,6 +22,8 @@ import scala.xml.Text
 
 import java.util.Date
 
+import org.apache.commons.lang3.StringEscapeUtils
+
 import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.ui.{ToolTips, UIUtils}
 import org.apache.spark.util.Utils
@@ -195,7 +197,29 @@ private[ui] class FailedStageTable(
 
   override protected def stageRow(s: StageInfo): Seq[Node] = {
     val basicColumns = super.stageRow(s)
-    val failureReason = <td valign="middle"><pre>{s.failureReason.getOrElse("")}</pre></td>
-    basicColumns ++ failureReason
+    val failureReason = s.failureReason.getOrElse("")
+    val isMultiline = failureReason.indexOf('\n') >= 0
+    // Display the first line by default
+    val failureReasonSummary = StringEscapeUtils.escapeHtml4(
+      if (isMultiline) {
+        failureReason.substring(0, failureReason.indexOf('\n'))
+      } else {
+        failureReason
+      })
+    val details = if (isMultiline) {
+      // scalastyle:off
+      <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+            class="expand-details">
+        +details
+      </span> ++
+      <div class="stacktrace-details collapsed">
+        <pre>{failureReason}</pre>
+      </div>
+      // scalastyle:on
+    } else {
+      ""
+    }
+    val failureReasonHtml = <td valign="middle">{failureReasonSummary}{details}</td>
+    basicColumns ++ failureReasonHtml
   }
 }
```

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

4 additions, 1 deletion

```diff
@@ -287,6 +287,7 @@ private[spark] object JsonProtocol {
         ("Class Name" -> exceptionFailure.className) ~
         ("Description" -> exceptionFailure.description) ~
         ("Stack Trace" -> stackTrace) ~
+        ("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
         ("Metrics" -> metrics)
       case ExecutorLostFailure(executorId) =>
         ("Executor ID" -> executorId)
@@ -637,8 +638,10 @@
         val className = (json \ "Class Name").extract[String]
         val description = (json \ "Description").extract[String]
         val stackTrace = stackTraceFromJson(json \ "Stack Trace")
+        val fullStackTrace = Utils.jsonOption(json \ "Full Stack Trace").
+          map(_.extract[String]).orNull
         val metrics = Utils.jsonOption(json \ "Metrics").map(taskMetricsFromJson)
-        new ExceptionFailure(className, description, stackTrace, metrics)
+        ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics)
       case `taskResultLost` => TaskResultLost
       case `taskKilled` => TaskKilled
       case `executorLostFailure` =>
```
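
This is what keeps old history logs replayable. A minimal json4s sketch of the same rule, reimplementing the effect of `Utils.jsonOption` inline (the sample JSON is ours; the field names match the diff above):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

implicit val formats: Formats = DefaultFormats

// An event written before 1.2.0: no "Full Stack Trace" field at all.
val oldEvent = parse(
  """{"Class Name": "java.io.IOException", "Description": "Connection reset by peer"}""")

// A missing field extracts to JNothing; mapping that to None and then .orNull
// yields null instead of throwing, so deserialization of old logs still works.
val fullStackTrace = (oldEvent \ "Full Stack Trace") match {
  case JNothing => None
  case value    => Some(value.extract[String])
}
assert(fullStackTrace.orNull == null)
```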

core/src/main/scala/org/apache/spark/util/Utils.scala

12 additions, 12 deletions

```diff
@@ -1599,19 +1599,19 @@ private[spark] object Utils extends Logging {
       .orNull
   }
 
-  /** Return a nice string representation of the exception, including the stack trace. */
+  /**
+   * Return a nice string representation of the exception. It will call "printStackTrace" to
+   * recursively generate the stack trace including the exception and its causes.
+   */
   def exceptionString(e: Throwable): String = {
-    if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
-  }
-
-  /** Return a nice string representation of the exception, including the stack trace. */
-  def exceptionString(
-      className: String,
-      description: String,
-      stackTrace: Array[StackTraceElement]): String = {
-    val desc = if (description == null) "" else description
-    val st = if (stackTrace == null) "" else stackTrace.map("        " + _).mkString("\n")
-    s"$className: $desc\n$st"
+    if (e == null) {
+      ""
+    } else {
+      // Use e.printStackTrace here because e.getStackTrace doesn't include the cause
+      val stringWriter = new StringWriter()
+      e.printStackTrace(new PrintWriter(stringWriter))
+      stringWriter.toString
+    }
  }
 
   /** Return a thread dump of all threads' stacktraces. Used to capture dumps for the web UI */
```
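
The core of the whole change is this switch from `getStackTrace` to `printStackTrace`. A standalone sketch of the difference (the method body mirrors the diff above; the sample exceptions are ours):

```scala
import java.io.{PrintWriter, StringWriter}

// Same body as the new Utils.exceptionString: printStackTrace walks the
// cause chain, so "Caused by:" sections appear in the returned string.
def exceptionString(e: Throwable): String = {
  if (e == null) {
    ""
  } else {
    val stringWriter = new StringWriter()
    e.printStackTrace(new PrintWriter(stringWriter))
    stringWriter.toString
  }
}

val inner = new java.io.IOException("Connection reset by peer")
val outer = new RuntimeException("fetch failed", inner)

// Prints the RuntimeException's frames followed by a
// "Caused by: java.io.IOException: Connection reset by peer" section,
// which outer.getStackTrace alone would omit.
println(exceptionString(outer))
```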
