Commit d8a5d36

[SPARK-6627] Finished rename to ShuffleBlockResolver
A previous cleanup commit for SPARK-6627 renamed ShuffleBlockManager to ShuffleBlockResolver but left the associated subclasses and variables with their old names; this commit completes the rename.

1 parent 555213e
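
For orientation, the class structure this rename produces looks roughly like the sketch below. This is a simplified outline with stub types, not the actual Spark source: the real getBlockData implementations, constructor parameters, and helper methods are omitted. ShuffleBlockResolver is the common trait that maps a logical ShuffleBlockId to the bytes backing it; FileShuffleBlockResolver (hash-based shuffle, used by HashShuffleManager) and IndexShuffleBlockResolver (sort-based shuffle, used by SortShuffleManager) are its concrete implementations, previously named FileShuffleBlockManager and IndexShuffleBlockManager.

// Simplified stand-ins for the real types in org.apache.spark.storage
// and org.apache.spark.network.buffer.
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
trait ManagedBuffer

// Common interface: resolve a logical shuffle block ID to its backing bytes.
trait ShuffleBlockResolver {
  def getBlockData(blockId: ShuffleBlockId): ManagedBuffer
  def stop(): Unit
}

// Hash-based shuffle (one file per map/reduce pair); was FileShuffleBlockManager.
class FileShuffleBlockResolver extends ShuffleBlockResolver {
  def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = ??? // omitted
  def stop(): Unit = ()
}

// Sort-based shuffle (one data file plus an index file per mapper);
// was IndexShuffleBlockManager.
class IndexShuffleBlockResolver extends ShuffleBlockResolver {
  def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = ??? // omitted
  def stop(): Unit = ()
}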

File tree: 16 files changed (+72 lines, -73 lines)

core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala renamed to core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala

Lines changed: 4 additions & 5 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.shuffle

 import java.io.File
-import java.nio.ByteBuffer
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicInteger

@@ -29,7 +28,7 @@ import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup
+import org.apache.spark.shuffle.FileShuffleBlockResolver.ShuffleFileGroup
 import org.apache.spark.storage._
 import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
 import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
@@ -64,9 +63,9 @@ private[spark] trait ShuffleWriterGroup {
  * files within a ShuffleFileGroups associated with the block's reducer.
  */
 // Note: Changes to the format in this file should be kept in sync with
-// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getHashBasedShuffleBlockData().
+// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getHashBasedShuffleBlockData().
 private[spark]
-class FileShuffleBlockManager(conf: SparkConf)
+class FileShuffleBlockResolver(conf: SparkConf)
   extends ShuffleBlockResolver with Logging {

   private val transportConf = SparkTransportConf.fromSparkConf(conf)
@@ -243,7 +242,7 @@ class FileShuffleBlockManager(conf: SparkConf)
 }

 private[spark]
-object FileShuffleBlockManager {
+object FileShuffleBlockResolver {
   /**
    * A group of shuffle files, one per reducer.
    * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.

core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala renamed to core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala

Lines changed: 4 additions & 5 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.shuffle

 import java.io._
-import java.nio.ByteBuffer

 import com.google.common.io.ByteStreams

@@ -28,7 +27,7 @@ import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.storage._
 import org.apache.spark.util.Utils

-import IndexShuffleBlockManager.NOOP_REDUCE_ID
+import IndexShuffleBlockResolver.NOOP_REDUCE_ID

 /**
  * Create and maintain the shuffle blocks' mapping between logic block and physical file location.
@@ -40,9 +39,9 @@ import IndexShuffleBlockManager.NOOP_REDUCE_ID
  *
  */
 // Note: Changes to the format in this file should be kept in sync with
-// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData().
+// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
 private[spark]
-class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
+class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver {

   private lazy val blockManager = SparkEnv.get.blockManager

@@ -115,7 +114,7 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
   override def stop(): Unit = {}
 }

-private[spark] object IndexShuffleBlockManager {
+private[spark] object IndexShuffleBlockResolver {
   // No-op reduce ID used in interactions with disk store and BlockObjectWriter.
   // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
   // shuffle outputs for several reduces are glommed into a single file.

core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala

Lines changed: 3 additions & 3 deletions
@@ -26,7 +26,7 @@ import org.apache.spark.shuffle._
  */
 private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager {

-  private val fileShuffleBlockManager = new FileShuffleBlockManager(conf)
+  private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf)

   /* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
   override def registerShuffle[K, V, C](
@@ -61,8 +61,8 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager
     shuffleBlockResolver.removeShuffle(shuffleId)
   }

-  override def shuffleBlockResolver: FileShuffleBlockManager = {
-    fileShuffleBlockManager
+  override def shuffleBlockResolver: FileShuffleBlockResolver = {
+    fileShuffleBlockResolver
   }

   /** Shut down this ShuffleManager. */

core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala

Lines changed: 2 additions & 2 deletions
@@ -25,7 +25,7 @@ import org.apache.spark.shuffle._
 import org.apache.spark.storage.BlockObjectWriter

 private[spark] class HashShuffleWriter[K, V](
-    shuffleBlockManager: FileShuffleBlockManager,
+    shuffleBlockResolver: FileShuffleBlockResolver,
     handle: BaseShuffleHandle[K, V, _],
     mapId: Int,
     context: TaskContext)
@@ -45,7 +45,7 @@ private[spark] class HashShuffleWriter[K, V](

   private val blockManager = SparkEnv.get.blockManager
   private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
-  private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
+  private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
     writeMetrics)

   /** Write a bunch of records to this task's output */

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala

Lines changed: 3 additions & 3 deletions
@@ -25,7 +25,7 @@ import org.apache.spark.shuffle.hash.HashShuffleReader

 private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {

-  private val indexShuffleBlockManager = new IndexShuffleBlockManager(conf)
+  private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf)
   private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()

   /**
@@ -72,8 +72,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
     true
   }

-  override def shuffleBlockResolver: IndexShuffleBlockManager = {
-    indexShuffleBlockManager
+  override def shuffleBlockResolver: IndexShuffleBlockResolver = {
+    indexShuffleBlockResolver
   }

   /** Shut down this ShuffleManager. */

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala

Lines changed: 6 additions & 6 deletions
@@ -20,12 +20,12 @@ package org.apache.spark.shuffle.sort
 import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext}
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.scheduler.MapStatus
-import org.apache.spark.shuffle.{IndexShuffleBlockManager, ShuffleWriter, BaseShuffleHandle}
+import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle}
 import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.collection.ExternalSorter

 private[spark] class SortShuffleWriter[K, V, C](
-    shuffleBlockManager: IndexShuffleBlockManager,
+    shuffleBlockResolver: IndexShuffleBlockResolver,
     handle: BaseShuffleHandle[K, V, C],
     mapId: Int,
     context: TaskContext)
@@ -65,10 +65,10 @@ private[spark] class SortShuffleWriter[K, V, C](
     // Don't bother including the time to open the merged output file in the shuffle write time,
     // because it just opens a single file, so is typically too fast to measure accurately
     // (see SPARK-3570).
-    val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
-    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID)
+    val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
+    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
     val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
-    shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
+    shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)

     mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
   }
@@ -84,7 +84,7 @@ private[spark] class SortShuffleWriter[K, V, C](
       return Option(mapStatus)
     } else {
       // The map task failed, so delete our output data.
-      shuffleBlockManager.removeDataByMap(dep.shuffleId, mapId)
+      shuffleBlockResolver.removeDataByMap(dep.shuffleId, mapId)
       return None
     }
   } finally {

core/src/main/scala/org/apache/spark/storage/BlockId.scala

Lines changed: 1 addition & 1 deletion
@@ -54,7 +54,7 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
 }

 // Format of the shuffle block ids (including data and index) should be kept in sync with
-// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getBlockData().
+// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
 @DeveloperApi
 case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
   override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 3 additions & 2 deletions
@@ -436,10 +436,11 @@ private[spark] class BlockManager(
     // As an optimization for map output fetches, if the block is for a shuffle, return it
     // without acquiring a lock; the disk store never deletes (recent) items so this should work
     if (blockId.isShuffle) {
-      val shuffleBlockManager = shuffleManager.shuffleBlockResolver
+      val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
       // TODO: This should gracefully handle case where local block is not available. Currently
       // downstream code will throw an exception.
-      Option(shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
+      Option(
+        shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
     } else {
       doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
     }

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf

   /** Looks up a file by hashing it into one of our local subdirectories. */
   // This method should be kept in sync with
-  // org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getFile().
+  // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
   def getFile(filename: String): File = {
     // Figure out which local directory it hashes to, and which subdirectory in that
     val hash = Utils.nonNegativeHash(filename)

core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala

Lines changed: 9 additions & 9 deletions
@@ -27,7 +27,7 @@ import org.apache.spark.{SparkEnv, SparkContext, LocalSparkContext, SparkConf}
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.shuffle.FileShuffleBlockManager
+import org.apache.spark.shuffle.FileShuffleBlockResolver
 import org.apache.spark.storage.{ShuffleBlockId, FileSegment}

 class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
@@ -53,10 +53,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {

     sc = new SparkContext("local", "test", conf)

-    val shuffleBlockManager =
-      SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockManager]
+    val shuffleBlockResolver =
+      SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockResolver]

-    val shuffle1 = shuffleBlockManager.forMapTask(1, 1, 1, new JavaSerializer(conf),
+    val shuffle1 = shuffleBlockResolver.forMapTask(1, 1, 1, new JavaSerializer(conf),
       new ShuffleWriteMetrics)
     for (writer <- shuffle1.writers) {
       writer.write("test1")
@@ -69,7 +69,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
     val shuffle1Segment = shuffle1.writers(0).fileSegment()
     shuffle1.releaseWriters(success = true)

-    val shuffle2 = shuffleBlockManager.forMapTask(1, 2, 1, new JavaSerializer(conf),
+    val shuffle2 = shuffleBlockResolver.forMapTask(1, 2, 1, new JavaSerializer(conf),
       new ShuffleWriteMetrics)

     for (writer <- shuffle2.writers) {
@@ -88,7 +88,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
     // of block based on remaining data in file : which could mess things up when there is
     // concurrent read and writes happening to the same shuffle group.

-    val shuffle3 = shuffleBlockManager.forMapTask(1, 3, 1, new JavaSerializer(testConf),
+    val shuffle3 = shuffleBlockResolver.forMapTask(1, 3, 1, new JavaSerializer(testConf),
       new ShuffleWriteMetrics)
     for (writer <- shuffle3.writers) {
       writer.write("test3")
@@ -98,10 +98,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
       writer.commitAndClose()
     }
     // check before we register.
-    checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)))
+    checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0)))
     shuffle3.releaseWriters(success = true)
-    checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)))
-    shuffleBlockManager.removeShuffle(1)
+    checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0)))
+    shuffleBlockResolver.removeShuffle(1)
   }

   def writeToFile(file: File, numBytes: Int) {
