Commit bfb3c05

andrewor14 committed
Merge pull request #3 from JoshRosen/andrewor14-get-or-create-metrics

Fix merge conflicts in get-or-create-metrics PR

2 parents 12bd943 + 2f1f6db, commit bfb3c05

File tree: 43 files changed, +333 / -246 lines

Note: large commits hide some content by default, so only a subset of the 43 changed files is shown below.

core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
Lines changed: 2 additions & 2 deletions

@@ -142,7 +142,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
     // Creating the file to write to and creating a disk writer both involve interacting with
     // the disk, and can take a long time in aggregate when we open many files, so should be
     // included in the shuffle write time.
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime);
+    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

     while (records.hasNext()) {
       final Product2<K, V> record = records.next();
@@ -202,7 +202,7 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
       threwException = false;
     } finally {
       Closeables.close(out, threwException);
-      writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
+      writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
     }
     partitionWriters = null;
     return lengths;
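
The rename does not change the measurement pattern above: blocking file-system work is bracketed with System.nanoTime() and the delta is folded into the write-time counter. A minimal sketch of that pattern against the renamed API, assuming code living under the org.apache.spark package (incWriteTime is private[spark]); the timedOpen helper is illustrative, not part of this patch:

    import org.apache.spark.executor.ShuffleWriteMetrics

    /** Runs `body` and charges its wall-clock duration to shuffle write time. */
    def timedOpen[T](metrics: ShuffleWriteMetrics)(body: => T): T = {
      val openStartTime = System.nanoTime()
      try {
        body  // e.g. creating files and disk writers for every partition
      } finally {
        // Opening many files is slow in aggregate, so it counts as write time.
        metrics.incWriteTime(System.nanoTime() - openStartTime)
      }
    }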

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
Lines changed: 2 additions & 2 deletions

@@ -233,8 +233,8 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
       // Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
       // Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
       // This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
-      writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
-      taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten());
+      writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());
+      taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());
     }
   }
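
When a spill file (rather than the final output) is written, a temporary metrics object collects the per-write counters and is then folded into the task totals: records go to the shuffle counters, bytes to diskBytesSpilled, and the elapsed time is deliberately dropped (SPARK-3577). A hedged sketch of that fold, assuming package-private access; tempMetrics and total are illustrative stand-ins for writeMetricsToUse and the task-level counters:

    import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}

    def foldSpillMetrics(
        tempMetrics: ShuffleWriteMetrics,
        total: ShuffleWriteMetrics,
        taskMetrics: TaskMetrics): Unit = {
      total.incRecordsWritten(tempMetrics.recordsWritten)
      taskMetrics.incDiskBytesSpilled(tempMetrics.bytesWritten)
      // tempMetrics.writeTime is intentionally not propagated (see SPARK-3577).
    }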

core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
Lines changed: 3 additions & 3 deletions

@@ -297,8 +297,8 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOExcepti
         // final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
         // to be counted as shuffle write, but this will lead to double-counting of the final
         // SpillInfo's bytes.
-        writeMetrics.decShuffleBytesWritten(spills[spills.length - 1].file.length());
-        writeMetrics.incShuffleBytesWritten(outputFile.length());
+        writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());
+        writeMetrics.incBytesWritten(outputFile.length());
         return partitionLengths;
       }
     } catch (IOException e) {
@@ -410,7 +410,7 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
         spillInputChannelPositions[i] += actualBytesTransferred;
         bytesToTransfer -= actualBytesTransferred;
       }
-      writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
+      writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
       bytesWrittenToMergedFile += partitionLengthInSpill;
       partitionLengths[partition] += partitionLengthInSpill;
     }
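
The dec/inc pair above corrects a double count: the last spill's bytes were already recorded as shuffle write when that spill was produced, and the merged output contains those same bytes again. A hypothetical worked example with made-up sizes, assuming package-private access to the mutators:

    val writeMetrics = new org.apache.spark.executor.ShuffleWriteMetrics
    writeMetrics.incBytesWritten(100L)  // last spill file, counted when written
    writeMetrics.decBytesWritten(100L)  // back out the soon-to-be-recounted spill
    writeMetrics.incBytesWritten(300L)  // final merged output, spill bytes included
    assert(writeMetrics.bytesWritten == 300L)  // net: each byte counted once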

core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java
Lines changed: 5 additions & 5 deletions

@@ -42,34 +42,34 @@ public TimeTrackingOutputStream(ShuffleWriteMetrics writeMetrics, OutputStream o
   public void write(int b) throws IOException {
     final long startTime = System.nanoTime();
     outputStream.write(b);
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }

   @Override
   public void write(byte[] b) throws IOException {
     final long startTime = System.nanoTime();
     outputStream.write(b);
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }

   @Override
   public void write(byte[] b, int off, int len) throws IOException {
     final long startTime = System.nanoTime();
     outputStream.write(b, off, len);
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }

   @Override
   public void flush() throws IOException {
     final long startTime = System.nanoTime();
     outputStream.flush();
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }

   @Override
   public void close() throws IOException {
     final long startTime = System.nanoTime();
     outputStream.close();
-    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+    writeMetrics.incWriteTime(System.nanoTime() - startTime);
   }
 }
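
TimeTrackingOutputStream is a plain decorator: every OutputStream operation is forwarded to the wrapped stream and its wall-clock duration is added to the metrics. A hedged usage sketch (the file target and metrics instance are illustrative, not from this patch):

    import java.io.{BufferedOutputStream, FileOutputStream}
    import org.apache.spark.executor.ShuffleWriteMetrics
    import org.apache.spark.storage.TimeTrackingOutputStream

    val metrics = new ShuffleWriteMetrics
    // Every write/flush/close on `out` now accrues to metrics.writeTime.
    val out = new TimeTrackingOutputStream(
      metrics, new BufferedOutputStream(new FileOutputStream("part-00000")))
    out.write(Array[Byte](1, 2, 3))
    out.close()
    println(s"blocked on writes for ${metrics.writeTime} ns")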

core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
Lines changed: 24 additions & 13 deletions

@@ -26,28 +26,39 @@ import org.apache.spark.annotation.DeveloperApi
  */
 @DeveloperApi
 class ShuffleWriteMetrics extends Serializable {
+
   /**
    * Number of bytes written for the shuffle by this task
    */
-  @volatile private var _shuffleBytesWritten: Long = _
-  def shuffleBytesWritten: Long = _shuffleBytesWritten
-  private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
-  private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value
+  @volatile private var _bytesWritten: Long = _
+  def bytesWritten: Long = _bytesWritten
+  private[spark] def incBytesWritten(value: Long) = _bytesWritten += value
+  private[spark] def decBytesWritten(value: Long) = _bytesWritten -= value

   /**
    * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
    */
-  @volatile private var _shuffleWriteTime: Long = _
-  def shuffleWriteTime: Long = _shuffleWriteTime
-  private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
-  private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
+  @volatile private var _writeTime: Long = _
+  def writeTime: Long = _writeTime
+  private[spark] def incWriteTime(value: Long) = _writeTime += value
+  private[spark] def decWriteTime(value: Long) = _writeTime -= value

   /**
    * Total number of records written to the shuffle by this task
    */
-  @volatile private var _shuffleRecordsWritten: Long = _
-  def shuffleRecordsWritten: Long = _shuffleRecordsWritten
-  private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
-  private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
-  private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value
+  @volatile private var _recordsWritten: Long = _
+  def recordsWritten: Long = _recordsWritten
+  private[spark] def incRecordsWritten(value: Long) = _recordsWritten += value
+  private[spark] def decRecordsWritten(value: Long) = _recordsWritten -= value
+  private[spark] def setRecordsWritten(value: Long) = _recordsWritten = value
+
+  // Legacy methods for backward compatibility.
+  // TODO: remove these once we make this class private.
+  @deprecated("use bytesWritten instead", "2.0.0")
+  def shuffleBytesWritten: Long = bytesWritten
+  @deprecated("use writeTime instead", "2.0.0")
+  def shuffleWriteTime: Long = writeTime
+  @deprecated("use recordsWritten instead", "2.0.0")
+  def shuffleRecordsWritten: Long = recordsWritten
+
 }
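
The net effect in ShuffleWriteMetrics is a rename with source compatibility: the old accessors survive as one-line forwarders marked @deprecated, so existing callers keep compiling but get a compiler warning at each call site. The same pattern in isolation (the WriteCounter class is an illustrative stand-in, not Spark code):

    class WriteCounter {
      @volatile private var _bytesWritten: Long = 0L
      def bytesWritten: Long = _bytesWritten

      // Old name kept as a forwarding alias; scalac flags each call site
      // with a deprecation warning until callers migrate to bytesWritten.
      @deprecated("use bytesWritten instead", "2.0.0")
      def shuffleBytesWritten: Long = bytesWritten
    }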

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
Lines changed: 1 addition & 1 deletion

@@ -271,7 +271,7 @@ class StatsReportListener extends SparkListener with Logging {

     // Shuffle write
     showBytesDistribution("shuffle bytes written:",
-      (_, metric) => metric.shuffleWriteMetrics.map(_.shuffleBytesWritten), taskInfoMetrics)
+      (_, metric) => metric.shuffleWriteMetrics.map(_.bytesWritten), taskInfoMetrics)

     // Fetch & I/O
     showMillisDistribution("fetch wait time:",

core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
Lines changed: 1 addition & 1 deletion

@@ -90,7 +90,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
     }
     // Creating the file to write to and creating a disk writer both involve interacting with
     // the disk, so should be included in the shuffle write time.
-    writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
+    writeMetrics.incWriteTime(System.nanoTime - openStartTime)

     override def releaseWriters(success: Boolean) {
       shuffleState.completedMapTasks.add(mapId)

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
Lines changed: 1 addition & 1 deletion

@@ -92,7 +92,7 @@ private[spark] class SortShuffleWriter[K, V, C](
     if (sorter != null) {
       val startTime = System.nanoTime()
       sorter.stop()
-      writeMetrics.incShuffleWriteTime(System.nanoTime - startTime)
+      writeMetrics.incWriteTime(System.nanoTime - startTime)
       sorter = null
     }
   }

core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
Lines changed: 6 additions & 6 deletions

@@ -214,9 +214,9 @@ private[v1] object AllStagesResource {
       raw.shuffleWriteMetrics
     }
     def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions(
-      writeBytes = submetricQuantiles(_.shuffleBytesWritten),
-      writeRecords = submetricQuantiles(_.shuffleRecordsWritten),
-      writeTime = submetricQuantiles(_.shuffleWriteTime)
+      writeBytes = submetricQuantiles(_.bytesWritten),
+      writeRecords = submetricQuantiles(_.recordsWritten),
+      writeTime = submetricQuantiles(_.writeTime)
     )
   }.metricOption

@@ -283,9 +283,9 @@ private[v1] object AllStagesResource {

   def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): ShuffleWriteMetrics = {
     new ShuffleWriteMetrics(
-      bytesWritten = internal.shuffleBytesWritten,
-      writeTime = internal.shuffleWriteTime,
-      recordsWritten = internal.shuffleRecordsWritten
+      bytesWritten = internal.bytesWritten,
+      writeTime = internal.writeTime,
+      recordsWritten = internal.recordsWritten
     )
   }
 }
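
The REST API keeps its own immutable metrics type, so the converter only copies the three counters across, now read through their renamed internal accessors. A rough sketch of the shape (ApiShuffleWriteMetrics is an illustrative stand-in for the v1 class, whose real constructor may differ):

    import org.apache.spark.executor.{ShuffleWriteMetrics => InternalShuffleWriteMetrics}

    // Illustrative stand-in for org.apache.spark.status.api.v1.ShuffleWriteMetrics.
    case class ApiShuffleWriteMetrics(bytesWritten: Long, writeTime: Long, recordsWritten: Long)

    def convert(internal: InternalShuffleWriteMetrics): ApiShuffleWriteMetrics =
      ApiShuffleWriteMetrics(
        bytesWritten = internal.bytesWritten,
        writeTime = internal.writeTime,
        recordsWritten = internal.recordsWritten)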

core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
Lines changed: 6 additions & 6 deletions

@@ -102,7 +102,7 @@ private[spark] class DiskBlockObjectWriter(
         objOut.flush()
         val start = System.nanoTime()
         fos.getFD.sync()
-        writeMetrics.incShuffleWriteTime(System.nanoTime() - start)
+        writeMetrics.incWriteTime(System.nanoTime() - start)
       }
     } {
       objOut.close()
@@ -132,7 +132,7 @@ private[spark] class DiskBlockObjectWriter(
       close()
       finalPosition = file.length()
       // In certain compression codecs, more bytes are written after close() is called
-      writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition)
+      writeMetrics.incBytesWritten(finalPosition - reportedPosition)
     } else {
       finalPosition = file.length()
     }
@@ -152,8 +152,8 @@ private[spark] class DiskBlockObjectWriter(
     // truncating the file to its initial position.
     try {
       if (initialized) {
-        writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
-        writeMetrics.decShuffleRecordsWritten(numRecordsWritten)
+        writeMetrics.decBytesWritten(reportedPosition - initialPosition)
+        writeMetrics.decRecordsWritten(numRecordsWritten)
         objOut.flush()
         bs.flush()
         close()
@@ -201,7 +201,7 @@ private[spark] class DiskBlockObjectWriter(
    */
   def recordWritten(): Unit = {
     numRecordsWritten += 1
-    writeMetrics.incShuffleRecordsWritten(1)
+    writeMetrics.incRecordsWritten(1)

     if (numRecordsWritten % 32 == 0) {
       updateBytesWritten()
@@ -226,7 +226,7 @@ private[spark] class DiskBlockObjectWriter(
    */
   private def updateBytesWritten() {
     val pos = channel.position()
-    writeMetrics.incShuffleBytesWritten(pos - reportedPosition)
+    writeMetrics.incBytesWritten(pos - reportedPosition)
     reportedPosition = pos
   }
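
DiskBlockObjectWriter keeps the record count exact but reconciles bytesWritten against the file channel position only every 32 records, since querying the position per record would be comparatively expensive. A standalone sketch of that batching, with illustrative names and assuming package-private access to the mutators:

    import org.apache.spark.executor.ShuffleWriteMetrics

    class BatchedByteTracker(metrics: ShuffleWriteMetrics, position: () => Long) {
      private var numRecordsWritten = 0L
      private var reportedPosition = 0L

      def recordWritten(): Unit = {
        numRecordsWritten += 1
        metrics.incRecordsWritten(1)
        // Only reconcile bytes every 32 records to keep the hot path cheap.
        if (numRecordsWritten % 32 == 0) {
          val pos = position()
          metrics.incBytesWritten(pos - reportedPosition)
          reportedPosition = pos
        }
      }
    }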
