Skip to content

Commit 2053d79

Browse files
committed
Improve MapOutputTracker error logging.
Author: Reynold Xin <[email protected]> Closes apache#1258 from rxin/mapOutputTracker and squashes the following commits: a7c95b6 [Reynold Xin] Improve MapOutputTracker error logging.
1 parent 3c104c7 commit 2053d79

File tree

1 file changed

+10
-7
lines changed

1 file changed

+10
-7
lines changed

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ import scala.concurrent.Await
2626
import akka.actor._
2727
import akka.pattern.ask
2828

29-
import org.apache.spark.util._
3029
import org.apache.spark.scheduler.MapStatus
3130
import org.apache.spark.shuffle.MetadataFetchFailedException
3231
import org.apache.spark.storage.BlockManagerId
32+
import org.apache.spark.util._
3333

3434
private[spark] sealed trait MapOutputTrackerMessage
3535
private[spark] case class GetMapOutputStatuses(shuffleId: Int)
@@ -107,14 +107,17 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
107107
Await.result(future, timeout)
108108
} catch {
109109
case e: Exception =>
110+
logError("Error communicating with MapOutputTracker", e)
110111
throw new SparkException("Error communicating with MapOutputTracker", e)
111112
}
112113
}
113114

114115
/** Send a one-way message to the trackerActor, to which we expect it to reply with true. */
115116
protected def sendTracker(message: Any) {
116-
if (askTracker(message) != true) {
117-
throw new SparkException("Error reply received from MapOutputTracker")
117+
val response = askTracker(message)
118+
if (response != true) {
119+
throw new SparkException(
120+
"Error reply received from MapOutputTracker. Expecting true, got " + response.toString)
118121
}
119122
}
120123

@@ -366,9 +369,9 @@ private[spark] object MapOutputTracker {
366369
// any of the statuses is null (indicating a missing location due to a failed mapper),
367370
// throw a FetchFailedException.
368371
private def convertMapStatuses(
369-
shuffleId: Int,
370-
reduceId: Int,
371-
statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
372+
shuffleId: Int,
373+
reduceId: Int,
374+
statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
372375
assert (statuses != null)
373376
statuses.map {
374377
status =>
@@ -403,7 +406,7 @@ private[spark] object MapOutputTracker {
403406
if (compressedSize == 0) {
404407
0
405408
} else {
406-
math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong
409+
math.pow(LOG_BASE, compressedSize & 0xFF).toLong
407410
}
408411
}
409412
}

0 commit comments

Comments
 (0)