Commit 665e71d

[SPARK-1912] Lazily initialize buffers for local shuffle blocks.
This is a simplified fix for SPARK-1912.

Author: Reynold Xin <[email protected]>

Closes #2179 from rxin/SPARK-1912 and squashes the following commits:

b2f0e9e [Reynold Xin] Fix unit tests.
a8eddfe [Reynold Xin] [SPARK-1912] Lazily initialize buffers for local shuffle blocks.

1 parent 3c517a8 · commit 665e71d
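The gist of the change, as a minimal sketch in plain Scala (hypothetical names, not Spark's actual FetchResult signature): the result recorded for a local block now stores a thunk that performs the read, instead of an iterator that has already been built, so per-block deserialization buffers are allocated only when the reducer actually consumes that block.

    // Hedged sketch only: FetchResult, fetchLocal, and read are illustrative
    // stand-ins, not the real Spark classes.
    case class FetchResult[A](blockId: String, size: Long, elements: () => Iterator[A])

    def fetchLocal[A](ids: Seq[String], read: String => Iterator[A]): Seq[FetchResult[A]] =
      // read(id) is NOT invoked here; it only runs when elements() is called,
      // i.e. when the block is actually consumed downstream.
      ids.map(id => FetchResult(id, 0L, () => read(id)))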

3 files changed: +20 −35 lines


core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala

Lines changed: 1 addition & 4 deletions
@@ -196,11 +196,8 @@ object BlockFetcherIterator {
       // any memory that might exceed our maxBytesInFlight
       for (id <- localBlocksToFetch) {
         try {
-          // getLocalFromDisk never return None but throws BlockException
-          val iter = getLocalFromDisk(id, serializer).get
-          // Pass 0 as size since it's not in flight
           readMetrics.localBlocksFetched += 1
-          results.put(new FetchResult(id, 0, () => iter))
+          results.put(new FetchResult(id, 0, () => getLocalFromDisk(id, serializer).get))
           logDebug("Got local block " + id)
         } catch {
           case e: Exception => {
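Why the one-line change above matters: the old code materialized the iterator eagerly via val iter = getLocalFromDisk(id, serializer).get and the closure merely returned the already-built value; moving the call inside the closure defers the disk read and buffer allocation until the FetchResult is consumed. A tiny standalone illustration (hypothetical expensiveRead, not Spark code):

    def expensiveRead(): Iterator[Int] = { println("reading"); Iterator(1, 2, 3) }

    val iter = expensiveRead()                        // "reading" is printed right here
    val eager: () => Iterator[Int] = () => iter       // the closure just hands back the existing value

    val deferred: () => Iterator[Int] = () => expensiveRead()  // nothing printed yet
    deferred()                                        // "reading" is printed only when the thunk is called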

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 2 additions & 20 deletions
@@ -1039,26 +1039,8 @@ private[spark] class BlockManager(
       bytes: ByteBuffer,
       serializer: Serializer = defaultSerializer): Iterator[Any] = {
     bytes.rewind()
-
-    def getIterator: Iterator[Any] = {
-      val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
-      serializer.newInstance().deserializeStream(stream).asIterator
-    }
-
-    if (blockId.isShuffle) {
-      /* Reducer may need to read many local shuffle blocks and will wrap them into Iterators
-       * at the beginning. The wrapping will cost some memory (compression instance
-       * initialization, etc.). Reducer reads shuffle blocks one by one so we could do the
-       * wrapping lazily to save memory. */
-      class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
-        lazy val proxy = f
-        override def hasNext: Boolean = proxy.hasNext
-        override def next(): Any = proxy.next()
-      }
-      new LazyProxyIterator(getIterator)
-    } else {
-      getIterator
-    }
+    val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
+    serializer.newInstance().deserializeStream(stream).asIterator
   }

   def stop(): Unit = {
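With laziness now handled at the call site in BlockFetcherIterator, dataDeserialize no longer needs the LazyProxyIterator removed above. The pattern it used, a by-name parameter forced through a lazy val on first access, is a standard Scala idiom; here is a self-contained sketch with illustrative names only:

    object LazyIteratorDemo {
      // Builds the underlying iterator only when it is first needed.
      class LazyIter[A](make: => Iterator[A]) extends Iterator[A] {
        lazy val underlying: Iterator[A] = make
        override def hasNext: Boolean = underlying.hasNext
        override def next(): A = underlying.next()
      }

      def main(args: Array[String]): Unit = {
        val it = new LazyIter({ println("building"); Iterator(1, 2, 3) })
        println("wrapped")   // prints before "building": construction was deferred
        println(it.next())   // forces the by-name block, then yields 1
      }
    }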

core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala

Lines changed: 17 additions & 11 deletions
@@ -76,20 +76,24 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {

     iterator.initialize()

-    // 3rd getLocalFromDisk invocation should be failed
-    verify(blockManager, times(3)).getLocalFromDisk(any(), any())
+    // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk.
+    verify(blockManager, times(0)).getLocalFromDisk(any(), any())

     assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
     // the 2nd element of the tuple returned by iterator.next should be defined when fetching successfully
-    assert(iterator.next._2.isDefined, "1st element should be defined but is not actually defined")
+    assert(iterator.next()._2.isDefined, "1st element should be defined but is not actually defined")
+    verify(blockManager, times(1)).getLocalFromDisk(any(), any())
+
     assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element")
-    assert(iterator.next._2.isDefined, "2nd element should be defined but is not actually defined")
+    assert(iterator.next()._2.isDefined, "2nd element should be defined but is not actually defined")
+    verify(blockManager, times(2)).getLocalFromDisk(any(), any())
+
     assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements")
     // 3rd fetch should be failed
-    assert(!iterator.next._2.isDefined, "3rd element should not be defined but is actually defined")
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements")
-    // Don't call next() after fetching non-defined element even if thare are rest of elements in the iterator.
-    // Otherwise, BasicBlockFetcherIterator hangs up.
+    intercept[Exception] {
+      iterator.next()
+    }
+    verify(blockManager, times(3)).getLocalFromDisk(any(), any())
   }


@@ -127,8 +131,8 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {

     iterator.initialize()

-    // getLocalFromDis should be invoked for all of 5 blocks
-    verify(blockManager, times(5)).getLocalFromDisk(any(), any())
+    // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk.
+    verify(blockManager, times(0)).getLocalFromDisk(any(), any())

     assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
     assert(iterator.next._2.isDefined, "All elements should be defined but 1st element is not actually defined")
@@ -139,7 +143,9 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
     assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements")
     assert(iterator.next._2.isDefined, "All elements should be defined but 4th element is not actually defined")
     assert(iterator.hasNext, "iterator should have 5 elements but actually has 4 elements")
-    assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined")
+    assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined")
+
+    verify(blockManager, times(5)).getLocalFromDisk(any(), any())
   }

   test("block fetch from remote fails using BasicBlockFetcherIterator") {
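The updated suite asserts laziness by counting calls with Mockito: times(0) right after initialize(), then times(k) only after k elements have been consumed, and intercept[Exception] for the block whose fetch fails. A hedged, standalone sketch of that verification pattern (hypothetical Source trait, not the suite's actual mocks; assumes Mockito is on the classpath):

    import org.mockito.Mockito.{mock, times, verify, when}

    trait Source { def load(id: String): Iterator[Int] }

    object LazyVerifyDemo {
      def main(args: Array[String]): Unit = {
        val source = mock(classOf[Source])
        when(source.load("a")).thenReturn(Iterator(1, 2, 3))

        // Defer the call behind a thunk, as FetchResult does after this commit.
        val result: () => Iterator[Int] = () => source.load("a")
        verify(source, times(0)).load("a")   // nothing has been loaded yet

        result().foreach(println)
        verify(source, times(1)).load("a")   // loaded exactly once, on consumption
      }
    }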
