
Commit 9fa29a6

Kostas Sakellis authored and pwendell committed
[SPARK-4874] [CORE] Collect record count metrics
Collects record counts for both Input/Output and Shuffle Metrics. For the input/output metrics, the counter is incremented every time the iterators are accessed. For shuffle, on the write side we count the metrics post-aggregation (after a map-side combine), and on the read side we count the metrics pre-aggregation. This allows both the bytes read/written metrics and the records read/written metrics to line up. For backwards compatibility, if we deserialize an older event that doesn't have record metrics, we set the metric to -1.

Author: Kostas Sakellis <[email protected]>

Closes #4067 from ksakellis/kostas-spark-4874 and squashes the following commits:

bd919be [Kostas Sakellis] Changed 'Records Read' in shuffleReadMetrics json output to 'Total Records Read'
dad4d57 [Kostas Sakellis] Add a comment and check to BlockObjectWriter so that it cannot be reopened.
6f236a1 [Kostas Sakellis] Renamed _recordsWritten in ShuffleWriteMetrics to be more consistent
70620a0 [Kostas Sakellis] CR Feedback
17faa3a [Kostas Sakellis] Removed AtomicLong in favour of using Long
b6f9923 [Kostas Sakellis] Merge AfterNextInterceptingIterator with InterruptableIterator to save a function call
46c8186 [Kostas Sakellis] Combined Bytes and # records into one column
57551c1 [Kostas Sakellis] Conforms to SPARK-3288
6cdb44e [Kostas Sakellis] Removed the generic InterceptingIterator and replaced it with specific implementation
1aa273c [Kostas Sakellis] CR Feedback
1bb78b1 [Kostas Sakellis] [SPARK-4874] [CORE] Collect record count metrics

(cherry picked from commit dcd1e42)
Signed-off-by: Patrick Wendell <[email protected]>
1 parent 11dbf71 commit 9fa29a6

20 files changed (+548, -146 lines)
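The heart of the change is one small pattern applied in several of the files below: wrap the iterator that hands records to consuming code so that every call to next() also bumps a records-read counter. A minimal, self-contained sketch of that pattern outside Spark (CountingIterator and SimpleInputMetrics are illustrative names, not classes from this patch):

// Illustrative sketch of the wrap-and-count pattern used throughout this commit.
class SimpleInputMetrics {
  private var _recordsRead: Long = 0L
  def recordsRead: Long = _recordsRead
  def incRecordsRead(records: Long): Unit = _recordsRead += records
}

class CountingIterator[T](delegate: Iterator[T], metrics: SimpleInputMetrics)
  extends Iterator[T] {
  override def hasNext: Boolean = delegate.hasNext
  override def next(): T = {
    metrics.incRecordsRead(1) // count on every access, as the commit message describes
    delegate.next()
  }
}

object CountingIteratorExample extends App {
  val metrics = new SimpleInputMetrics
  val it = new CountingIterator(Iterator(1, 2, 3), metrics)
  it.foreach(_ => ())
  println(metrics.recordsRead) // 3
}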

core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 8 additions & 3 deletions
@@ -47,10 +47,15 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         val inputMetrics = blockResult.inputMetrics
         val existingMetrics = context.taskMetrics
           .getInputMetricsForReadMethod(inputMetrics.readMethod)
-        existingMetrics.addBytesRead(inputMetrics.bytesRead)
-
-        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
+        existingMetrics.incBytesRead(inputMetrics.bytesRead)
 
+        val iter = blockResult.data.asInstanceOf[Iterator[T]]
+        new InterruptibleIterator[T](context, iter) {
+          override def next(): T = {
+            existingMetrics.incRecordsRead(1)
+            delegate.next()
+          }
+        }
       case None =>
         // Acquire a lock for loading this partition
         // If another thread already holds the lock, wait for it to finish return its results

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 40 additions & 14 deletions
@@ -194,18 +194,19 @@ class TaskMetrics extends Serializable {
   /**
    * Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
    */
-  private[spark] def updateShuffleReadMetrics() = synchronized {
+  private[spark] def updateShuffleReadMetrics(): Unit = synchronized {
     val merged = new ShuffleReadMetrics()
     for (depMetrics <- depsShuffleReadMetrics) {
       merged.incFetchWaitTime(depMetrics.fetchWaitTime)
       merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
       merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
       merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
+      merged.incRecordsRead(depMetrics.recordsRead)
     }
     _shuffleReadMetrics = Some(merged)
   }
 
-  private[spark] def updateInputMetrics() = synchronized {
+  private[spark] def updateInputMetrics(): Unit = synchronized {
     inputMetrics.foreach(_.updateBytesRead())
   }
 }
@@ -242,27 +243,31 @@ object DataWriteMethod extends Enumeration with Serializable {
 @DeveloperApi
 case class InputMetrics(readMethod: DataReadMethod.Value) {
 
-  private val _bytesRead: AtomicLong = new AtomicLong()
+  /**
+   * This is volatile so that it is visible to the updater thread.
+   */
+  @volatile @transient var bytesReadCallback: Option[() => Long] = None
 
   /**
    * Total bytes read.
    */
-  def bytesRead: Long = _bytesRead.get()
-  @volatile @transient var bytesReadCallback: Option[() => Long] = None
+  private var _bytesRead: Long = _
+  def bytesRead: Long = _bytesRead
+  def incBytesRead(bytes: Long) = _bytesRead += bytes
 
   /**
-   * Adds additional bytes read for this read method.
+   * Total records read.
    */
-  def addBytesRead(bytes: Long) = {
-    _bytesRead.addAndGet(bytes)
-  }
+  private var _recordsRead: Long = _
+  def recordsRead: Long = _recordsRead
+  def incRecordsRead(records: Long) = _recordsRead += records
 
   /**
    * Invoke the bytesReadCallback and mutate bytesRead.
    */
   def updateBytesRead() {
     bytesReadCallback.foreach { c =>
-      _bytesRead.set(c())
+      _bytesRead = c()
     }
   }
 
@@ -287,6 +292,13 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
   private var _bytesWritten: Long = _
   def bytesWritten = _bytesWritten
   private[spark] def setBytesWritten(value : Long) = _bytesWritten = value
+
+  /**
+   * Total records written
+   */
+  private var _recordsWritten: Long = 0L
+  def recordsWritten = _recordsWritten
+  private[spark] def setRecordsWritten(value: Long) = _recordsWritten = value
 }
 
 /**
@@ -301,16 +313,15 @@ class ShuffleReadMetrics extends Serializable {
   private var _remoteBlocksFetched: Int = _
   def remoteBlocksFetched = _remoteBlocksFetched
   private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
-  private[spark] def defRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
+  private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
 
   /**
    * Number of local blocks fetched in this shuffle by this task
   */
   private var _localBlocksFetched: Int = _
   def localBlocksFetched = _localBlocksFetched
   private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
-  private[spark] def defLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
-
+  private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
 
   /**
    * Time the task spent waiting for remote shuffle blocks. This only includes the time
@@ -334,6 +345,14 @@ class ShuffleReadMetrics extends Serializable {
    * Number of blocks fetched in this shuffle by this task (remote or local)
    */
   def totalBlocksFetched = _remoteBlocksFetched + _localBlocksFetched
+
+  /**
+   * Total number of records read from the shuffle by this task
+   */
+  private var _recordsRead: Long = _
+  def recordsRead = _recordsRead
+  private[spark] def incRecordsRead(value: Long) = _recordsRead += value
+  private[spark] def decRecordsRead(value: Long) = _recordsRead -= value
 }
 
 /**
@@ -358,5 +377,12 @@ class ShuffleWriteMetrics extends Serializable {
   private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
   private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
 
-
+  /**
+   * Total number of records written to the shuffle by this task
+   */
+  @volatile private var _shuffleRecordsWritten: Long = _
+  def shuffleRecordsWritten = _shuffleRecordsWritten
+  private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
+  private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
+  private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value
 }
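As the InputMetrics hunk above shows, the patch also replaces the AtomicLong with a plain Long plus small inc*/dec* helpers, on the assumption that only the task's own thread mutates the counters; only bytesReadCallback stays @volatile for the separate updater thread. A stripped-down sketch of that accessor style (illustrative class, not the real metrics objects, and without Spark's package-private modifiers):

// Sketch of the accessor pattern the metrics classes converge on after this change:
// a plain Long mutated only by the owning task thread, exposed through inc/dec helpers.
class RecordCounter {
  private var _recordsRead: Long = 0L
  def recordsRead: Long = _recordsRead
  def incRecordsRead(value: Long): Unit = _recordsRead += value
  def decRecordsRead(value: Long): Unit = _recordsRead -= value // used when partial work is reverted
}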

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 4 additions & 2 deletions
@@ -247,7 +247,9 @@ class HadoopRDD[K, V](
         case eof: EOFException =>
           finished = true
       }
-
+      if (!finished) {
+        inputMetrics.incRecordsRead(1)
+      }
       (key, value)
     }
 
@@ -261,7 +263,7 @@ class HadoopRDD[K, V](
         // If we can't get the bytes read from the FS stats, fall back to the split size,
         // which may be inaccurate.
         try {
-          inputMetrics.addBytesRead(split.inputSplit.value.getLength)
+          inputMetrics.incBytesRead(split.inputSplit.value.getLength)
         } catch {
           case e: java.io.IOException =>
             logWarning("Unable to get input size to set InputMetrics for task", e)

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 4 additions & 2 deletions
@@ -151,7 +151,9 @@ class NewHadoopRDD[K, V](
         throw new java.util.NoSuchElementException("End of stream")
       }
       havePair = false
-
+      if (!finished) {
+        inputMetrics.incRecordsRead(1)
+      }
       (reader.getCurrentKey, reader.getCurrentValue)
     }
 
@@ -165,7 +167,7 @@ class NewHadoopRDD[K, V](
         // If we can't get the bytes read from the FS stats, fall back to the split size,
        // which may be inaccurate.
         try {
-          inputMetrics.addBytesRead(split.serializableHadoopSplit.value.getLength)
+          inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
         } catch {
           case e: java.io.IOException =>
             logWarning("Unable to get input size to set InputMetrics for task", e)

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 7 additions & 5 deletions
@@ -34,7 +34,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat,
-RecordWriter => NewRecordWriter}
+  RecordWriter => NewRecordWriter}
 
 import org.apache.spark._
 import org.apache.spark.Partitioner.defaultPartitioner
@@ -993,8 +993,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
 
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
+      var recordsWritten = 0L
       try {
-        var recordsWritten = 0L
         while (iter.hasNext) {
           val pair = iter.next()
           writer.write(pair._1, pair._2)
@@ -1008,6 +1008,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       }
       committer.commitTask(hadoopContext)
       bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
+      outputMetrics.setRecordsWritten(recordsWritten)
       1
     } : Int
 
@@ -1065,8 +1066,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
       writer.setup(context.stageId, context.partitionId, taskAttemptId)
       writer.open()
+      var recordsWritten = 0L
       try {
-        var recordsWritten = 0L
         while (iter.hasNext) {
           val record = iter.next()
           writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
@@ -1080,6 +1081,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       }
       writer.commit()
       bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
+      outputMetrics.setRecordsWritten(recordsWritten)
     }
 
     self.context.runJob(self, writeToFile)
@@ -1097,9 +1099,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
   private def maybeUpdateOutputMetrics(bytesWrittenCallback: Option[() => Long],
       outputMetrics: OutputMetrics, recordsWritten: Long): Unit = {
-    if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0
-        && bytesWrittenCallback.isDefined) {
+    if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
       bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
+      outputMetrics.setRecordsWritten(recordsWritten)
     }
   }
 
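The write paths above keep a local recordsWritten counter and refresh the comparatively expensive bytes-written callback only every RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES records, then flush both values once more at commit time. A rough sketch of that throttling pattern, with illustrative names and an assumed interval:

// Sketch: count every record, but only poll the expensive bytes-written callback every N records.
object ThrottledOutputMetrics {
  val RecordsBetweenUpdates = 256L // assumed interval; Spark uses its own constant

  def writeAll[K, V](
      records: Iterator[(K, V)],
      write: ((K, V)) => Unit,
      bytesWrittenCallback: Option[() => Long],
      report: (Long, Long) => Unit): Unit = {
    var recordsWritten = 0L
    var bytesWritten = 0L
    records.foreach { record =>
      write(record)
      recordsWritten += 1
      if (recordsWritten % RecordsBetweenUpdates == 0) {
        bytesWrittenCallback.foreach { fn => bytesWritten = fn() }
        report(bytesWritten, recordsWritten)
      }
    }
    // Final update at "commit" time so the last partial batch is not lost.
    bytesWrittenCallback.foreach { fn => bytesWritten = fn() }
    report(bytesWritten, recordsWritten)
  }
}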

core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala

Lines changed: 7 additions & 1 deletion
@@ -86,6 +86,12 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
       context.taskMetrics.updateShuffleReadMetrics()
     })
 
-    new InterruptibleIterator[T](context, completionIter)
+    new InterruptibleIterator[T](context, completionIter) {
+      val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
+      override def next(): T = {
+        readMetrics.incRecordsRead(1)
+        delegate.next()
+      }
+    }
   }
 }
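Because the counting wrapper sits directly on the fetch iterator, shuffle records are counted as they are pulled, before any reduce-side aggregation consumes them; this is the pre-aggregation read-side behaviour described in the commit message. A tiny plain-Scala illustration (no Spark classes) of why that ordering yields a pre-aggregation count:

// Sketch: counting while records are pulled from the fetched stream gives a
// pre-aggregation count, because the combiner consumes the already-counted records.
object PreAggregationCount extends App {
  var shuffleRecordsRead = 0L
  val fetched = Iterator(("a", 1), ("a", 2), ("b", 3))            // records as fetched
  val counted = fetched.map { kv => shuffleRecordsRead += 1; kv } // count first
  val combined = counted.toList.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
  println(s"records read = $shuffleRecordsRead, distinct keys = ${combined.size}")
  // records read = 3 (pre-aggregation); distinct keys = 2
}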

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 1 deletion
@@ -53,7 +53,7 @@ private[spark] class BlockResult(
     readMethod: DataReadMethod.Value,
     bytes: Long) {
   val inputMetrics = new InputMetrics(readMethod)
-  inputMetrics.addBytesRead(bytes)
+  inputMetrics.incBytesRead(bytes)
 }
 
 /**

core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala

Lines changed: 16 additions & 8 deletions
@@ -29,7 +29,8 @@ import org.apache.spark.executor.ShuffleWriteMetrics
  * appending data to an existing block, and can guarantee atomicity in the case of faults
  * as it allows the caller to revert partial writes.
  *
- * This interface does not support concurrent writes.
+ * This interface does not support concurrent writes. Also, once the writer has
+ * been opened, it cannot be reopened again.
  */
 private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
 
@@ -95,6 +96,7 @@ private[spark] class DiskBlockObjectWriter(
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
   private var initialized = false
+  private var hasBeenClosed = false
 
   /**
    * Cursors used to represent positions in the file.
@@ -115,11 +117,16 @@ private[spark] class DiskBlockObjectWriter(
   private var finalPosition: Long = -1
   private var reportedPosition = initialPosition
 
-  /** Calling channel.position() to update the write metrics can be a little bit expensive, so we
-    * only call it every N writes */
-  private var writesSinceMetricsUpdate = 0
+  /**
+   * Keep track of number of records written and also use this to periodically
+   * output bytes written since the latter is expensive to do for each record.
+   */
+  private var numRecordsWritten = 0
 
   override def open(): BlockObjectWriter = {
+    if (hasBeenClosed) {
+      throw new IllegalStateException("Writer already closed. Cannot be reopened.")
+    }
     fos = new FileOutputStream(file, true)
     ts = new TimeTrackingOutputStream(fos)
     channel = fos.getChannel()
@@ -145,6 +152,7 @@ private[spark] class DiskBlockObjectWriter(
       ts = null
       objOut = null
       initialized = false
+      hasBeenClosed = true
     }
   }
 
@@ -168,6 +176,7 @@ private[spark] class DiskBlockObjectWriter(
   override def revertPartialWritesAndClose() {
     try {
       writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
+      writeMetrics.decShuffleRecordsWritten(numRecordsWritten)
 
       if (initialized) {
         objOut.flush()
@@ -193,12 +202,11 @@ private[spark] class DiskBlockObjectWriter(
     }
 
     objOut.writeObject(value)
+    numRecordsWritten += 1
+    writeMetrics.incShuffleRecordsWritten(1)
 
-    if (writesSinceMetricsUpdate == 32) {
-      writesSinceMetricsUpdate = 0
+    if (numRecordsWritten % 32 == 0) {
       updateBytesWritten()
-    } else {
-      writesSinceMetricsUpdate += 1
     }
   }
 
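Putting the DiskBlockObjectWriter changes together: the writer refuses to be reopened once closed, counts every record written, and only polls the (relatively expensive) byte position every 32 records. A compact sketch of that bookkeeping under illustrative names (SketchWriter and its parameters are not Spark's writer API):

// Sketch of the writer-side bookkeeping added in this commit.
class SketchWriter(pollBytes: () => Long, onMetrics: (Long, Long) => Unit) {
  private var hasBeenClosed = false
  private var opened = false
  private var numRecordsWritten = 0

  def open(): this.type = {
    if (hasBeenClosed) {
      throw new IllegalStateException("Writer already closed. Cannot be reopened.")
    }
    opened = true
    this
  }

  def write(value: Any): Unit = {
    require(opened, "write() called before open()")
    numRecordsWritten += 1
    if (numRecordsWritten % 32 == 0) {
      onMetrics(pollBytes(), numRecordsWritten.toLong) // bytes are polled only periodically
    }
  }

  def close(): Unit = {
    opened = false
    hasBeenClosed = true
  }
}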

core/src/main/scala/org/apache/spark/ui/ToolTips.scala

Lines changed: 5 additions & 4 deletions
@@ -29,14 +29,15 @@ private[spark] object ToolTips {
   val SHUFFLE_READ_BLOCKED_TIME =
     "Time that the task spent blocked waiting for shuffle data to be read from remote machines."
 
-  val INPUT = "Bytes read from Hadoop or from Spark storage."
+  val INPUT = "Bytes and records read from Hadoop or from Spark storage."
 
-  val OUTPUT = "Bytes written to Hadoop."
+  val OUTPUT = "Bytes and records written to Hadoop."
 
-  val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage."
+  val SHUFFLE_WRITE =
+    "Bytes and records written to disk in order to be read by a shuffle in a future stage."
 
   val SHUFFLE_READ =
-    """Bytes read from remote executors. Typically less than shuffle write bytes
+    """Bytes and records read from remote executors. Typically less than shuffle write bytes
        because this does not include shuffle data read locally."""
 
   val GETTING_RESULT_TIME =

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala

Lines changed: 6 additions & 0 deletions
@@ -48,7 +48,9 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener {
   val executorToTasksFailed = HashMap[String, Int]()
   val executorToDuration = HashMap[String, Long]()
   val executorToInputBytes = HashMap[String, Long]()
+  val executorToInputRecords = HashMap[String, Long]()
   val executorToOutputBytes = HashMap[String, Long]()
+  val executorToOutputRecords = HashMap[String, Long]()
   val executorToShuffleRead = HashMap[String, Long]()
   val executorToShuffleWrite = HashMap[String, Long]()
   val executorToLogUrls = HashMap[String, Map[String, String]]()
@@ -84,10 +86,14 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener {
       metrics.inputMetrics.foreach { inputMetrics =>
         executorToInputBytes(eid) =
           executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead
+        executorToInputRecords(eid) =
+          executorToInputRecords.getOrElse(eid, 0L) + inputMetrics.recordsRead
       }
       metrics.outputMetrics.foreach { outputMetrics =>
         executorToOutputBytes(eid) =
           executorToOutputBytes.getOrElse(eid, 0L) + outputMetrics.bytesWritten
+        executorToOutputRecords(eid) =
+          executorToOutputRecords.getOrElse(eid, 0L) + outputMetrics.recordsWritten
       }
       metrics.shuffleReadMetrics.foreach { shuffleRead =>
         executorToShuffleRead(eid) =
