Skip to content

Commit 4ea1712

Browse files
committed
Small code cleanup for readability
1 parent 7429a98 commit 4ea1712

File tree

1 file changed

+10
-11
lines changed

1 file changed

+10
-11
lines changed

core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala

Lines changed: 10 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -51,7 +51,16 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
5151
(address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
5252
}
5353

54-
def unpackBlock(blockPair: (BlockId, Try[InputStream])) : (BlockId, InputStream) = {
54+
val blockFetcherItr = new ShuffleBlockFetcherIterator(
55+
context,
56+
SparkEnv.get.blockManager.shuffleClient,
57+
blockManager,
58+
blocksByAddress,
59+
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
60+
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)
61+
62+
// Make sure that fetch failures are wrapped inside a FetchFailedException for the scheduler
63+
blockFetcherItr.map { blockPair =>
5564
val blockId = blockPair._1
5665
val blockOption = blockPair._2
5766
blockOption match {
@@ -70,15 +79,5 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
7079
}
7180
}
7281
}
73-
74-
val blockFetcherItr = new ShuffleBlockFetcherIterator(
75-
context,
76-
SparkEnv.get.blockManager.shuffleClient,
77-
blockManager,
78-
blocksByAddress,
79-
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
80-
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)
81-
82-
blockFetcherItr.map(unpackBlock)
8382
}
8483
}

0 commit comments

Comments
 (0)