
Commit 1bb78b1

Author: Kostas Sakellis (committed)
[SPARK-4874] [CORE] Collect record count metrics
Collects record counts for both input/output and shuffle metrics. For the input/output metrics, the counter is simply incremented every time the iterators are accessed. For shuffle, on the write side we count records post-aggregation (after a map-side combine), and on the read side we count them pre-aggregation. This allows the bytes read/written metrics and the records read/written metrics to line up. For backwards compatibility, if we deserialize an older event that doesn't have record metrics, we set the metric to -1.
1 parent 6d3b7cb commit 1bb78b1

19 files changed (+435, -67 lines)
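At its core, the change wraps each data-producing iterator so a counter is bumped every time next() hands back a record; the diffs below do this with AfterNextInterceptingIterator / InterceptingIterator on the read paths and a plain local counter on the write paths. A minimal, self-contained sketch of the idea (CountingIterator and onRecord are illustrative names, not part of this patch):

// Sketch only: wrap any iterator and invoke a callback for each record returned.
// The patch itself uses AfterNextInterceptingIterator / InterceptingIterator
// from org.apache.spark.util for the same purpose.
class CountingIterator[T](underlying: Iterator[T], onRecord: () => Unit) extends Iterator[T] {
  override def hasNext: Boolean = underlying.hasNext
  override def next(): T = {
    val record = underlying.next()
    onRecord() // e.g. existingMetrics.addRecordsRead(1)
    record
  }
}

// Example: count records flowing out of an iterator.
// var recordsRead = 0L
// val counted = new CountingIterator(Iterator(1, 2, 3), () => recordsRead += 1)
// counted.foreach(_ => ())  // recordsRead is now 3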

core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 7 additions & 1 deletion
@@ -17,6 +17,8 @@
 
 package org.apache.spark
 
+import org.apache.spark.util.AfterNextInterceptingIterator
+
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
@@ -49,7 +51,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           .getInputMetricsForReadMethod(inputMetrics.readMethod)
         existingMetrics.addBytesRead(inputMetrics.bytesRead)
 
-        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
+        val iter = blockResult.data.asInstanceOf[Iterator[T]]
+        new InterruptibleIterator(context, AfterNextInterceptingIterator(iter, (next: T) => {
+          existingMetrics.addRecordsRead(1)
+          next
+        }))
 
       case None =>
         // Acquire a lock for loading this partition

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 25 additions & 1 deletion
@@ -201,6 +201,7 @@ class TaskMetrics extends Serializable {
         merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
         merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
         merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
+        merged.recordsRead += depMetrics.recordsRead
       }
       _shuffleReadMetrics = Some(merged)
     }
@@ -243,11 +244,17 @@ object DataWriteMethod extends Enumeration with Serializable {
 case class InputMetrics(readMethod: DataReadMethod.Value) {
 
   private val _bytesRead: AtomicLong = new AtomicLong()
+  private val _recordsRead: AtomicLong = new AtomicLong()
 
   /**
    * Total bytes read.
    */
   def bytesRead: Long = _bytesRead.get()
+
+  /**
+   * Total records read.
+   */
+  def recordsRead: Long = _recordsRead.get()
   @volatile @transient var bytesReadCallback: Option[() => Long] = None
 
   /**
@@ -257,6 +264,10 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
     _bytesRead.addAndGet(bytes)
   }
 
+  def addRecordsRead(records: Long) = {
+    _recordsRead.addAndGet(records)
+  }
+
   /**
    * Invoke the bytesReadCallback and mutate bytesRead.
   */
@@ -287,6 +298,11 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
   private var _bytesWritten: Long = _
   def bytesWritten = _bytesWritten
   private[spark] def setBytesWritten(value : Long) = _bytesWritten = value
+
+  /**
+   * Total records written
+   */
+  var recordsWritten: Long = 0L
 }
 
 /**
@@ -334,6 +350,11 @@ class ShuffleReadMetrics extends Serializable {
   * Number of blocks fetched in this shuffle by this task (remote or local)
   */
  def totalBlocksFetched = _remoteBlocksFetched + _localBlocksFetched
+
+  /**
+   * Total number of records read from the shuffle by this task
+   */
+  var recordsRead: Long = _
 }
 
 /**
@@ -358,5 +379,8 @@ class ShuffleWriteMetrics extends Serializable {
   private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
   private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
 
-
+  /**
+   * Total number of records written from the shuffle by this task
+   */
+  var recordsWritten: Long = _
 }
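Once TaskMetrics carries the new counters, they can be observed through the existing listener API in much the same way the UI changes below consume them per executor. A rough sketch, assuming the Spark 1.x listener interfaces (RecordCountListener is an illustrative name):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Sketch: sum the new record counters as tasks finish.
class RecordCountListener extends SparkListener {
  var inputRecords = 0L
  var outputRecords = 0L

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {
      metrics.inputMetrics.foreach(m => inputRecords += m.recordsRead)
      metrics.outputMetrics.foreach(m => outputRecords += m.recordsWritten)
    }
  }
}

// Registered before running a job, e.g. sc.addSparkListener(new RecordCountListener)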

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 4 additions & 2 deletions
@@ -44,7 +44,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.{NextIterator, Utils}
+import org.apache.spark.util.{AfterNextInterceptingIterator, NextIterator, Utils}
 import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation}
 import org.apache.spark.storage.StorageLevel
 
@@ -247,7 +247,9 @@ class HadoopRDD[K, V](
          case eof: EOFException =>
            finished = true
        }
-
+       if (!finished) {
+         inputMetrics.addRecordsRead(1)
+       }
       (key, value)
     }
 

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 3 additions & 1 deletion
@@ -151,7 +151,9 @@ class NewHadoopRDD[K, V](
           throw new java.util.NoSuchElementException("End of stream")
         }
         havePair = false
-
+        if (!finished) {
+          inputMetrics.addRecordsRead(1)
+        }
         (reader.getCurrentKey, reader.getCurrentValue)
       }
 

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 8 additions & 8 deletions
@@ -34,7 +34,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat,
-RecordWriter => NewRecordWriter}
+  RecordWriter => NewRecordWriter}
 
 import org.apache.spark._
 import org.apache.spark.Partitioner.defaultPartitioner
@@ -993,8 +993,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
 
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
+      var recordsWritten = 0L
       try {
-        var recordsWritten = 0L
         while (iter.hasNext) {
           val pair = iter.next()
           writer.write(pair._1, pair._2)
@@ -1008,6 +1008,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       }
       committer.commitTask(hadoopContext)
       bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
+      outputMetrics.recordsWritten = recordsWritten
       1
     } : Int
 
@@ -1065,8 +1066,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
       writer.setup(context.stageId, context.partitionId, taskAttemptId)
       writer.open()
+      var recordsWritten = 0L
       try {
-        var recordsWritten = 0L
         while (iter.hasNext) {
           val record = iter.next()
           writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
@@ -1080,6 +1081,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       }
       writer.commit()
       bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
+      outputMetrics.recordsWritten = recordsWritten
     }
 
     self.context.runJob(self, writeToFile)
@@ -1089,17 +1091,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   private def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, Option[() => Long]) = {
     val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
     val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
-    if (bytesWrittenCallback.isDefined) {
-      context.taskMetrics.outputMetrics = Some(outputMetrics)
-    }
+    context.taskMetrics.outputMetrics = Some(outputMetrics)
     (outputMetrics, bytesWrittenCallback)
   }
 
   private def maybeUpdateOutputMetrics(bytesWrittenCallback: Option[() => Long],
       outputMetrics: OutputMetrics, recordsWritten: Long): Unit = {
-    if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0
-        && bytesWrittenCallback.isDefined) {
+    if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
       bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
+      outputMetrics.recordsWritten = recordsWritten
     }
   }
 

core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala

Lines changed: 10 additions & 2 deletions
@@ -25,7 +25,7 @@ import org.apache.spark._
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
-import org.apache.spark.util.CompletionIterator
+import org.apache.spark.util.{InterceptingIterator, CompletionIterator}
 
 private[hash] object BlockStoreShuffleFetcher extends Logging {
   def fetch[T](
@@ -82,7 +82,15 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
       SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)
     val itr = blockFetcherItr.flatMap(unpackBlock)
 
-    val completionIter = CompletionIterator[T, Iterator[T]](itr, {
+    val itr2 = new InterceptingIterator[T](itr) {
+      val readMetrics = context.taskMetrics().createShuffleReadMetricsForDependency()
+      override def afterNext(next: T) : T = {
+        readMetrics.recordsRead += 1
+        next
+      }
+    }
+
+    val completionIter = CompletionIterator[T, Iterator[T]](itr2, {
       context.taskMetrics.updateShuffleReadMetrics()
    })
 

core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala

Lines changed: 5 additions & 5 deletions
@@ -117,7 +117,7 @@ private[spark] class DiskBlockObjectWriter(
 
   /** Calling channel.position() to update the write metrics can be a little bit expensive, so we
    * only call it every N writes */
-  private var writesSinceMetricsUpdate = 0
+  private var numRecordsWritten = 0
 
   override def open(): BlockObjectWriter = {
     fos = new FileOutputStream(file, true)
@@ -168,6 +168,7 @@ private[spark] class DiskBlockObjectWriter(
   override def revertPartialWritesAndClose() {
     try {
       writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
+      writeMetrics.recordsWritten -= numRecordsWritten
 
       if (initialized) {
         objOut.flush()
@@ -193,12 +194,11 @@ private[spark] class DiskBlockObjectWriter(
     }
 
     objOut.writeObject(value)
+    numRecordsWritten += 1
+    writeMetrics.recordsWritten += 1
 
-    if (writesSinceMetricsUpdate == 32) {
-      writesSinceMetricsUpdate = 0
+    if (numRecordsWritten % 32 == 0) {
       updateBytesWritten()
-    } else {
-      writesSinceMetricsUpdate += 1
     }
   }
 
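The writer now keeps a single numRecordsWritten counter: every record increments the metric immediately, while the relatively expensive byte-position check still runs only once every 32 writes. A small sketch of that throttled-update pattern in isolation (class and parameter names are illustrative; the 32-record interval matches the patch):

// Sketch: count every record, refresh the expensive byte metric only every N records.
class ThrottledWriteMetrics(updateInterval: Int = 32) {
  var recordsWritten = 0L
  var bytesWritten = 0L

  def recordWritten(currentPosition: => Long): Unit = {
    recordsWritten += 1
    if (recordsWritten % updateInterval == 0) {
      bytesWritten = currentPosition // e.g. channel.position() in the real writer
    }
  }
}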

core/src/main/scala/org/apache/spark/ui/ToolTips.scala

Lines changed: 2 additions & 0 deletions
@@ -30,8 +30,10 @@ private[spark] object ToolTips {
     "Time that the task spent blocked waiting for shuffle data to be read from remote machines."
 
   val INPUT = "Bytes read from Hadoop or from Spark storage."
+  val INPUT_RECORDS = "Number of records read from Hadoop or from Spark storage."
 
   val OUTPUT = "Bytes written to Hadoop."
+  val OUTPUT_RECORDS = "Number of records written to Hadoop."
 
   val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage."
 

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala

Lines changed: 6 additions & 0 deletions
@@ -48,7 +48,9 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
   val executorToTasksFailed = HashMap[String, Int]()
   val executorToDuration = HashMap[String, Long]()
   val executorToInputBytes = HashMap[String, Long]()
+  val executorToInputRecords = HashMap[String, Long]()
   val executorToOutputBytes = HashMap[String, Long]()
+  val executorToOutputRecords = HashMap[String, Long]()
   val executorToShuffleRead = HashMap[String, Long]()
   val executorToShuffleWrite = HashMap[String, Long]()
 
@@ -78,10 +80,14 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
       metrics.inputMetrics.foreach { inputMetrics =>
         executorToInputBytes(eid) =
           executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead
+        executorToInputRecords(eid) =
+          executorToInputRecords.getOrElse(eid, 0L) + inputMetrics.recordsRead
       }
       metrics.outputMetrics.foreach { outputMetrics =>
         executorToOutputBytes(eid) =
           executorToOutputBytes.getOrElse(eid, 0L) + outputMetrics.bytesWritten
+        executorToOutputRecords(eid) =
+          executorToOutputRecords.getOrElse(eid, 0L) + outputMetrics.recordsWritten
       }
       metrics.shuffleReadMetrics.foreach { shuffleRead =>
         executorToShuffleRead(eid) =

core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala

Lines changed: 6 additions & 0 deletions
@@ -45,7 +45,9 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
         <th>Failed Tasks</th>
         <th>Succeeded Tasks</th>
         <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
+        <th><span data-toggle="tooltip" title={ToolTips.INPUT_RECORDS}>Input Records</span></th>
         <th><span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output</span></th>
+        <th><span data-toggle="tooltip" title={ToolTips.OUTPUT_RECORDS}>Output Records</span></th>
         <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
         <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_WRITE}>Shuffle Write</span></th>
         <th>Shuffle Spill (Memory)</th>
@@ -78,8 +80,12 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
         <td>{v.succeededTasks}</td>
         <td sorttable_customkey={v.inputBytes.toString}>
           {Utils.bytesToString(v.inputBytes)}</td>
+        <td sorttable_customkey={v.inputRecords.toString}>
+          {v.inputRecords}</td>
         <td sorttable_customkey={v.outputBytes.toString}>
           {Utils.bytesToString(v.outputBytes)}</td>
+        <td sorttable_customkey={v.outputRecords.toString}>
+          {v.outputRecords}</td>
         <td sorttable_customkey={v.shuffleRead.toString}>
          {Utils.bytesToString(v.shuffleRead)}</td>
         <td sorttable_customkey={v.shuffleWrite.toString}>

0 commit comments
