Skip to content

Commit 7429a98

Browse files
committed
Update tests to check that BufferReleasingStream is closing delegate InputStream
1 parent f458489 commit 7429a98

File tree

3 files changed

+13
-3
lines changed

3 files changed

+13
-3
lines changed

core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ private[spark] class HashShuffleReader[K, C](
5151

5252
// Create a key/value iterator for each stream
5353
val recordIter = wrappedStreams.flatMap { wrappedStream =>
54+
// Note: the asKeyValueIterator below wraps a key/value iterator inside of a
55+
// NextIterator. The NextIterator makes sure that close() is called on the
56+
// underlying InputStream when all records have been read.
5457
serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
5558
}
5659

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,9 +314,12 @@ final class ShuffleBlockFetcherIterator(
314314
}
315315
}
316316

317-
/** Helper class that ensures a ManagerBuffer is released upon InputStream.close() */
317+
/**
318+
* Helper class that ensures a ManagedBuffer is release upon InputStream.close()
319+
* Note: the delegate parameter is private[storage] to make it available to tests.
320+
*/
318321
private class BufferReleasingInputStream(
319-
delegate: InputStream,
322+
private[storage] val delegate: InputStream,
320323
iterator: ShuffleBlockFetcherIterator)
321324
extends InputStream {
322325
private var closed = false

core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,16 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite {
110110

111111
// Make sure we release buffers when a wrapped input stream is closed.
112112
val mockBuf = localBlocks.getOrElse(blockId, remoteBlocks(blockId))
113-
val wrappedInputStream = new BufferReleasingInputStream(inputStream.get, iterator)
113+
// Note: ShuffleBlockFetcherIterator wraps input streams in a BufferReleasingInputStream
114+
val wrappedInputStream = inputStream.get.asInstanceOf[BufferReleasingInputStream]
114115
verify(mockBuf, times(0)).release()
116+
verify(wrappedInputStream.delegate, times(0)).close()
115117
wrappedInputStream.close()
116118
verify(mockBuf, times(1)).release()
119+
verify(wrappedInputStream.delegate, times(1)).close()
117120
wrappedInputStream.close() // close should be idempotent
118121
verify(mockBuf, times(1)).release()
122+
verify(wrappedInputStream.delegate, times(1)).close()
119123
}
120124

121125
// 3 local blocks, and 2 remote blocks

0 commit comments

Comments
 (0)