Skip to content

Commit b55cade

Browse files
committed
Remove the remoteFetchTime metric.
This metric is confusing: it adds up all of the time to fetch shuffle inputs, but fetches often happen in parallel, so remoteFetchTime can be much longer than the task execution time. @squito it looks like you added this metric -- do you have a use case for it? cc @shivaram -- I know you've looked at the shuffle performance a lot so chime in here if this metric has turned out to be useful for you! Author: Kay Ousterhout <[email protected]> Closes #62 from kayousterhout/remove_fetch_variable and squashes the following commits: 43341eb [Kay Ousterhout] Remove the remoteFetchTime metric.
1 parent 9d225a9 commit b55cade

File tree

5 files changed

+0
-14
lines changed

5 files changed

+0
-14
lines changed

core/src/main/scala/org/apache/spark/BlockStoreShuffleFetcher.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
7979
val completionIter = CompletionIterator[T, Iterator[T]](itr, {
8080
val shuffleMetrics = new ShuffleReadMetrics
8181
shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
82-
shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
8382
shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
8483
shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
8584
shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,6 @@ class ShuffleReadMetrics extends Serializable {
103103
*/
104104
var fetchWaitTime: Long = _
105105

106-
/**
107-
* Total time spent fetching remote shuffle blocks. This aggregates the time spent fetching all
108-
* input blocks. Since block fetches are both pipelined and parallelized, this can
109-
* exceed fetchWaitTime and executorRunTime.
110-
*/
111-
var remoteFetchTime: Long = _
112-
113106
/**
114107
* Total number of remote bytes read from the shuffle by this task
115108
*/

core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,6 @@ class JobLogger(val user: String, val logDirName: String)
275275
" BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
276276
" BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
277277
" REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
278-
" REMOTE_FETCH_TIME=" + metrics.remoteFetchTime +
279278
" REMOTE_BYTES_READ=" + metrics.remoteBytesRead
280279
case None => ""
281280
}

core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] wi
4949
def totalBlocks: Int
5050
def numLocalBlocks: Int
5151
def numRemoteBlocks: Int
52-
def remoteFetchTime: Long
5352
def fetchWaitTime: Long
5453
def remoteBytesRead: Long
5554
}
@@ -79,7 +78,6 @@ object BlockFetcherIterator {
7978
import blockManager._
8079

8180
private var _remoteBytesRead = 0L
82-
private var _remoteFetchTime = 0L
8381
private var _fetchWaitTime = 0L
8482

8583
if (blocksByAddress == null) {
@@ -125,7 +123,6 @@ object BlockFetcherIterator {
125123
future.onSuccess {
126124
case Some(message) => {
127125
val fetchDone = System.currentTimeMillis()
128-
_remoteFetchTime += fetchDone - fetchStart
129126
val bufferMessage = message.asInstanceOf[BufferMessage]
130127
val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
131128
for (blockMessage <- blockMessageArray) {
@@ -241,7 +238,6 @@ object BlockFetcherIterator {
241238
override def totalBlocks: Int = numLocal + numRemote
242239
override def numLocalBlocks: Int = numLocal
243240
override def numRemoteBlocks: Int = numRemote
244-
override def remoteFetchTime: Long = _remoteFetchTime
245241
override def fetchWaitTime: Long = _fetchWaitTime
246242
override def remoteBytesRead: Long = _remoteBytesRead
247243

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
129129
sm.localBlocksFetched should be > (0)
130130
sm.remoteBlocksFetched should be (0)
131131
sm.remoteBytesRead should be (0l)
132-
sm.remoteFetchTime should be (0l)
133132
}
134133
}
135134
}

0 commit comments

Comments
 (0)