[SPARK-3288] All fields in TaskMetrics should be private and use getters/setters #4020

Closed · wants to merge 9 commits · changes from all commits
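The change applied throughout the diff below is mechanical: each public var on TaskMetrics (and on its nested shuffle, input and output metrics classes) becomes a private backing field with a public read-only getter, and mutation goes through private[spark] set/inc/dec methods so that only Spark internals can write to it. A minimal sketch of the pattern, using an illustrative class and field name rather than the real TaskMetrics code:

// Sketch only: the file must sit under an enclosing "spark" package
// for the private[spark] qualifier to compile.
package org.apache.spark.examples

class ExampleMetrics extends Serializable {
  // Backing field is private; callers outside Spark can only read it.
  private var _bytesSpilled: Long = 0L

  // Public, read-only accessor.
  def bytesSpilled: Long = _bytesSpilled

  // Mutators restricted to org.apache.spark, mirroring the PR's
  // private[spark] inc/dec/set methods.
  private[spark] def incBytesSpilled(value: Long): Unit = _bytesSpilled += value
  private[spark] def decBytesSpilled(value: Long): Unit = _bytesSpilled -= value
  private[spark] def setBytesSpilled(value: Long): Unit = _bytesSpilled = value
}

Call sites inside Spark then change from metrics.field += n to metrics.incField(n), which is exactly what the per-file hunks below do.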
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -61,8 +61,8 @@ case class Aggregator[K, V, C] (
// Update task metrics if context is not null
// TODO: Make context non-optional in a future release
Option(context).foreach { c =>
c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
}
combiners.iterator
}
@@ -95,8 +95,8 @@ case class Aggregator[K, V, C] (
// Update task metrics if context is not null
// TODO: Make context non-optional in a future release
Option(context).foreach { c =>
c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
}
combiners.iterator
}

@@ -125,8 +125,8 @@ private[spark] class PythonRDD(
init, finish))
val memoryBytesSpilled = stream.readLong()
val diskBytesSpilled = stream.readLong()
context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += diskBytesSpilled
context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled)
context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled)
read()
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
// Signals that an exception has been thrown in python
19 changes: 10 additions & 9 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -203,10 +203,10 @@ private[spark] class Executor(
val afterSerialization = System.currentTimeMillis()

for (m <- task.metrics) {
m.executorDeserializeTime = taskStart - deserializeStartTime
m.executorRunTime = taskFinish - taskStart
m.jvmGCTime = gcTime - startGCTime
m.resultSerializationTime = afterSerialization - beforeSerialization
m.setExecutorDeserializeTime(taskStart - deserializeStartTime)
m.setExecutorRunTime(taskFinish - taskStart)
m.setJvmGCTime(gcTime - startGCTime)
m.setResultSerializationTime(afterSerialization - beforeSerialization)
}

val accumUpdates = Accumulators.values
@@ -257,8 +257,8 @@ private[spark] class Executor(
val serviceTime = System.currentTimeMillis() - taskStart
val metrics = attemptedTask.flatMap(t => t.metrics)
for (m <- metrics) {
m.executorRunTime = serviceTime
m.jvmGCTime = gcTime - startGCTime
m.setExecutorRunTime(serviceTime)
m.setJvmGCTime(gcTime - startGCTime)
}
val reason = new ExceptionFailure(t, metrics)
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
@@ -376,11 +376,12 @@ private[spark] class Executor(
val curGCTime = gcTime

for (taskRunner <- runningTasks.values()) {
if (!taskRunner.attemptedTask.isEmpty) {
if (taskRunner.attemptedTask.nonEmpty) {
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
metrics.updateShuffleReadMetrics
metrics.updateShuffleReadMetrics()
metrics.updateInputMetrics()
metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)

if (isLocal) {
// JobProgressListener will hold a reference to it during
// onExecutorMetricsUpdate(), then JobProgressListener cannot see
103 changes: 73 additions & 30 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -44,42 +44,62 @@ class TaskMetrics extends Serializable {
/**
* Host's name the task runs on
*/
var hostname: String = _

private var _hostname: String = _
def hostname = _hostname
private[spark] def setHostname(value : String) = _hostname = value

Contributor:
Should be "value: String" with no space (not sure why our style checks didn't catch this).

/**
* Time taken on the executor to deserialize this task
*/
var executorDeserializeTime: Long = _

private var _executorDeserializeTime: Long = _
def executorDeserializeTime = _executorDeserializeTime
private[spark] def setExecutorDeserializeTime(value: Long) = _executorDeserializeTime = value


/**
* Time the executor spends actually running the task (including fetching shuffle data)
*/
var executorRunTime: Long = _

private var _executorRunTime: Long = _
def executorRunTime = _executorRunTime
private[spark] def setExecutorRunTime(value: Long) = _executorRunTime = value

/**
* The number of bytes this task transmitted back to the driver as the TaskResult
*/
var resultSize: Long = _
private var _resultSize: Long = _
def resultSize = _resultSize
private[spark] def setResultSize(value: Long) = _resultSize = value


/**
* Amount of time the JVM spent in garbage collection while executing this task
*/
var jvmGCTime: Long = _
private var _jvmGCTime: Long = _
def jvmGCTime = _jvmGCTime
private[spark] def setJvmGCTime(value: Long) = _jvmGCTime = value

/**
* Amount of time spent serializing the task result
*/
var resultSerializationTime: Long = _
private var _resultSerializationTime: Long = _
def resultSerializationTime = _resultSerializationTime
private[spark] def setResultSerializationTime(value: Long) = _resultSerializationTime = value

/**
* The number of in-memory bytes spilled by this task
*/
var memoryBytesSpilled: Long = _
private var _memoryBytesSpilled: Long = _
def memoryBytesSpilled = _memoryBytesSpilled
private[spark] def incMemoryBytesSpilled(value: Long) = _memoryBytesSpilled += value
private[spark] def decMemoryBytesSpilled(value: Long) = _memoryBytesSpilled -= value

/**
* The number of on-disk bytes spilled by this task
*/
var diskBytesSpilled: Long = _
private var _diskBytesSpilled: Long = _
def diskBytesSpilled = _diskBytesSpilled
def incDiskBytesSpilled(value: Long) = _diskBytesSpilled += value
def decDiskBytesSpilled(value: Long) = _diskBytesSpilled -= value

/**
* If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
@@ -178,10 +198,10 @@ class TaskMetrics extends Serializable {
private[spark] def updateShuffleReadMetrics() = synchronized {
val merged = new ShuffleReadMetrics()
for (depMetrics <- depsShuffleReadMetrics) {
merged.fetchWaitTime += depMetrics.fetchWaitTime
merged.localBlocksFetched += depMetrics.localBlocksFetched
merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched
merged.remoteBytesRead += depMetrics.remoteBytesRead
merged.incFetchWaitTime(depMetrics.fetchWaitTime)
merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
}
_shuffleReadMetrics = Some(merged)
}
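
For context on the merge loop above: each shuffle dependency in a task gets its own ShuffleReadMetrics (the depsShuffleReadMetrics buffer referenced in the review discussion further down), and updateShuffleReadMetrics() folds them into a single object that readers such as the executor heartbeat can consume. A simplified, self-contained sketch of that aggregate-and-snapshot idea; the class and method names here are illustrative, not Spark's:

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for per-dependency read metrics.
class DepReadMetrics(var remoteBytesRead: Long = 0L, var fetchWaitTime: Long = 0L)

class ReadMetricsAggregator {
  private val deps = new ArrayBuffer[DepReadMetrics]

  // Each shuffle dependency registers and updates its own metrics object.
  def newDepMetrics(): DepReadMetrics = synchronized {
    val m = new DepReadMetrics()
    deps += m
    m
  }

  // Called periodically (e.g. from a heartbeat) to publish a merged snapshot.
  def mergedSnapshot(): DepReadMetrics = synchronized {
    val merged = new DepReadMetrics()
    for (d <- deps) {
      merged.remoteBytesRead += d.remoteBytesRead
      merged.fetchWaitTime += d.fetchWaitTime
    }
    merged
  }
}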
@@ -265,7 +285,9 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
/**
* Total bytes written
*/
var bytesWritten: Long = 0L
private var _bytesWritten: Long = _
def bytesWritten = _bytesWritten
private[spark] def setBytesWritten(value : Long) = _bytesWritten = value
}

@@ -274,32 +296,45 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
/**
*/
@DeveloperApi
class ShuffleReadMetrics extends Serializable {
/**
* Number of blocks fetched in this shuffle by this task (remote or local)
*/
def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched

/**
* Number of remote blocks fetched in this shuffle by this task
*/
var remoteBlocksFetched: Int = _

private var _remoteBlocksFetched: Int = _
def remoteBlocksFetched = _remoteBlocksFetched
private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value

/**
* Number of local blocks fetched in this shuffle by this task
*/
var localBlocksFetched: Int = _
private var _localBlocksFetched: Int = _
def localBlocksFetched = _localBlocksFetched
private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value


/**
* Time the task spent waiting for remote shuffle blocks. This only includes the time
* blocking on shuffle input data. For instance if block B is being fetched while the task is
* still not finished processing block A, it is not considered to be blocking on block B.
*/
var fetchWaitTime: Long = _

private var _fetchWaitTime: Long = _
def fetchWaitTime = _fetchWaitTime
private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value
private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value

Reviewer:
Can we make these AtomicLongs so that the incrementing can be thread-safe? I have a PR out that does this for input metrics: https://github.com/apache/spark/pull/3120/files#diff-1bd3dc38f6306e0a822f93d62c32b1d0R226. It would be good to do this throughout.

Contributor:
_shuffleWriteTime can only get incremented from a single thread. It's marked volatile so that other threads can read from it. Using an AtomicLong would unnecessarily bloat the size of the task results. I probably should have documented this access pattern better.

Reviewer:
Fair enough, but it is hard to guarantee that only one thread will increment the value. We could mark the class as not thread-safe in the docs, but that might be a ticking time bomb. Is the overhead of AtomicLong so concerning that it is worth risking concurrency issues down the line?

Speaking of shuffle metrics, can we get rid of the array of ShuffleReadMetrics (depsShuffleReadMetrics) and the merge step in favor of using AtomicLongs in ShuffleReadMetrics? That way TaskMetrics could return the same thread-safe ShuffleReadMetrics to the tasks, and there would be no need to call updateShuffleReadMetrics periodically in the Executor heartbeat code.
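
To make the trade-off in the exchange above concrete, here is a minimal sketch of the two options being weighed: a single-writer counter whose field is @volatile so that readers such as the heartbeat see fresh values, versus an AtomicLong that tolerates increments from any thread at the cost of a larger object and a CAS on every update. Neither class is from this PR; both are illustrations only.

import java.util.concurrent.atomic.AtomicLong

// Option 1: single writer plus @volatile. Only the task's own thread may call
// inc(); other threads may only read value.
class SingleWriterCounter {
  @volatile private var _value: Long = 0L
  def value: Long = _value
  def inc(delta: Long): Unit = _value += delta  // read-modify-write, not atomic
}

// Option 2: AtomicLong. Safe to increment from any thread.
class AtomicCounter {
  private val _value = new AtomicLong(0L)
  def value: Long = _value.get()
  def inc(delta: Long): Unit = { _value.addAndGet(delta) }
}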

/**
* Total number of remote bytes read from the shuffle by this task
*/
var remoteBytesRead: Long = _
private var _remoteBytesRead: Long = _
def remoteBytesRead = _remoteBytesRead
private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value

/**
* Number of blocks fetched in this shuffle by this task (remote or local)
*/
def totalBlocksFetched = _remoteBlocksFetched + _localBlocksFetched
}

/**
Expand All @@ -311,10 +346,18 @@ class ShuffleWriteMetrics extends Serializable {
/**
* Number of bytes written for the shuffle by this task
*/
@volatile var shuffleBytesWritten: Long = _

@volatile private var _shuffleBytesWritten: Long = _
def shuffleBytesWritten = _shuffleBytesWritten
private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value

/**
* Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
*/
@volatile var shuffleWriteTime: Long = _
@volatile private var _shuffleWriteTime: Long = _
def shuffleWriteTime = _shuffleWriteTime
private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value


}
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -159,8 +159,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
for ((it, depNum) <- rddIterators) {
map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
}
context.taskMetrics.memoryBytesSpilled += map.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += map.diskBytesSpilled
context.taskMetrics.incMemoryBytesSpilled(map.memoryBytesSpilled)
context.taskMetrics.incDiskBytesSpilled(map.diskBytesSpilled)
new InterruptibleIterator(context,
map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -245,6 +245,7 @@ class HadoopRDD[K, V](
case eof: EOFException =>
finished = true
}

(key, value)
}

@@ -154,6 +154,7 @@ class NewHadoopRDD[K, V](
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false

(reader.getCurrentKey, reader.getCurrentValue)
}

@@ -1007,7 +1007,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.close(hadoopContext)
}
committer.commitTask(hadoopContext)
bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
1
} : Int

@@ -1079,7 +1079,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.close()
}
writer.commit()
bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
}

self.context.runJob(self, writeToFile)
@@ -1102,7 +1102,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
outputMetrics: OutputMetrics, recordsWritten: Long): Unit = {
if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0
&& bytesWrittenCallback.isDefined) {
bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
}
}
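
The helper in the last hunk refreshes the output metrics only every RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES records, since querying the bytes-written callback on every record would be needlessly expensive. A standalone sketch of that throttling pattern; the interval value and names here are assumptions for illustration, not Spark's constants:

object ThrottledOutputMetrics {
  // Illustrative interval; the real constant lives on PairRDDFunctions.
  private val RecordsBetweenUpdates = 256L

  // Refresh the metric via the (possibly absent) callback every N records.
  def maybeUpdate(
      bytesWrittenCallback: Option[() => Long],
      setBytesWritten: Long => Unit,
      recordsWritten: Long): Unit = {
    if (recordsWritten % RecordsBetweenUpdates == 0 && bytesWrittenCallback.isDefined) {
      bytesWrittenCallback.foreach { fn => setBytesWritten(fn()) }
    }
  }
}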

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -55,7 +55,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
context = new TaskContextImpl(stageId = stageId, partitionId = partitionId,
taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, runningLocally = false)
TaskContextHelper.setTaskContext(context)
context.taskMetrics.hostname = Utils.localHostName()
context.taskMetrics.setHostname(Utils.localHostName())
taskThread = Thread.currentThread()
if (_killed) {
kill(interruptThread = false)

@@ -77,7 +77,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
(deserializedResult, size)
}

result.metrics.resultSize = size
result.metrics.setResultSize(size)
scheduler.handleSuccessfulTask(taskSetManager, tid, result)
} catch {
case cnf: ClassNotFoundException =>

@@ -59,8 +59,8 @@ private[spark] class HashShuffleReader[K, C](
// the ExternalSorter won't spill to disk.
val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
sorter.insertAll(aggregatedIter)
context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
sorter.iterator
case None =>
aggregatedIter

@@ -160,14 +160,14 @@ private[spark] class DiskBlockObjectWriter(
}
finalPosition = file.length()
// In certain compression codecs, more bytes are written after close() is called
writeMetrics.shuffleBytesWritten += (finalPosition - reportedPosition)
writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition)
}

// Discard current writes. We do this by flushing the outstanding writes and then
// truncating the file to its initial position.
override def revertPartialWritesAndClose() {
try {
writeMetrics.shuffleBytesWritten -= (reportedPosition - initialPosition)
writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)

if (initialized) {
objOut.flush()
@@ -212,14 +212,14 @@ private[spark] class DiskBlockObjectWriter(
*/
private def updateBytesWritten() {
val pos = channel.position()
writeMetrics.shuffleBytesWritten += (pos - reportedPosition)
writeMetrics.incShuffleBytesWritten(pos - reportedPosition)
reportedPosition = pos
}

private def callWithTiming(f: => Unit) = {
val start = System.nanoTime()
f
writeMetrics.shuffleWriteTime += (System.nanoTime() - start)
writeMetrics.incShuffleWriteTime(System.nanoTime() - start)
}

// For testing
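
callWithTiming above is the single accumulation point for shuffle write time, which is part of why a @volatile counter rather than an AtomicLong is considered sufficient in the review thread earlier. A standalone sketch of the same wrap-and-accumulate timing idea, with illustrative names:

class WriteTimer {
  @volatile private var _writeTimeNanos: Long = 0L
  def writeTimeNanos: Long = _writeTimeNanos

  // Run a block of work and add its wall-clock duration to the counter.
  def timed[T](body: => T): T = {
    val start = System.nanoTime()
    try body finally { _writeTimeNanos += System.nanoTime() - start }
  }
}

Used as timer.timed { writeChunk() }, it accumulates each wrapped call's duration in nanoseconds, assuming only one thread ever calls timed().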

@@ -156,8 +156,8 @@ final class ShuffleBlockFetcherIterator(
// This needs to be released after use.
buf.retain()
results.put(new SuccessFetchResult(BlockId(blockId), sizeMap(blockId), buf))
shuffleMetrics.remoteBytesRead += buf.size
shuffleMetrics.remoteBlocksFetched += 1
shuffleMetrics.incRemoteBytesRead(buf.size)
shuffleMetrics.incRemoteBlocksFetched(1)
}
logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
}
@@ -233,7 +233,7 @@ final class ShuffleBlockFetcherIterator(
val blockId = iter.next()
try {
val buf = blockManager.getBlockData(blockId)
shuffleMetrics.localBlocksFetched += 1
shuffleMetrics.incLocalBlocksFetched(1)
buf.retain()
results.put(new SuccessFetchResult(blockId, 0, buf))
} catch {
@@ -277,7 +277,7 @@ final class ShuffleBlockFetcherIterator(
currentResult = results.take()
val result = currentResult
val stopFetchWait = System.currentTimeMillis()
shuffleMetrics.fetchWaitTime += (stopFetchWait - startFetchWait)
shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)

result match {
case SuccessFetchResult(_, size, _) => bytesInFlight -= size