
Commit c64da4f

Author: Ilya Ganelin (committed)
Partially updated task metrics to make some vars private
1 parent b004150 commit c64da4f

8 files changed: +76 additions, -48 deletions

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 44 additions & 16 deletions

@@ -182,7 +182,10 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
   /**
    * Total bytes read.
    */
-  var bytesRead: Long = 0L
+  private var _bytesRead: Long = _
+  def bytesRead = _bytesRead
+  def incBytesRead(value: Long) = _bytesRead += value
+  def decBytesRead(value: Long) = _bytesRead -= value
 }

 /**
@@ -194,7 +197,10 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
   /**
    * Total bytes written
    */
-  var bytesWritten: Long = 0L
+  private var _bytesWritten: Long = _
+  def bytesWritten = _bytesWritten
+  def incBytesWritten(value: Long) = _bytesWritten += value
+  def decBytesWritten(value: Long) = _bytesWritten -= value
 }

 /**
@@ -203,32 +209,48 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
  */
 @DeveloperApi
 class ShuffleReadMetrics extends Serializable {
-  /**
-   * Number of blocks fetched in this shuffle by this task (remote or local)
-   */
-  def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
-
   /**
    * Number of remote blocks fetched in this shuffle by this task
    */
-  var remoteBlocksFetched: Int = _
-
+  private var _remoteBlocksFetched: Int = _
+  def remoteBlocksFetched = _remoteBlocksFetched
+  def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
+  def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
+
   /**
    * Number of local blocks fetched in this shuffle by this task
    */
-  var localBlocksFetched: Int = _
+  private var _localBlocksFetched: Int = _
+  def localBlocksFetched = _localBlocksFetched
+  def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
+  def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
+
   /**
    * Time the task spent waiting for remote shuffle blocks. This only includes the time
    * blocking on shuffle input data. For instance if block B is being fetched while the task is
    * still not finished processing block A, it is not considered to be blocking on block B.
    */
-  var fetchWaitTime: Long = _
-
+  private var _fetchWaitTime: Long = _
+  def fetchWaitTime = _fetchWaitTime
+  def incFetchWaitTime(value: Long) = _fetchWaitTime += value
+  def decFetchWaitTime(value: Long) = _fetchWaitTime -= value
+
   /**
    * Total number of remote bytes read from the shuffle by this task
    */
-  var remoteBytesRead: Long = _
+  private var _remoteBytesRead: Long = _
+  def remoteBytesRead = _remoteBytesRead
+  def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
+  def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
+
+  /**
+   * Number of blocks fetched in this shuffle by this task (remote or local)
+   */
+  private var _totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
+  def totalBlocksFetched = _totalBlocksFetched
+  def incTotalBlocksFetched(value: Int) = _totalBlocksFetched += value
+  def decTotalBlocksFetched(value: Int) = _totalBlocksFetched -= value
 }

 /**
@@ -240,10 +262,16 @@ class ShuffleWriteMetrics extends Serializable {
   /**
    * Number of bytes written for the shuffle by this task
    */
-  @volatile var shuffleBytesWritten: Long = _
-
+  @volatile private var _shuffleBytesWritten: Long = _
+  def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
+  def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value
+  def shuffleBytesWritten = _shuffleBytesWritten
   /**
    * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
    */
-  @volatile var shuffleWriteTime: Long = _
+  @volatile private var _shuffleWriteTime: Long = _
+  def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
+  def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
+  def shuffleWriteTime = _shuffleWriteTime
+
 }
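
The refactoring applied to every counter above is the same: the public `var` becomes a `private var` with a public getter plus `inc`/`dec` mutators, so call sites state intent instead of assigning fields directly. A minimal, self-contained sketch of the pattern, using a hypothetical `Metrics` class (not Spark's own API):

```scala
// Minimal sketch of the accessor pattern in this commit; the Metrics class
// here is hypothetical and exists only to illustrate the shape.
class Metrics extends Serializable {
  // Private backing field; "= _" initializes it to the type's default (0L).
  private var _bytesRead: Long = _

  // Reads stay public, but callers can no longer assign the field directly.
  def bytesRead: Long = _bytesRead

  // All mutation goes through explicit increment/decrement methods.
  def incBytesRead(value: Long): Unit = _bytesRead += value
  def decBytesRead(value: Long): Unit = _bytesRead -= value
}

object MetricsDemo extends App {
  val m = new Metrics
  m.incBytesRead(512L)         // before this commit: m.bytesRead = 512L
  m.incBytesRead(256L)
  assert(m.bytesRead == 768L)  // increments accumulate rather than overwrite
}
```

One side effect visible in the diff itself: `totalBlocksFetched` changes from a derived `def` into a `var` initialized once at construction, so it no longer tracks `remoteBlocksFetched` and `localBlocksFetched` automatically and must now be updated through its own `inc`/`dec` methods.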

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 3 additions & 3 deletions

@@ -252,7 +252,7 @@ class HadoopRDD[K, V](
           && bytesReadCallback.isDefined) {
         recordsSinceMetricsUpdate = 0
         val bytesReadFn = bytesReadCallback.get
-        inputMetrics.bytesRead = bytesReadFn()
+        inputMetrics.incBytesRead(bytesReadFn())
       } else {
         recordsSinceMetricsUpdate += 1
       }
@@ -264,12 +264,12 @@ class HadoopRDD[K, V](
       reader.close()
       if (bytesReadCallback.isDefined) {
         val bytesReadFn = bytesReadCallback.get
-        inputMetrics.bytesRead = bytesReadFn()
+        inputMetrics.incBytesRead(bytesReadFn())
       } else if (split.inputSplit.value.isInstanceOf[FileSplit]) {
         // If we can't get the bytes read from the FS stats, fall back to the split size,
         // which may be inaccurate.
         try {
-          inputMetrics.bytesRead = split.inputSplit.value.getLength
+          inputMetrics.incBytesRead(split.inputSplit.value.getLength)
           context.taskMetrics.inputMetrics = Some(inputMetrics)
         } catch {
           case e: java.io.IOException =>

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 3 additions & 3 deletions

@@ -159,7 +159,7 @@ class NewHadoopRDD[K, V](
           && bytesReadCallback.isDefined) {
         recordsSinceMetricsUpdate = 0
         val bytesReadFn = bytesReadCallback.get
-        inputMetrics.bytesRead = bytesReadFn()
+        inputMetrics.incBytesRead(bytesReadFn())
       } else {
         recordsSinceMetricsUpdate += 1
       }
@@ -174,12 +174,12 @@ class NewHadoopRDD[K, V](
       // Update metrics with final amount
       if (bytesReadCallback.isDefined) {
         val bytesReadFn = bytesReadCallback.get
-        inputMetrics.bytesRead = bytesReadFn()
+        inputMetrics.incBytesRead(bytesReadFn())
       } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
         // If we can't get the bytes read from the FS stats, fall back to the split size,
         // which may be inaccurate.
         try {
-          inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength
+          inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
           context.taskMetrics.inputMetrics = Some(inputMetrics)
         } catch {
           case e: java.io.IOException =>

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 3 additions & 3 deletions

@@ -1000,7 +1000,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
         writer.close(hadoopContext)
       }
       committer.commitTask(hadoopContext)
-      bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
+      bytesWrittenCallback.foreach { fn => outputMetrics.incBytesWritten(fn()) }
       1
     } : Int

@@ -1072,7 +1072,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
         writer.close()
       }
       writer.commit()
-      bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
+      bytesWrittenCallback.foreach { fn => outputMetrics.incBytesWritten(fn()) }
     }

     self.context.runJob(self, writeToFile)
@@ -1095,7 +1095,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       outputMetrics: OutputMetrics, recordsWritten: Long): Unit = {
     if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0
         && bytesWrittenCallback.isDefined) {
-      bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
+      bytesWrittenCallback.foreach { fn => outputMetrics.incBytesWritten(fn()) }
     }
   }

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 1 deletion

@@ -54,7 +54,7 @@ private[spark] class BlockResult(
     readMethod: DataReadMethod.Value,
     bytes: Long) {
   val inputMetrics = new InputMetrics(readMethod)
-  inputMetrics.bytesRead = bytes
+  inputMetrics.incBytesRead(bytes)
 }

 /**

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 8 additions & 8 deletions

@@ -621,31 +621,31 @@ private[spark] object JsonProtocol {

   def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = {
     val metrics = new ShuffleReadMetrics
-    metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int]
-    metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int]
-    metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long]
-    metrics.remoteBytesRead = (json \ "Remote Bytes Read").extract[Long]
+    metrics.incRemoteBlocksFetched((json \ "Remote Blocks Fetched").extract[Int])
+    metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int])
+    metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long])
+    metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long])
     metrics
   }

   def shuffleWriteMetricsFromJson(json: JValue): ShuffleWriteMetrics = {
     val metrics = new ShuffleWriteMetrics
-    metrics.shuffleBytesWritten = (json \ "Shuffle Bytes Written").extract[Long]
-    metrics.shuffleWriteTime = (json \ "Shuffle Write Time").extract[Long]
+    metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long])
+    metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long])
     metrics
   }

   def inputMetricsFromJson(json: JValue): InputMetrics = {
     val metrics = new InputMetrics(
       DataReadMethod.withName((json \ "Data Read Method").extract[String]))
-    metrics.bytesRead = (json \ "Bytes Read").extract[Long]
+    metrics.incBytesRead((json \ "Bytes Read").extract[Long])
     metrics
   }

   def outputMetricsFromJson(json: JValue): OutputMetrics = {
     val metrics = new OutputMetrics(
       DataWriteMethod.withName((json \ "Data Write Method").extract[String]))
-    metrics.bytesWritten = (json \ "Bytes Written").extract[Long]
+    metrics.incBytesWritten((json \ "Bytes Written").extract[Long])
     metrics
   }
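
These deserializers replace field assignment with a single `inc*` call on a freshly constructed metrics object. Because every backing counter defaults to zero, one increment reproduces the assigned value exactly. A small sketch of that equivalence, using a hypothetical stand-in class rather than Spark's actual `ShuffleWriteMetrics`:

```scala
// Hypothetical stand-in for the pattern used by the *FromJson helpers above;
// not Spark's actual class.
class WriteMetricsSketch {
  private var _shuffleBytesWritten: Long = _  // defaults to 0L
  def shuffleBytesWritten: Long = _shuffleBytesWritten
  def incShuffleBytesWritten(value: Long): Unit = _shuffleBytesWritten += value
}

object FromJsonDemo extends App {
  // Stands in for (json \ "Shuffle Bytes Written").extract[Long].
  val parsedBytes = 4096L
  val metrics = new WriteMetricsSketch
  metrics.incShuffleBytesWritten(parsedBytes)  // 0 + 4096 == the assigned value
  assert(metrics.shuffleBytesWritten == parsedBytes)
}
```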

core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala

Lines changed: 6 additions & 6 deletions

@@ -138,7 +138,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     assert(listener.stageIdToData.size === 0)

     // finish this task, should get updated shuffleRead
-    shuffleReadMetrics.remoteBytesRead = 1000
+    shuffleReadMetrics.incRemoteBytesRead(1000)
     taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
     var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
@@ -224,18 +224,18 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     val shuffleWriteMetrics = new ShuffleWriteMetrics()
     taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
     taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics)
-    shuffleReadMetrics.remoteBytesRead = base + 1
-    shuffleReadMetrics.remoteBlocksFetched = base + 2
-    shuffleWriteMetrics.shuffleBytesWritten = base + 3
+    shuffleReadMetrics.incRemoteBytesRead(base + 1)
+    shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
+    shuffleWriteMetrics.incShuffleBytesWritten(base + 3)
     taskMetrics.executorRunTime = base + 4
     taskMetrics.diskBytesSpilled = base + 5
     taskMetrics.memoryBytesSpilled = base + 6
     val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
     taskMetrics.inputMetrics = Some(inputMetrics)
-    inputMetrics.bytesRead = base + 7
+    inputMetrics.incBytesRead(base + 7)
     val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
     taskMetrics.outputMetrics = Some(outputMetrics)
-    outputMetrics.bytesWritten = base + 8
+    outputMetrics.incBytesWritten(base + 8)
     taskMetrics
   }

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 8 additions & 8 deletions

@@ -633,24 +633,24 @@ class JsonProtocolSuite extends FunSuite {

     if (hasHadoopInput) {
       val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
-      inputMetrics.bytesRead = d + e + f
+      inputMetrics.incBytesRead(d + e + f)
       t.inputMetrics = Some(inputMetrics)
     } else {
       val sr = new ShuffleReadMetrics
-      sr.remoteBytesRead = b + d
-      sr.localBlocksFetched = e
-      sr.fetchWaitTime = a + d
-      sr.remoteBlocksFetched = f
+      sr.incRemoteBytesRead(b + d)
+      sr.incLocalBlocksFetched(e)
+      sr.incFetchWaitTime(a + d)
+      sr.incRemoteBlocksFetched(f)
       t.setShuffleReadMetrics(Some(sr))
     }
     if (hasOutput) {
       val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
-      outputMetrics.bytesWritten = a + b + c
+      outputMetrics.incBytesWritten(a + b + c)
       t.outputMetrics = Some(outputMetrics)
     } else {
       val sw = new ShuffleWriteMetrics
-      sw.shuffleBytesWritten = a + b + c
-      sw.shuffleWriteTime = b + c + d
+      sw.incShuffleBytesWritten(a + b + c)
+      sw.incShuffleWriteTime(b + c + d)
       t.shuffleWriteMetrics = Some(sw)
     }
     // Make at most 6 blocks
