
[SPARK-4092] [CORE] Fix InputMetrics for coalesce'd Rdds #3120


Closed · wants to merge 4 commits

6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -44,7 +44,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
val inputMetrics = blockResult.inputMetrics
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(inputMetrics.readMethod)
existingMetrics.addBytesRead(inputMetrics.bytesRead)

new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

case None =>
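
A minimal sketch of the scenario this hunk addresses (sc and the input path are illustrative assumptions, not part of the PR): when a coalesced RDD reads several cached blocks in a single task, each block's bytes must be accumulated into the task's existing InputMetrics rather than overwrite it, which is why the assignment to context.taskMetrics.inputMetrics is replaced with addBytesRead.

// Hedged example; assumes a running SparkContext named sc.
val cached = sc.textFile("hdfs:///data/input").cache()
cached.count()                      // materialize and cache the partitions

// After coalescing, one task pulls several cached blocks from the BlockManager.
// With the old assignment, only the last block's bytes would have survived;
// addBytesRead sums the bytes of every block the task reads.
val coalesced = cached.coalesce(2)
coalesced.count()                   // reported bytes read covers all cached blocks
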
@@ -370,6 +370,7 @@ private[spark] class Executor(
if (!taskRunner.attemptedTask.isEmpty) {
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
metrics.updateShuffleReadMetrics
metrics.updateInputMetrics()
metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
if (isLocal) {
// JobProgressListener will hold an reference of it during
75 changes: 73 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,6 +17,11 @@

package org.apache.spark.executor

import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.executor.DataReadMethod
import org.apache.spark.executor.DataReadMethod.DataReadMethod

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.annotation.DeveloperApi
@@ -80,7 +85,17 @@ class TaskMetrics extends Serializable {
* If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
* are stored here.
*/
var inputMetrics: Option[InputMetrics] = None
private var _inputMetrics: Option[InputMetrics] = None

def inputMetrics = _inputMetrics

/**
* This should only be used when recreating TaskMetrics, not when updating input metrics in
* executors
*/
private[spark] def setInputMetrics(inputMetrics: Option[InputMetrics]) {
_inputMetrics = inputMetrics
}

/**
* If this task writes data externally (e.g. to a distributed filesystem), metrics on how much
@@ -133,6 +148,30 @@ class TaskMetrics extends Serializable {
readMetrics
}

/**
* Returns the input metrics object that the task should use. Currently, if
* there exists an input metric with the same readMethod, we return that one
* so the caller can accumulate bytes read. If the readMethod is different
* than previously seen by this task, we return a new InputMetric but don't
* record it.
*
* Once https://issues.apache.org/jira/browse/SPARK-5225 is addressed,
* we can store all the different inputMetrics (one per readMethod).
*/
private[spark] def getInputMetricsForReadMethod(readMethod: DataReadMethod):
InputMetrics = synchronized {
_inputMetrics match {
case None =>
val metrics = new InputMetrics(readMethod)
_inputMetrics = Some(metrics)
metrics
case Some(metrics @ InputMetrics(method)) if method == readMethod =>
metrics
case Some(InputMetrics(method)) =>
new InputMetrics(readMethod)
}
}

/**
* Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
*/
@@ -146,6 +185,10 @@
}
_shuffleReadMetrics = Some(merged)
}

private[spark] def updateInputMetrics() = synchronized {
Contributor: In your next PR, can you fix this by adding an explicit return type?

Author: This follows the method above, updateShuffleReadMetrics, which doesn't have a return type either. Should I change both then?

Contributor: Would be great to do that!

inputMetrics.foreach(_.updateBytesRead())
}
}

private[spark] object TaskMetrics {
@@ -179,10 +222,38 @@ object DataWriteMethod extends Enumeration with Serializable {
*/
@DeveloperApi
case class InputMetrics(readMethod: DataReadMethod.Value) {

private val _bytesRead: AtomicLong = new AtomicLong()

/**
* Total bytes read.
*/
var bytesRead: Long = 0L
def bytesRead: Long = _bytesRead.get()
@volatile @transient var bytesReadCallback: Option[() => Long] = None

/**
* Adds additional bytes read for this read method.
*/
def addBytesRead(bytes: Long) = {
_bytesRead.addAndGet(bytes)
}

/**
* Invoke the bytesReadCallback and mutate bytesRead.
*/
def updateBytesRead() {
bytesReadCallback.foreach { c =>
_bytesRead.set(c())
}
}

/**
* Register a function that can be called to get up-to-date information on how many bytes the task
* has read from an input source.
*/
def setBytesReadCallback(f: Option[() => Long]) {
bytesReadCallback = f
}
}

/**
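
A hedged usage sketch of the TaskMetrics/InputMetrics API added above. This is a standalone illustration rather than executor code, and it assumes compilation inside the org.apache.spark package, since getInputMetricsForReadMethod and updateInputMetrics are private[spark]:

import org.apache.spark.executor.{DataReadMethod, TaskMetrics}

val metrics = new TaskMetrics

// Requests for the same read method return the same InputMetrics, so bytes accumulate.
val hadoopInput = metrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)
hadoopInput.addBytesRead(100L)
metrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop).addBytesRead(50L)
assert(hadoopInput.bytesRead == 150L)

// A registered callback (e.g. a FileSystem statistics snapshot) takes precedence
// once updateBytesRead()/updateInputMetrics() runs, which the executor does before
// reporting metrics.
var fsBytes = 0L
hadoopInput.setBytesReadCallback(Some(() => fsBytes))
fsBytes = 4096L
metrics.updateInputMetrics()
assert(hadoopInput.bytesRead == 4096L)
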
39 changes: 13 additions & 26 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -213,18 +213,19 @@ class HadoopRDD[K, V](
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()

val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
val inputMetrics = context.taskMetrics
.getInputMetricsForReadMethod(DataReadMethod.Hadoop)

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) {
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
} else {
None
}
if (bytesReadCallback.isDefined) {
context.taskMetrics.inputMetrics = Some(inputMetrics)
}
val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
split.inputSplit.value match {
case split: FileSplit =>
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, jobConf)
case _ => None
}
)
inputMetrics.setBytesReadCallback(bytesReadCallback)

var reader: RecordReader[K, V] = null
val inputFormat = getInputFormat(jobConf)
@@ -237,40 +238,26 @@
val key: K = reader.createKey()
val value: V = reader.createValue()

var recordsSinceMetricsUpdate = 0

override def getNext() = {
try {
finished = !reader.next(key, value)
} catch {
case eof: EOFException =>
finished = true
}

// Update bytes read metric every few records
if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
&& bytesReadCallback.isDefined) {
recordsSinceMetricsUpdate = 0
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
} else {
recordsSinceMetricsUpdate += 1
}
(key, value)
}

override def close() {
try {
reader.close()
if (bytesReadCallback.isDefined) {
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
inputMetrics.updateBytesRead()
} else if (split.inputSplit.value.isInstanceOf[FileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
inputMetrics.bytesRead = split.inputSplit.value.getLength
context.taskMetrics.inputMetrics = Some(inputMetrics)
inputMetrics.addBytesRead(split.inputSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
40 changes: 13 additions & 27 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -109,18 +109,19 @@ class NewHadoopRDD[K, V](
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = confBroadcast.value.value

val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
val inputMetrics = context.taskMetrics
.getInputMetricsForReadMethod(DataReadMethod.Hadoop)

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val bytesReadCallback = if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
split.serializableHadoopSplit.value.asInstanceOf[FileSplit].getPath, conf)
} else {
None
}
if (bytesReadCallback.isDefined) {
context.taskMetrics.inputMetrics = Some(inputMetrics)
}
val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
split.serializableHadoopSplit.value match {
case split: FileSplit =>
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, conf)
case _ => None
}
)
inputMetrics.setBytesReadCallback(bytesReadCallback)

val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
@@ -153,34 +154,19 @@
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false

// Update bytes read metric every few records
if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
&& bytesReadCallback.isDefined) {
recordsSinceMetricsUpdate = 0
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
} else {
recordsSinceMetricsUpdate += 1
}

(reader.getCurrentKey, reader.getCurrentValue)
}
Contributor: This was done intentionally, to keep the callback updates out of the InputMetrics class and isolate them to HadoopRDD. The notion of callbacks makes the InputMetrics class more complicated and mutable, and since it's an exposed class we really wanted to keep the interface clean and simple, even if that meant some extra engineering in HadoopRDD. Could this part of the change be reverted to how it was before (i.e. without changing the InputMetrics/TaskMetrics classes)?

Author: @pwendell There is a long thread in this PR between @sryza and @kayousterhout about why we need to add the callback to the input metrics. The reason is to prevent clobbering between different HadoopRDDs, for example in a CartesianRDD; that is why there is a specific unit test for that case. I don't think we can track the metrics correctly without the callbacks in the InputMetrics.

Contributor: Okay, that's fine then. I looked, and it's all private[spark], so there is actually no change to visibility.

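A minimal sketch of the clobbering scenario described in this thread (sc and the paths are illustrative assumptions, not the unit test mentioned above): a CartesianRDD iterates over two Hadoop inputs inside the same task, so both HadoopRDD iterators must share the task's single Hadoop InputMetrics and its callback instead of each installing its own and overwriting the other's bytes.

// Hedged example; assumes a running SparkContext named sc.
val left  = sc.textFile("hdfs:///data/left")
val right = sc.textFile("hdfs:///data/right")

// Each task of the cartesian product reads one partition of `left` and one of
// `right`. Before this change, whichever read finished last replaced
// context.taskMetrics.inputMetrics and dropped the other's bytes; now both go
// through getInputMetricsForReadMethod(DataReadMethod.Hadoop) and accumulate.
val product = left.cartesian(right)
product.count()   // reported bytes read covers both inputs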

private def close() {
try {
reader.close()

// Update metrics with final amount
if (bytesReadCallback.isDefined) {
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
inputMetrics.updateBytesRead()
} else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength
context.taskMetrics.inputMetrics = Some(inputMetrics)
inputMetrics.addBytesRead(split.serializableHadoopSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
@@ -54,7 +54,7 @@ private[spark] class BlockResult(
readMethod: DataReadMethod.Value,
bytes: Long) {
val inputMetrics = new InputMetrics(readMethod)
inputMetrics.bytesRead = bytes
inputMetrics.addBytesRead(bytes)
}

/**
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -604,8 +604,8 @@ private[spark] object JsonProtocol {
Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson))
metrics.shuffleWriteMetrics =
Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
metrics.inputMetrics =
Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson)
metrics.setInputMetrics(
Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson))
metrics.outputMetrics =
Utils.jsonOption(json \ "Output Metrics").map(outputMetricsFromJson)
metrics.updatedBlocks =
@@ -638,7 +638,7 @@
def inputMetricsFromJson(json: JValue): InputMetrics = {
val metrics = new InputMetrics(
DataReadMethod.withName((json \ "Data Read Method").extract[String]))
metrics.bytesRead = (json \ "Bytes Read").extract[Long]
metrics.addBytesRead((json \ "Bytes Read").extract[Long])
metrics
}

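
A hedged sketch of the deserialization path these two changes keep intact (the JSON field names come from the surrounding code; it assumes compilation in the org.apache.spark.util package, since JsonProtocol is private[spark]): because bytesRead is now backed by an AtomicLong and inputMetrics is no longer a public var, reconstruction has to go through addBytesRead and setInputMetrics.

import org.apache.spark.executor.DataReadMethod
import org.json4s.JsonDSL._
import org.json4s.JValue

// Minimal "Input Metrics" fragment in the event-log format parsed above.
val json: JValue = ("Data Read Method" -> "Hadoop") ~ ("Bytes Read" -> 123456L)

val metrics = JsonProtocol.inputMetricsFromJson(json)  // internally calls addBytesRead
assert(metrics.readMethod == DataReadMethod.Hadoop)
assert(metrics.bytesRead == 123456L)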