SPARK-3179. Add task OutputMetrics. #2968

Closed
wants to merge 5 commits
46 changes: 39 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,6 +17,7 @@

package org.apache.spark.deploy

import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
@@ -133,14 +134,9 @@ class SparkHadoopUtil extends Logging {
*/
private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration)
: Option[() => Long] = {
val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
val scheme = qualifiedPath.toUri().getScheme()
val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
try {
val threadStats = stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
val statisticsDataClass =
Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
val getBytesReadMethod = statisticsDataClass.getDeclaredMethod("getBytesRead")
val threadStats = getFileSystemThreadStatistics(path, conf)
val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
val baselineBytesRead = f()
Some(() => f() - baselineBytesRead)
@@ -151,6 +147,42 @@
}
}
}

/**
* Returns a function that can be called to find Hadoop FileSystem bytes written. If
* getFSBytesWrittenOnThreadCallback is called from thread r at time t, the returned callback will
* return the bytes written on r since t. Reflection is required because thread-level FileSystem
* statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
* Returns None if the required method can't be found.
*/
private[spark] def getFSBytesWrittenOnThreadCallback(path: Path, conf: Configuration)
: Option[() => Long] = {
try {
val threadStats = getFileSystemThreadStatistics(path, conf)
val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
val baselineBytesWritten = f()
Some(() => f() - baselineBytesWritten)
} catch {
case e: NoSuchMethodException => {
logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e)
None
}
}
}

private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
val scheme = qualifiedPath.toUri().getScheme()
val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
}

private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
val statisticsDataClass =
Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
statisticsDataClass.getDeclaredMethod(methodName)
}
}

object SparkHadoopUtil {
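
A minimal usage sketch of the new callback, illustrative only and not part of this patch: the returned function reports bytes written on the calling thread since the callback was created, and the Option is None on Hadoop versions older than 2.5. The path below is a placeholder, and the call assumes code that can see the private[spark] method.

// Illustrative sketch only (assumes Hadoop 2.5+ and access to the private[spark] API).
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil

val conf = new Configuration()
val outputPath = new Path("hdfs:///tmp/example-output") // placeholder path

// None on Hadoop < 2.5, where per-thread FileSystem statistics are unavailable.
val bytesWrittenCallback: Option[() => Long] =
  SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outputPath, conf)

// ... perform writes to outputPath on this thread ...

// Each call returns bytes written on this thread since the callback was created.
val bytesWrittenSoFar: Long = bytesWrittenCallback.map(f => f()).getOrElse(0L)
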
28 changes: 28 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -82,6 +82,12 @@ class TaskMetrics extends Serializable {
*/
var inputMetrics: Option[InputMetrics] = None

/**
* If this task writes data externally (e.g. to a distributed filesystem), metrics on how much
* data was written are stored here.
*/
var outputMetrics: Option[OutputMetrics] = None

/**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
* This includes read metrics aggregated over all the task's shuffle dependencies.
@@ -157,6 +163,16 @@ object DataReadMethod extends Enumeration with Serializable {
val Memory, Disk, Hadoop, Network = Value
}

/**
* :: DeveloperApi ::
* Method by which output data was written.
*/
@DeveloperApi
object DataWriteMethod extends Enumeration with Serializable {
type DataWriteMethod = Value
val Hadoop = Value
}

/**
* :: DeveloperApi ::
* Metrics about reading input data.
@@ -169,6 +185,18 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
var bytesRead: Long = 0L
}

/**
* :: DeveloperApi ::
* Metrics about writing output data.
*/
@DeveloperApi
case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
/**
* Total bytes written
*/
var bytesWritten: Long = 0L
}

/**
* :: DeveloperApi ::
* Metrics pertaining to shuffle data read in a given task.
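
For orientation, a small sketch, not from this diff, of how the new OutputMetrics attaches to a task's TaskMetrics; the byte count is a made-up value.

// Illustrative only: attach output metrics to a task's TaskMetrics for a Hadoop write.
import org.apache.spark.executor.{DataWriteMethod, OutputMetrics, TaskMetrics}

val taskMetrics = new TaskMetrics()
val outputMetrics = OutputMetrics(DataWriteMethod.Hadoop)
taskMetrics.outputMetrics = Some(outputMetrics)

// The writer updates the mutable counter in place as data goes out.
outputMetrics.bytesWritten = 4096L // made-up value
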
51 changes: 48 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -28,7 +28,7 @@ import scala.reflect.ClassTag

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
@@ -40,6 +40,7 @@ import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.{DataWriteMethod, OutputMetrics}
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.Utils
@@ -961,30 +962,40 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}

val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
val config = wrappedConf.value
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
/* "reduce task" <split #> <attempt # = spark task #> */
val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
attemptNumber)
val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
val hadoopContext = newTaskAttemptContext(config, attemptId)
val format = outfmt.newInstance
format match {
case c: Configurable => c.setConf(wrappedConf.value)
case c: Configurable => c.setConf(config)
case _ => ()
}
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)

val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)

val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
try {
var recordsWritten = 0L
while (iter.hasNext) {
val pair = iter.next()
writer.write(pair._1, pair._2)

// Update bytes written metric every few records
maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
recordsWritten += 1
}
} finally {
writer.close(hadoopContext)
}
committer.commitTask(hadoopContext)
bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
1
} : Int

@@ -1005,6 +1016,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def saveAsHadoopDataset(conf: JobConf) {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
val wrappedConf = new SerializableWritable(hadoopConf)
val outputFormatInstance = hadoopConf.getOutputFormat
val keyClass = hadoopConf.getOutputKeyClass
val valueClass = hadoopConf.getOutputValueClass
@@ -1032,27 +1044,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.preSetup()

val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
val config = wrappedConf.value
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt

val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)

writer.setup(context.stageId, context.partitionId, attemptNumber)
writer.open()
try {
var recordsWritten = 0L
while (iter.hasNext) {
val record = iter.next()
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])

// Update bytes written metric every few records
maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
recordsWritten += 1
}
} finally {
writer.close()
}
writer.commit()
bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
}

self.context.runJob(self, writeToFile)
writer.commitJob()
}

private def initHadoopOutputMetrics(context: TaskContext, config: Configuration)
: (OutputMetrics, Option[() => Long]) = {
val bytesWrittenCallback = Option(config.get("mapreduce.output.fileoutputformat.outputdir"))
.map(new Path(_))
.flatMap(SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(_, config))
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
if (bytesWrittenCallback.isDefined) {
context.taskMetrics.outputMetrics = Some(outputMetrics)
}
(outputMetrics, bytesWrittenCallback)
}

private def maybeUpdateOutputMetrics(bytesWrittenCallback: Option[() => Long],
outputMetrics: OutputMetrics, recordsWritten: Long): Unit = {
if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0
&& bytesWrittenCallback.isDefined) {
bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
}
}

/**
* Return an RDD with the keys of each tuple.
*/
@@ -1069,3 +1110,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord)
}

private[spark] object PairRDDFunctions {
val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
}
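
Both write paths above follow the same pattern: the bytes-written callback goes through reflection into Hadoop's per-thread FileSystem statistics, so it is polled only once every RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES (256) records and once more after the writer finishes. A standalone sketch of that throttling loop, with assumed helper names and a local copy of the constant (not code from this patch):

// Standalone sketch of the periodic-update pattern (assumed names, not patch code).
import org.apache.spark.executor.{DataWriteMethod, OutputMetrics}

def writeWithMetrics[K, V](records: Iterator[(K, V)],
    writeRecord: ((K, V)) => Unit,
    bytesWrittenCallback: Option[() => Long]): OutputMetrics = {
  val outputMetrics = OutputMetrics(DataWriteMethod.Hadoop)
  val updateInterval = 256L // local stand-in for RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES
  var recordsWritten = 0L
  while (records.hasNext) {
    writeRecord(records.next())
    // The reflective statistics lookup is not free, so refresh the counter only periodically.
    if (recordsWritten % updateInterval == 0) {
      bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
    }
    recordsWritten += 1
  }
  // Final refresh so the metric reflects everything this task wrote.
  bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
  outputMetrics
}
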
@@ -158,6 +158,11 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
" INPUT_BYTES=" + metrics.bytesRead
case None => ""
}
val outputMetrics = taskMetrics.outputMetrics match {
case Some(metrics) =>
" OUTPUT_BYTES=" + metrics.bytesWritten
case None => ""
}
val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match {
case Some(metrics) =>
" BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
@@ -173,7 +178,7 @@
" SHUFFLE_WRITE_TIME=" + metrics.shuffleWriteTime
case None => ""
}
stageLogInfo(stageId, status + info + executorRunTime + gcTime + inputMetrics +
stageLogInfo(stageId, status + info + executorRunTime + gcTime + inputMetrics + outputMetrics +
shuffleReadMetrics + writeMetrics)
}

2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -29,6 +29,8 @@ private[spark] object ToolTips {

val INPUT = "Bytes read from Hadoop or from Spark storage."

val OUTPUT = "Bytes written to Hadoop."

val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage."

val SHUFFLE_READ =
@@ -48,6 +48,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
val executorToTasksFailed = HashMap[String, Int]()
val executorToDuration = HashMap[String, Long]()
val executorToInputBytes = HashMap[String, Long]()
val executorToOutputBytes = HashMap[String, Long]()
val executorToShuffleRead = HashMap[String, Long]()
val executorToShuffleWrite = HashMap[String, Long]()

@@ -78,6 +79,10 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
executorToInputBytes(eid) =
executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead
}
metrics.outputMetrics.foreach { outputMetrics =>
executorToOutputBytes(eid) =
executorToOutputBytes.getOrElse(eid, 0L) + outputMetrics.bytesWritten
}
metrics.shuffleReadMetrics.foreach { shuffleRead =>
executorToShuffleRead(eid) =
executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
@@ -45,6 +45,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobPr
<th>Failed Tasks</th>
<th>Succeeded Tasks</th>
<th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
<th><span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output</span></th>
<th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
<th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_WRITE}>Shuffle Write</span></th>
<th>Shuffle Spill (Memory)</th>
@@ -77,6 +78,8 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobPr
<td>{v.succeededTasks}</td>
<td sorttable_customkey={v.inputBytes.toString}>
{Utils.bytesToString(v.inputBytes)}</td>
<td sorttable_customkey={v.outputBytes.toString}>
{Utils.bytesToString(v.outputBytes)}</td>
<td sorttable_customkey={v.shuffleRead.toString}>
{Utils.bytesToString(v.shuffleRead)}</td>
<td sorttable_customkey={v.shuffleWrite.toString}>
@@ -259,6 +259,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
stageData.inputBytes += inputBytesDelta
execSummary.inputBytes += inputBytesDelta

val outputBytesDelta =
(taskMetrics.outputMetrics.map(_.bytesWritten).getOrElse(0L)
- oldMetrics.flatMap(_.outputMetrics).map(_.bytesWritten).getOrElse(0L))
stageData.outputBytes += outputBytesDelta
execSummary.outputBytes += outputBytesDelta

val diskSpillDelta =
taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L)
stageData.diskBytesSpilled += diskSpillDelta
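
Worked example of the delta bookkeeping above, with made-up numbers: if the previously recorded metrics for a running task reported 1000 bytes written and the incoming update reports 1536, only the difference is added to the stage and executor totals, so repeated updates from the same task are not double-counted.

// Illustrative arithmetic only; the values are invented.
val oldBytesWritten = 1000L // from the previously stored metrics, or 0L if none
val newBytesWritten = 1536L // from the incoming TaskMetrics.outputMetrics
val outputBytesDelta = newBytesWritten - oldBytesWritten // 536L folded into the running totals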