Commit 1ada641

containsBlock/removeDataByMap
1 parent fa9c72f

4 files changed: +32 additions, -17 deletions
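This commit threads a needCreate flag from IndexShuffleBlockResolver down to DiskBlockManager.getFile, so that paths which only probe or delete shuffle files (containsBlock, removeDataByMap, DiskStore.remove, and checksum diagnosis in BlockManager) no longer create the block's hashed parent subdirectory as a side effect. A minimal sketch of the contract, assuming a simplified single-root layout (the real DiskBlockManager hashes filenames across several local dirs; SketchDiskManager is a hypothetical name for illustration):

    import java.io.{File, IOException}

    class SketchDiskManager(root: File) {
      // Resolve where a file would live; only create its parent directory
      // when the caller actually intends to write (needCreate = true).
      def getFile(filename: String, needCreate: Boolean): File = {
        val subDir = new File(root, ((filename.hashCode & 0x7fffffff) % 64).toString)
        if (needCreate && !subDir.exists() && !subDir.mkdirs()) {
          throw new IOException(s"Failed to create local dir in $subDir.")
        }
        new File(subDir, filename)
      }

      // Read-only probe: must not leave empty subdirectories behind.
      def containsBlock(filename: String): Boolean =
        getFile(filename, needCreate = false).exists()
    }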

core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala

Lines changed: 22 additions & 11 deletions

@@ -62,7 +62,11 @@ private[spark] class IndexShuffleBlockResolver(
   private val remoteShuffleMaxDisk: Option[Long] =
     conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE)
 
-  def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId, mapId, None)
+  def getDataFile(shuffleId: Int, mapId: Long): File =
+    getDataFile(shuffleId, mapId, None, true)
+
+  def getDataFile(shuffleId: Int, mapId: Long, needCreate: Boolean): File =
+    getDataFile(shuffleId, mapId, None, needCreate)
 
   /**
    * Get the shuffle files that are stored locally. Used for block migrations.
@@ -95,12 +99,16 @@
    * When the dirs parameter is None then use the disk manager's local directories. Otherwise,
    * read from the specified directories.
    */
-  def getDataFile(shuffleId: Int, mapId: Long, dirs: Option[Array[String]]): File = {
+  def getDataFile(
+      shuffleId: Int,
+      mapId: Long,
+      dirs: Option[Array[String]],
+      needCreate: Boolean): File = {
     val blockId = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
     dirs
       .map(d =>
         new File(ExecutorDiskUtils.getFilePath(d, blockManager.subDirsPerLocalDir, blockId.name)))
-      .getOrElse(blockManager.diskBlockManager.getFile(blockId))
+      .getOrElse(blockManager.diskBlockManager.getFile(blockId, needCreate))
   }
 
   /**
@@ -112,12 +120,13 @@
   def getIndexFile(
       shuffleId: Int,
       mapId: Long,
-      dirs: Option[Array[String]] = None): File = {
+      dirs: Option[Array[String]] = None,
+      needCreate: Boolean = true): File = {
     val blockId = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
     dirs
       .map(d =>
         new File(ExecutorDiskUtils.getFilePath(d, blockManager.subDirsPerLocalDir, blockId.name)))
-      .getOrElse(blockManager.diskBlockManager.getFile(blockId))
+      .getOrElse(blockManager.diskBlockManager.getFile(blockId, needCreate))
   }
 
   private def getMergedBlockDataFile(
@@ -154,17 +163,18 @@
    * Remove data file and index file that contain the output data from one map.
    */
   def removeDataByMap(shuffleId: Int, mapId: Long): Unit = {
-    var file = getDataFile(shuffleId, mapId)
+    var file = getDataFile(shuffleId, mapId, needCreate = false)
     if (file.exists() && !file.delete()) {
       logWarning(s"Error deleting data ${file.getPath()}")
     }
 
-    file = getIndexFile(shuffleId, mapId)
+    file = getIndexFile(shuffleId, mapId, needCreate = false)
     if (file.exists() && !file.delete()) {
       logWarning(s"Error deleting index ${file.getPath()}")
     }
 
-    file = getChecksumFile(shuffleId, mapId, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM))
+    file = getChecksumFile(shuffleId, mapId, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM),
+      needCreate = false)
     if (file.exists() && !file.delete()) {
       logWarning(s"Error deleting checksum ${file.getPath()}")
     }
@@ -549,13 +559,14 @@
       shuffleId: Int,
       mapId: Long,
       algorithm: String,
-      dirs: Option[Array[String]] = None): File = {
+      dirs: Option[Array[String]] = None,
+      needCreate: Boolean = true): File = {
     val blockId = ShuffleChecksumBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
     val fileName = ShuffleChecksumHelper.getChecksumFileName(blockId.name, algorithm)
     dirs
       .map(d =>
         new File(ExecutorDiskUtils.getFilePath(d, blockManager.subDirsPerLocalDir, fileName)))
-      .getOrElse(blockManager.diskBlockManager.getFile(fileName))
+      .getOrElse(blockManager.diskBlockManager.getFile(fileName, needCreate))
   }
 
   override def getBlockData(
@@ -594,7 +605,7 @@
       }
       new FileSegmentManagedBuffer(
         transportConf,
-        getDataFile(shuffleId, mapId, dirs),
+        getDataFile(shuffleId, mapId, dirs, true),
         startOffset,
         endOffset - startOffset)
     } finally {
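Note the design choice in the hunks above: rather than giving the two-argument getDataFile a default parameter, the commit adds explicit overloads, so existing callers keep the old behavior (needCreate = true) and cleanup paths opt out deliberately. A hedged usage sketch (resolver, shuffleId, and mapId stand for values already in scope; this mirrors the removeDataByMap hunk rather than adding new API):

    // Serving a block: the file is expected to exist or be written soon,
    // so the directory-creating path is fine.
    val served: File = resolver.getDataFile(shuffleId, mapId)

    // Cleanup: resolve the path without materializing its parent directory.
    val stale: File = resolver.getDataFile(shuffleId, mapId, needCreate = false)
    if (stale.exists() && !stale.delete()) {
      logWarning(s"Error deleting data ${stale.getPath()}")
    }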

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 2 additions & 1 deletion

@@ -302,7 +302,8 @@ private[spark] class BlockManager(
     val shuffleBlock = blockId.asInstanceOf[ShuffleBlockId]
     val resolver = shuffleManager.shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver]
     val checksumFile =
-      resolver.getChecksumFile(shuffleBlock.shuffleId, shuffleBlock.mapId, algorithm)
+      resolver.getChecksumFile(shuffleBlock.shuffleId, shuffleBlock.mapId, algorithm,
+        needCreate = false)
     val reduceId = shuffleBlock.reduceId
     ShuffleChecksumHelper.diagnoseCorruption(
       algorithm, checksumFile, reduceId, resolver.getBlockData(shuffleBlock), checksumByReader)

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

Lines changed: 7 additions & 4 deletions

@@ -91,10 +91,10 @@ private[spark] class DiskBlockManager(
   // This method should be kept in sync with
   // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFilePath().
   def getFile(filename: String): File = {
-    getFile(filename, true)
+    getFile(filename, needCreate = true)
   }
 
-  def getFile(filename: String, needCreate: Boolean = true): File = {
+  def getFile(filename: String, needCreate: Boolean): File = {
     // Figure out which local directory it hashes to, and which subdirectory in that
     val hash = Utils.nonNegativeHash(filename)
     val dirId = hash % localDirs.length
@@ -131,7 +131,10 @@
     new File(subDir, filename)
   }
 
-  def getFile(blockId: BlockId): File = getFile(blockId.name)
+  def getFile(blockId: BlockId): File = getFile(blockId.name, needCreate = true)
+
+  def getFile(blockId: BlockId, needCreate: Boolean): File =
+    getFile(blockId.name, needCreate)
 
   /**
    * This should be in sync with
@@ -162,7 +165,7 @@
 
   /** Check if disk block manager has a block. */
   def containsBlock(blockId: BlockId): Boolean = {
-    getFile(blockId.name).exists()
+    getFile(blockId.name, needCreate = false).exists()
   }
 
   /** List all the files currently stored on disk by the disk manager. */
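Two things happen in this file: the default value on needCreate is dropped so every caller of the String overload must state its intent, and BlockId-based lookups get their own overload. The first hunk truncates before the directory logic; a sketch of the assumed shape of the rest of getFile (simplified, omitting Spark's per-directory caching) shows where the flag takes effect:

    // Continuation sketch of getFile(filename, needCreate); hash and dirId
    // come from the lines above. The "%02x" subdirectory naming is an
    // assumption carried over from ExecutorDiskUtils#getFilePath, which
    // this method is documented to stay in sync with.
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
    if (needCreate && !subDir.exists() && !subDir.mkdirs()) {
      throw new IOException(s"Failed to create local dir in $subDir.")
    }
    new File(subDir, filename)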

core/src/main/scala/org/apache/spark/storage/DiskStore.scala

Lines changed: 1 addition & 1 deletion

@@ -128,7 +128,7 @@
 
   def remove(blockId: BlockId): Boolean = {
     blockSizes.remove(blockId)
-    val file = diskManager.getFile(blockId.name, false)
+    val file = diskManager.getFile(blockId.name, needCreate = false)
     if (file.exists()) {
       val ret = file.delete()
       if (!ret) {
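This last hunk is cosmetic: the positional false becomes a named argument now that getFile(filename, needCreate) has no default. One behavioral consequence of the flag across these files, as a hedged sketch (diskStore and diskBlockManager wired as on a normal executor; the false return for a never-written block is an assumption, since the hunk cuts off before the else branch):

    // Removing a block that was never stored is now side-effect free:
    // resolving its path no longer creates an empty hashed subdirectory.
    val removed = diskStore.remove(neverWrittenBlockId)            // presumably false
    val present = diskBlockManager.containsBlock(neverWrittenBlockId)  // still false, no dirs made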
