
Commit df3afd7

Author: Ilya Ganelin (committed)

Revert "Partially updated task metrics to make some vars private"

This reverts commit c64da4f.

1 parent 3f6c512 commit df3afd7

File tree

8 files changed: +48 -76 lines changed


core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 16 additions & 44 deletions

@@ -182,10 +182,7 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
   /**
    * Total bytes read.
    */
-  private var _bytesRead: Long = _
-  def bytesRead = _bytesRead
-  def incBytesRead(value: Long) = _bytesRead += value
-  def decBytesRead(value: Long) = _bytesRead -= value
+  var bytesRead: Long = 0L
 }

 /**
@@ -197,10 +194,7 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
   /**
    * Total bytes written
    */
-  private var _bytesWritten: Long = _
-  def bytesWritten = _bytesWritten
-  def incBytesWritten(value : Long) = _bytesWritten += value
-  def decBytesWritten(value : Long) = _bytesWritten -= value
+  var bytesWritten: Long = 0L
 }

 /**
@@ -209,48 +203,32 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
  */
 @DeveloperApi
 class ShuffleReadMetrics extends Serializable {
+  /**
+   * Number of blocks fetched in this shuffle by this task (remote or local)
+   */
+  def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
+
   /**
    * Number of remote blocks fetched in this shuffle by this task
    */
-  private var _remoteBlocksFetched: Int = _
-  def remoteBlocksFetched = _remoteBlocksFetched
-  def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
-  def defRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
-
+  var remoteBlocksFetched: Int = _
+
   /**
    * Number of local blocks fetched in this shuffle by this task
    */
-  private var _localBlocksFetched: Int = _
-  def localBlocksFetched = _localBlocksFetched
-  def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
-  def defLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
-
+  var localBlocksFetched: Int = _

   /**
    * Time the task spent waiting for remote shuffle blocks. This only includes the time
    * blocking on shuffle input data. For instance if block B is being fetched while the task is
    * still not finished processing block A, it is not considered to be blocking on block B.
    */
-  private var _fetchWaitTime: Long = _
-  def fetchWaitTime = _fetchWaitTime
-  def incFetchWaitTime(value: Long) = _fetchWaitTime += value
-  def decFetchWaitTime(value: Long) = _fetchWaitTime -= value
-
-  /**
-   * Total number of remote bytes read from the shuffle by this task
-   */
-  private var _remoteBytesRead: Long = _
-  def remoteBytesRead = _remoteBytesRead
-  def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
-  def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
+  var fetchWaitTime: Long = _

   /**
-   * Number of blocks fetched in this shuffle by this task (remote or local)
+   * Total number of remote bytes read from the shuffle by this task
    */
-  private var _totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
-  def totalBlocksFetched = _totalBlocksFetched
-  def incTotalBlocksFetched(value: Int) = _totalBlocksFetched += value
-  def decTotalBlocksFetched(value: Int) = _totalBlocksFetched -= value
+  var remoteBytesRead: Long = _
 }

 /**
@@ -262,16 +240,10 @@ class ShuffleWriteMetrics extends Serializable {
   /**
    * Number of bytes written for the shuffle by this task
    */
-  @volatile private var _shuffleBytesWritten: Long = _
-  def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
-  def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value
-  def shuffleBytesWritten = _shuffleBytesWritten
+  @volatile var shuffleBytesWritten: Long = _
+
   /**
    * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
    */
-  @volatile private var _shuffleWriteTime: Long = _
-  def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
-  def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
-  def shuffleWriteTime= _shuffleWriteTime
-
+  @volatile var shuffleWriteTime: Long = _
 }

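For context, the pattern this revert undoes in TaskMetrics.scala boils down to the sketch below: the reverted commit wrapped each metric in a private var with inc/dec accessors, while this commit restores plain public vars that callers assign directly. The class names here are illustrative, not the actual Spark classes.

// Encapsulated style removed by the revert: callers can only increment or decrement.
class CounterStyleMetrics extends Serializable {
  private var _bytesRead: Long = 0L
  def bytesRead: Long = _bytesRead
  def incBytesRead(value: Long): Unit = _bytesRead += value
  def decBytesRead(value: Long): Unit = _bytesRead -= value
}

// Plain-var style restored by the revert: callers read and overwrite the field freely.
class PlainVarMetrics extends Serializable {
  var bytesRead: Long = 0L
}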
core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 3 additions & 3 deletions

@@ -252,7 +252,7 @@ class HadoopRDD[K, V](
             && bytesReadCallback.isDefined) {
           recordsSinceMetricsUpdate = 0
           val bytesReadFn = bytesReadCallback.get
-          inputMetrics.incBytesRead(bytesReadFn())
+          inputMetrics.bytesRead = bytesReadFn()
         } else {
           recordsSinceMetricsUpdate += 1
         }
@@ -264,12 +264,12 @@ class HadoopRDD[K, V](
         reader.close()
         if (bytesReadCallback.isDefined) {
           val bytesReadFn = bytesReadCallback.get
-          inputMetrics.incBytesRead(bytesReadFn())
+          inputMetrics.bytesRead = bytesReadFn()
         } else if (split.inputSplit.value.isInstanceOf[FileSplit]) {
           // If we can't get the bytes read from the FS stats, fall back to the split size,
           // which may be inaccurate.
           try {
-            inputMetrics.incBytesRead(split.inputSplit.value.getLength)
+            inputMetrics.bytesRead = split.inputSplit.value.getLength
             context.taskMetrics.inputMetrics = Some(inputMetrics)
           } catch {
             case e: java.io.IOException =>

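The HadoopRDD change above and the NewHadoopRDD change below follow the same shape. A rough sketch of the restored update path, using a hypothetical helper name (updateBytesRead) and parameters standing in for the surrounding Spark code:

// Sketch only: the bytes-read callback returns a running total, so after the revert
// the metric is overwritten with that total rather than incremented.
def updateBytesRead(
    inputMetrics: InputMetrics,
    bytesReadCallback: Option[() => Long],
    splitLength: => Long): Unit = {
  bytesReadCallback match {
    case Some(fn) =>
      inputMetrics.bytesRead = fn()
    case None =>
      // If the FS stats are unavailable, fall back to the split size, which may be inaccurate.
      inputMetrics.bytesRead = splitLength
  }
}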
core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 3 additions & 3 deletions

@@ -159,7 +159,7 @@ class NewHadoopRDD[K, V](
             && bytesReadCallback.isDefined) {
           recordsSinceMetricsUpdate = 0
           val bytesReadFn = bytesReadCallback.get
-          inputMetrics.incBytesRead(bytesReadFn())
+          inputMetrics.bytesRead = bytesReadFn()
         } else {
           recordsSinceMetricsUpdate += 1
         }
@@ -174,12 +174,12 @@ class NewHadoopRDD[K, V](
         // Update metrics with final amount
         if (bytesReadCallback.isDefined) {
           val bytesReadFn = bytesReadCallback.get
-          inputMetrics.incBytesRead(bytesReadFn())
+          inputMetrics.bytesRead = bytesReadFn()
         } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
           // If we can't get the bytes read from the FS stats, fall back to the split size,
           // which may be inaccurate.
           try {
-            inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+            inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength
             context.taskMetrics.inputMetrics = Some(inputMetrics)
           } catch {
             case e: java.io.IOException =>

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 3 additions & 3 deletions

@@ -1010,7 +1010,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
         writer.close(hadoopContext)
       }
       committer.commitTask(hadoopContext)
-      bytesWrittenCallback.foreach { fn => outputMetrics.incBytesWritten(fn()) }
+      bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
       1
     } : Int

@@ -1082,7 +1082,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
         writer.close()
       }
       writer.commit()
-      bytesWrittenCallback.foreach { fn => outputMetrics.incBytesWritten(fn()) }
+      bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
     }

     self.context.runJob(self, writeToFile)
@@ -1105,7 +1105,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       outputMetrics: OutputMetrics, recordsWritten: Long): Unit = {
     if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0
         && bytesWrittenCallback.isDefined) {
-      bytesWrittenCallback.foreach { fn => outputMetrics.incBytesWritten(fn()) }
+      bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
     }
   }

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 1 deletion

@@ -54,7 +54,7 @@ private[spark] class BlockResult(
     readMethod: DataReadMethod.Value,
     bytes: Long) {
   val inputMetrics = new InputMetrics(readMethod)
-  inputMetrics.incBytesRead(bytes)
+  inputMetrics.bytesRead = bytes
 }

 /**

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 8 additions & 8 deletions

@@ -621,31 +621,31 @@ private[spark] object JsonProtocol {

   def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = {
     val metrics = new ShuffleReadMetrics
-    metrics.incRemoteBlocksFetched((json \ "Remote Blocks Fetched").extract[Int])
-    metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int])
-    metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long])
-    metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long])
+    metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int]
+    metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int]
+    metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long]
+    metrics.remoteBytesRead = (json \ "Remote Bytes Read").extract[Long]
     metrics
   }

   def shuffleWriteMetricsFromJson(json: JValue): ShuffleWriteMetrics = {
     val metrics = new ShuffleWriteMetrics
-    metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long])
-    metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long])
+    metrics.shuffleBytesWritten = (json \ "Shuffle Bytes Written").extract[Long]
+    metrics.shuffleWriteTime = (json \ "Shuffle Write Time").extract[Long]
     metrics
   }

   def inputMetricsFromJson(json: JValue): InputMetrics = {
     val metrics = new InputMetrics(
       DataReadMethod.withName((json \ "Data Read Method").extract[String]))
-    metrics.incBytesRead((json \ "Bytes Read").extract[Long])
+    metrics.bytesRead = (json \ "Bytes Read").extract[Long]
     metrics
   }

   def outputMetricsFromJson(json: JValue): OutputMetrics = {
     val metrics = new OutputMetrics(
       DataWriteMethod.withName((json \ "Data Write Method").extract[String]))
-    metrics.incBytesWritten((json \ "Bytes Written").extract[Long])
+    metrics.bytesWritten = (json \ "Bytes Written").extract[Long]
     metrics
   }

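A small usage sketch for the JsonProtocol deserializers above, assuming the json4s DSL and a caller with access to the private[spark] object; the key names match the diff, the values are made up for illustration:

import org.json4s.JsonDSL._
import org.json4s.JValue

// Hypothetical payload using the keys read by shuffleReadMetricsFromJson.
val json: JValue =
  ("Remote Blocks Fetched" -> 2) ~
  ("Local Blocks Fetched" -> 3) ~
  ("Fetch Wait Time" -> 10L) ~
  ("Remote Bytes Read" -> 4096L)

val metrics = JsonProtocol.shuffleReadMetricsFromJson(json)
// After the revert, totalBlocksFetched is derived from the two public vars.
assert(metrics.totalBlocksFetched == 2 + 3)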
core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala

Lines changed: 6 additions & 6 deletions

@@ -138,7 +138,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     assert(listener.stageIdToData.size === 0)

     // finish this task, should get updated shuffleRead
-    shuffleReadMetrics.incRemoteBytesRead(1000)
+    shuffleReadMetrics.remoteBytesRead = 1000
     taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
     var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
@@ -224,18 +224,18 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     val shuffleWriteMetrics = new ShuffleWriteMetrics()
     taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
     taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics)
-    shuffleReadMetrics.incRemoteBytesRead(base + 1)
-    shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
-    shuffleWriteMetrics.incShuffleBytesWritten(base + 3)
+    shuffleReadMetrics.remoteBytesRead = base + 1
+    shuffleReadMetrics.remoteBlocksFetched = base + 2
+    shuffleWriteMetrics.shuffleBytesWritten = base + 3
     taskMetrics.executorRunTime = base + 4
     taskMetrics.diskBytesSpilled = base + 5
     taskMetrics.memoryBytesSpilled = base + 6
     val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
     taskMetrics.inputMetrics = Some(inputMetrics)
-    inputMetrics.incBytesRead(base + 7)
+    inputMetrics.bytesRead = base + 7
     val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
     taskMetrics.outputMetrics = Some(outputMetrics)
-    outputMetrics.incBytesWritten(base + 8)
+    outputMetrics.bytesWritten = base + 8
     taskMetrics
   }

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 8 additions & 8 deletions

@@ -609,24 +609,24 @@ class JsonProtocolSuite extends FunSuite {

     if (hasHadoopInput) {
       val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
-      inputMetrics.incBytesRead(d + e + f)
+      inputMetrics.bytesRead = d + e + f
       t.inputMetrics = Some(inputMetrics)
     } else {
       val sr = new ShuffleReadMetrics
-      sr.incRemoteBytesRead(b + d)
-      sr.incLocalBlocksFetched(e)
-      sr.incFetchWaitTime(a + d)
-      sr.incRemoteBlocksFetched(f)
+      sr.remoteBytesRead = b + d
+      sr.localBlocksFetched = e
+      sr.fetchWaitTime = a + d
+      sr.remoteBlocksFetched = f
       t.setShuffleReadMetrics(Some(sr))
     }
     if (hasOutput) {
       val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
-      outputMetrics.incBytesWritten(a + b + c)
+      outputMetrics.bytesWritten = a + b + c
       t.outputMetrics = Some(outputMetrics)
     } else {
       val sw = new ShuffleWriteMetrics
-      sw.incShuffleBytesWritten(a + b + c)
-      sw.incShuffleWriteTime(b + c + d)
+      sw.shuffleBytesWritten = a + b + c
+      sw.shuffleWriteTime = b + c + d
       t.shuffleWriteMetrics = Some(sw)
     }
     // Make at most 6 blocks
