[SPARK-4163][Core][WebUI] Send the fetch failure message back to Web UI #3032

Closed · wants to merge 8 commits
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -69,11 +69,13 @@ case class FetchFailed(
bmAddress: BlockManagerId, // Note that bmAddress can be null
Contributor:
one day we'll use proper subclassing to get rid of this silly case too
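
For context, "proper subclassing" here would mean encoding the missing-address case in the type system instead of a null sentinel. A minimal sketch, with hypothetical names that are not part of this PR:

import org.apache.spark.storage.BlockManagerId

// Hypothetical sketch only: make "address unknown" a case of its own
// rather than allowing bmAddress to be null.
sealed trait FetchFailureLocation
case class KnownLocation(bmAddress: BlockManagerId) extends FetchFailureLocation
case object UnknownLocation extends FetchFailureLocation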

shuffleId: Int,
mapId: Int,
-   reduceId: Int)
+   reduceId: Int,
+   message: String)
extends TaskFailedReason {
override def toErrorString: String = {
val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
-   s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId)"
+   s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId, " +
+     s"message=\n$message\n)"
}
}

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

@@ -1050,7 +1050,7 @@ class DAGScheduler(
logInfo("Resubmitted " + task + ", so marking it as still running")
stage.pendingTasks += task

- case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
+ case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
val failedStage = stageIdToStage(task.stageId)
val mapStage = shuffleToMapStage(shuffleId)

@@ -1060,7 +1060,7 @@
if (runningStages.contains(failedStage)) {
logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
s"due to a fetch failure from $mapStage (${mapStage.name})")
- markStageAsFinished(failedStage, Some("Fetch failure"))
+ markStageAsFinished(failedStage, Some("Fetch failure: " + failureMessage))
Contributor:
I was wrong here, I didn't realize that the failureMessage includes the "FetchFailedException: ", which makes the prepending of "Fetch failure:" redundant. Please go ahead and remove it again, sorry for making you do extra work!

Member Author:
If I revert to the Utils.exceptionString() way, FetchFailedException won't appear here, because failureMessage will only contain Utils.exceptionString(e). So I will still keep "Fetch failure" here.
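
A sketch of the distinction being discussed, using an invented cause exception: under this PR's approach the message stored in FetchFailed comes from the underlying error, not the wrapper.

// Hypothetical underlying error that made the block fetch fail:
val cause = new java.io.IOException("connection reset by peer")

// What ends up in FetchFailed.message under this PR:
//   Utils.exceptionString(cause)
// which renders the cause's class name, message, and stack trace, e.g.
//   java.io.IOException: connection reset by peer
//     at ...
// "FetchFailedException" itself never appears in that string, so the
// "Fetch failure: " prefix added in DAGScheduler is not redundant.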

runningStages -= failedStage
}

core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala

@@ -215,7 +215,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId +
" STAGE_ID=" + taskEnd.stageId
stageLogInfo(taskEnd.stageId, taskStatus)
- case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
+ case FetchFailed(bmAddress, shuffleId, mapId, reduceId, message) =>
taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" +
taskEnd.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
mapId + " REDUCE_ID=" + reduceId
core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala

@@ -19,6 +19,7 @@ package org.apache.spark.shuffle

import org.apache.spark.storage.BlockManagerId
import org.apache.spark.{FetchFailed, TaskEndReason}
+ import org.apache.spark.util.Utils

/**
* Failed to fetch a shuffle block. The executor catches this exception and propagates it
@@ -30,13 +31,11 @@ private[spark] class FetchFailedException(
bmAddress: BlockManagerId,
shuffleId: Int,
mapId: Int,
-   reduceId: Int)
-   extends Exception {
-
-   override def getMessage: String =
-     "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)
+   reduceId: Int,
+   message: String)
+   extends Exception(message) {

- def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId)
+ def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
}

/**
@@ -46,7 +45,4 @@ private[spark] class MetadataFetchFailedException(
shuffleId: Int,
reduceId: Int,
message: String)
- extends FetchFailedException(null, shuffleId, -1, reduceId) {
-
-   override def getMessage: String = message
- }
+ extends FetchFailedException(null, shuffleId, -1, reduceId, message)
core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala

@@ -19,12 +19,13 @@ package org.apache.spark.shuffle.hash

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
+ import scala.util.{Failure, Success, Try}

import org.apache.spark._
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
- import org.apache.spark.util.CompletionIterator
+ import org.apache.spark.util.{CompletionIterator, Utils}

private[hash] object BlockStoreShuffleFetcher extends Logging {
def fetch[T](
@@ -52,21 +53,22 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
(address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
}

- def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) : Iterator[T] = {
+ def unpackBlock(blockPair: (BlockId, Try[Iterator[Any]])) : Iterator[T] = {
val blockId = blockPair._1
val blockOption = blockPair._2
blockOption match {
- case Some(block) => {
+ case Success(block) => {
block.asInstanceOf[Iterator[T]]
}
- case None => {
+ case Failure(e) => {
blockId match {
case ShuffleBlockId(shufId, mapId, _) =>
val address = statuses(mapId.toInt)._1
- throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId)
+ throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId,
+   Utils.exceptionString(e))
case _ =>
throw new SparkException(
-   "Failed to get block " + blockId + ", which is not a shuffle block")
+   "Failed to get block " + blockId + ", which is not a shuffle block", e)
}
}
}
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

@@ -20,6 +20,7 @@ package org.apache.spark.storage
import java.util.concurrent.LinkedBlockingQueue

import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
+ import scala.util.{Failure, Success, Try}

import org.apache.spark.{Logging, TaskContext}
import org.apache.spark.network.{BlockFetchingListener, BlockTransferService}
@@ -54,7 +55,7 @@ final class ShuffleBlockFetcherIterator(
blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
serializer: Serializer,
maxBytesInFlight: Long)
- extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
+ extends Iterator[(BlockId, Try[Iterator[Any]])] with Logging {

import ShuffleBlockFetcherIterator._

@@ -117,16 +118,18 @@ final class ShuffleBlockFetcherIterator(
private[this] def cleanup() {
isZombie = true
// Release the current buffer if necessary
- if (currentResult != null && !currentResult.failed) {
-   currentResult.buf.release()
+ currentResult match {
+   case SuccessFetchResult(_, _, buf) => buf.release()
+   case _ =>
}

// Release buffers in the results queue
val iter = results.iterator()
while (iter.hasNext) {
val result = iter.next()
- if (!result.failed) {
-   result.buf.release()
+ result match {
+   case SuccessFetchResult(_, _, buf) => buf.release()
+   case _ =>
}
}
}
@@ -149,7 +152,7 @@
// Increment the ref count because we need to pass this to a different thread.
// This needs to be released after use.
buf.retain()
- results.put(new FetchResult(BlockId(blockId), sizeMap(blockId), buf))
+ results.put(new SuccessFetchResult(BlockId(blockId), sizeMap(blockId), buf))
shuffleMetrics.remoteBytesRead += buf.size
shuffleMetrics.remoteBlocksFetched += 1
}
@@ -158,7 +161,7 @@

override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
- results.put(new FetchResult(BlockId(blockId), -1, null))
+ results.put(new FailureFetchResult(BlockId(blockId), e))
}
}
)
@@ -229,12 +232,12 @@ final class ShuffleBlockFetcherIterator(
val buf = blockManager.getBlockData(blockId)
shuffleMetrics.localBlocksFetched += 1
buf.retain()
- results.put(new FetchResult(blockId, 0, buf))
+ results.put(new SuccessFetchResult(blockId, 0, buf))
} catch {
case e: Exception =>
// If we see an exception, stop immediately.
logError(s"Error occurred while fetching local blocks", e)
- results.put(new FetchResult(blockId, -1, null))
+ results.put(new FailureFetchResult(blockId, e))
return
}
}
@@ -265,36 +268,39 @@

override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch

- override def next(): (BlockId, Option[Iterator[Any]]) = {
+ override def next(): (BlockId, Try[Iterator[Any]]) = {
numBlocksProcessed += 1
val startFetchWait = System.currentTimeMillis()
currentResult = results.take()
val result = currentResult
val stopFetchWait = System.currentTimeMillis()
shuffleMetrics.fetchWaitTime += (stopFetchWait - startFetchWait)
- if (!result.failed) {
-   bytesInFlight -= result.size
- }
+ result match {
+   case SuccessFetchResult(_, size, _) => bytesInFlight -= size
+   case _ =>
+ }
// Send fetch requests up to maxBytesInFlight
while (fetchRequests.nonEmpty &&
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
sendRequest(fetchRequests.dequeue())
}

- val iteratorOpt: Option[Iterator[Any]] = if (result.failed) {
-   None
- } else {
-   val is = blockManager.wrapForCompression(result.blockId, result.buf.createInputStream())
-   val iter = serializer.newInstance().deserializeStream(is).asIterator
-   Some(CompletionIterator[Any, Iterator[Any]](iter, {
-     // Once the iterator is exhausted, release the buffer and set currentResult to null
-     // so we don't release it again in cleanup.
-     currentResult = null
-     result.buf.release()
-   }))
- }
+ val iteratorTry: Try[Iterator[Any]] = result match {
+   case FailureFetchResult(_, e) => Failure(e)
+   case SuccessFetchResult(blockId, _, buf) => {
+     val is = blockManager.wrapForCompression(blockId, buf.createInputStream())
+     val iter = serializer.newInstance().deserializeStream(is).asIterator
+     Success(CompletionIterator[Any, Iterator[Any]](iter, {
+       // Once the iterator is exhausted, release the buffer and set currentResult to null
+       // so we don't release it again in cleanup.
+       currentResult = null
+       buf.release()
+     }))
+   }
+ }

- (result.blockId, iteratorOpt)
+ (result.blockId, iteratorTry)
}
}

@@ -313,14 +319,30 @@ object ShuffleBlockFetcherIterator {
}

/**
- * Result of a fetch from a remote block. A failure is represented as size == -1.
+ * Result of a fetch from a remote block.
+ */
+ private[storage] sealed trait FetchResult {
+   val blockId: BlockId
+ }
+
+ /**
+ * Result of a fetch from a remote block successfully.
  * @param blockId block id
  * @param size estimated size of the block, used to calculate bytesInFlight.
- *             Note that this is NOT the exact bytes. -1 if failure is present.
- * @param buf [[ManagedBuffer]] for the content. null is error.
+ *             Note that this is NOT the exact bytes.
+ * @param buf [[ManagedBuffer]] for the content.
  */
- case class FetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer) {
-   def failed: Boolean = size == -1
-   if (failed) assert(buf == null) else assert(buf != null)
+ private[storage] case class SuccessFetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer)
Contributor:
Just to make sure we're on absolutely the same page, private[storage] is used to restrict visibility, to ensure that we never accidentally expose this API to a wider audience than necessary. sealed is used to restrict inheritance, so that the compiler (and programmer) can check that all cases have been matched on. The attributes are thus orthogonal in nature.

Adding private[storage] is good here, and we should also add it to the trait FetchResult as well, to make sure it's restricted in both visibility and inheritance.

Member Author:
Got it. Thank you.
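
A self-contained illustration of the two modifiers, with made-up names rather than code from this PR: sealed buys compile-time exhaustiveness checking, while private[storage] restricts visibility to the org.apache.spark.storage package.

package org.apache.spark.storage

// `sealed`: subclasses may only be declared in this file, so the compiler
// can warn when a match does not cover every case.
// `private[storage]`: the type is invisible outside org.apache.spark.storage.
private[storage] sealed trait Result
private[storage] case class Ok(value: Int) extends Result
private[storage] case class Err(e: Throwable) extends Result

private[storage] object ResultDemo {
  def describe(r: Result): String = r match {
    case Ok(v) => s"ok: $v"
    case Err(e) => s"failed: ${e.getMessage}"
    // Dropping either case above would produce a "match may not be
    // exhaustive" compiler warning, thanks to `sealed`.
  }
}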

+ extends FetchResult {
+   require(buf != null)
+   require(size >= 0)
}

+ /**
+  * Result of a fetch from a remote block unsuccessfully.
+  * @param blockId block id
+  * @param e the failure exception
+  */
+ private[storage] case class FailureFetchResult(blockId: BlockId, e: Throwable)
+   extends FetchResult
}
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -279,7 +279,8 @@ private[spark] object JsonProtocol {
("Block Manager Address" -> blockManagerAddress) ~
("Shuffle ID" -> fetchFailed.shuffleId) ~
("Map ID" -> fetchFailed.mapId) ~
- ("Reduce ID" -> fetchFailed.reduceId)
+ ("Reduce ID" -> fetchFailed.reduceId) ~
+ ("Message" -> fetchFailed.message)
case exceptionFailure: ExceptionFailure =>
val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
val metrics = exceptionFailure.metrics.map(taskMetricsToJson).getOrElse(JNothing)
@@ -627,7 +628,9 @@ private[spark] object JsonProtocol {
val shuffleId = (json \ "Shuffle ID").extract[Int]
val mapId = (json \ "Map ID").extract[Int]
val reduceId = (json \ "Reduce ID").extract[Int]
- new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId)
+ val message = Utils.jsonOption(json \ "Message").map(_.extract[String])
+ new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId,
+   message.getOrElse("Unknown reason"))
case `exceptionFailure` =>
val className = (json \ "Class Name").extract[String]
val description = (json \ "Description").extract[String]
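For illustration, a fetch-failure reason serialized after this change might look like the sketch below. The five field names come from the diff above; the values and the outer "Reason" tag are assumptions based on JsonProtocol's conventions, not output captured from this PR. On the read side, Utils.jsonOption tolerates event logs written before this change, falling back to "Unknown reason".

{
  "Reason" : "FetchFailed",
  "Block Manager Address" : { ... },
  "Shuffle ID" : 2,
  "Map ID" : 3,
  "Reduce ID" : 1,
  "Message" : "java.io.IOException: connection reset by peer ..."
}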
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1593,7 +1593,7 @@
}

/** Return a nice string representation of the exception, including the stack trace. */
- def exceptionString(e: Exception): String = {
+ def exceptionString(e: Throwable): String = {
if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
}

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

@@ -431,7 +431,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
// the 2nd ResultTask failed
complete(taskSets(1), Seq(
(Success, 42),
- (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0), null)))
+ (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
// this will get called
// blockManagerMaster.removeExecutor("exec-hostA")
// ask the scheduler to try it again
@@ -461,7 +461,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
// The first result task fails, with a fetch failure for the output from the first mapper.
runEvent(CompletionEvent(
taskSets(1).tasks(0),
- FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0),
+ FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
null,
Map[Long, Any](),
null,
@@ -472,7 +472,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
// The second ResultTask fails, with a fetch failure for the output from the second mapper.
runEvent(CompletionEvent(
taskSets(1).tasks(0),
- FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1),
+ FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"),
null,
Map[Long, Any](),
null,
@@ -624,7 +624,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
(Success, makeMapStatus("hostC", 1))))
// fail the third stage because hostA went down
complete(taskSets(2), Seq(
- (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
+ (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
// TODO assert this:
// blockManagerMaster.removeExecutor("exec-hostA")
// have DAGScheduler try again
@@ -655,7 +655,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
(Success, makeMapStatus("hostB", 1))))
// pretend stage 0 failed because hostA went down
complete(taskSets(2), Seq(
- (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
+ (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
// TODO assert this:
// blockManagerMaster.removeExecutor("exec-hostA")
// DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun.
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala

@@ -105,7 +105,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
for (i <- 0 until 5) {
assert(iterator.hasNext, s"iterator should have 5 elements but actually has $i elements")
val (blockId, subIterator) = iterator.next()
- assert(subIterator.isDefined,
+ assert(subIterator.isSuccess,
s"iterator should have 5 elements defined but actually has $i elements")

// Make sure we release the buffer once the iterator is exhausted.
@@ -233,8 +233,8 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
sem.acquire()

// The first block should be defined, and the last two are not defined (due to failure)
- assert(iterator.next()._2.isDefined === true)
- assert(iterator.next()._2.isDefined === false)
- assert(iterator.next()._2.isDefined === false)
+ assert(iterator.next()._2.isSuccess)
+ assert(iterator.next()._2.isFailure)
+ assert(iterator.next()._2.isFailure)
}
}
core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala

@@ -115,7 +115,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
// Go through all the failure cases to make sure we are counting them as failures.
val taskFailedReasons = Seq(
Resubmitted,
- new FetchFailed(null, 0, 0, 0),
+ new FetchFailed(null, 0, 0, 0, "ignored"),
new ExceptionFailure("Exception", "description", null, None),
TaskResultLost,
TaskKilled,