
Commit cb53a2c (merge commit; 2 parents: f6a8a40 + d90434c)

File tree: 53 files changed, +729 / -507 lines. (Large commit; only a subset of the changed files is shown below.)

core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala renamed to core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala

Lines changed: 37 additions & 44 deletions
@@ -15,22 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.spark.storage
+package org.apache.spark.shuffle
 
 import java.io.File
+import java.nio.ByteBuffer
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConversions._
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkEnv, SparkConf, Logging}
+import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.ShuffleManager
-import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
+import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup
+import org.apache.spark.storage._
 import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
 import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
-import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.executor.ShuffleWriteMetrics
 
 /** A group of writers for a ShuffleMapTask, one writer per reducer. */
 private[spark] trait ShuffleWriterGroup {
@@ -61,20 +61,18 @@ private[spark] trait ShuffleWriterGroup {
  * each block stored in each file. In order to find the location of a shuffle block, we search the
  * files within the ShuffleFileGroups associated with the block's reducer.
  */
-// TODO: Factor this into a separate class for each ShuffleManager implementation
+
 private[spark]
-class ShuffleBlockManager(blockManager: BlockManager,
-    shuffleManager: ShuffleManager) extends Logging {
-  def conf = blockManager.conf
+class FileShuffleBlockManager(conf: SparkConf)
+  extends ShuffleBlockManager with Logging {
+
+  private lazy val blockManager = SparkEnv.get.blockManager
 
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
   // TODO: Remove this once the shuffle file consolidation feature is stable.
-  val consolidateShuffleFiles =
+  private val consolidateShuffleFiles =
     conf.getBoolean("spark.shuffle.consolidateFiles", false)
 
-  // Are we using sort-based shuffle?
-  val sortBasedShuffle = shuffleManager.isInstanceOf[SortShuffleManager]
-
   private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
 
   /**
@@ -93,22 +91,11 @@ class ShuffleBlockManager(blockManager: BlockManager,
     val completedMapTasks = new ConcurrentLinkedQueue[Int]()
   }
 
-  type ShuffleId = Int
   private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]
 
   private val metadataCleaner =
     new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf)
 
-  /**
-   * Register a completed map without getting a ShuffleWriterGroup. Used by sort-based shuffle
-   * because it just writes a single file by itself.
-   */
-  def addCompletedMap(shuffleId: Int, mapId: Int, numBuckets: Int): Unit = {
-    shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
-    val shuffleState = shuffleStates(shuffleId)
-    shuffleState.completedMapTasks.add(mapId)
-  }
-
   /**
    * Get a ShuffleWriterGroup for the given map task, which will register it as complete
    * when the writers are closed successfully
@@ -181,17 +168,30 @@ class ShuffleBlockManager(blockManager: BlockManager,
 
   /**
    * Returns the physical file segment in which the given BlockId is located.
-   * This function should only be called if shuffle file consolidation is enabled, as it is
-   * an error condition if we don't find the expected block.
    */
-  def getBlockLocation(id: ShuffleBlockId): FileSegment = {
-    // Search all file groups associated with this shuffle.
-    val shuffleState = shuffleStates(id.shuffleId)
-    for (fileGroup <- shuffleState.allFileGroups) {
-      val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId)
-      if (segment.isDefined) { return segment.get }
+  private def getBlockLocation(id: ShuffleBlockId): FileSegment = {
+    if (consolidateShuffleFiles) {
+      // Search all file groups associated with this shuffle.
+      val shuffleState = shuffleStates(id.shuffleId)
+      val iter = shuffleState.allFileGroups.iterator
+      while (iter.hasNext) {
+        val segment = iter.next.getFileSegmentFor(id.mapId, id.reduceId)
+        if (segment.isDefined) { return segment.get }
+      }
+      throw new IllegalStateException("Failed to find shuffle block: " + id)
+    } else {
+      val file = blockManager.diskBlockManager.getFile(id)
+      new FileSegment(file, 0, file.length())
     }
-    throw new IllegalStateException("Failed to find shuffle block: " + id)
+  }
+
+  override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = {
+    val segment = getBlockLocation(blockId)
+    blockManager.diskStore.getBytes(segment)
+  }
+
+  override def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, ByteBuffer] = {
+    Left(getBlockLocation(blockId.asInstanceOf[ShuffleBlockId]))
   }
 
   /** Remove all the blocks / files and metadata related to a particular shuffle. */
@@ -207,14 +207,7 @@ class ShuffleBlockManager(blockManager: BlockManager,
   private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = {
     shuffleStates.get(shuffleId) match {
       case Some(state) =>
-        if (sortBasedShuffle) {
-          // There's a single block ID for each map, plus an index file for it
-          for (mapId <- state.completedMapTasks) {
-            val blockId = new ShuffleBlockId(shuffleId, mapId, 0)
-            blockManager.diskBlockManager.getFile(blockId).delete()
-            blockManager.diskBlockManager.getFile(blockId.name + ".index").delete()
-          }
-        } else if (consolidateShuffleFiles) {
+        if (consolidateShuffleFiles) {
           for (fileGroup <- state.allFileGroups; file <- fileGroup.files) {
             file.delete()
           }
@@ -240,13 +233,13 @@ class ShuffleBlockManager(blockManager: BlockManager,
     shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
   }
 
-  def stop() {
+  override def stop() {
     metadataCleaner.cancel()
   }
 }
 
 private[spark]
-object ShuffleBlockManager {
+object FileShuffleBlockManager {
   /**
    * A group of shuffle files, one per reducer.
    * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
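
The behavioral core of this rename: getBlockLocation now handles both on-disk layouts itself. With spark.shuffle.consolidateFiles enabled it searches the shuffle's file groups; otherwise every block has its own file and the segment is simply the whole file. Below is a standalone sketch of that dispatch using simplified stand-in types (Segment and FileGroup are illustrative, not Spark's classes):

import java.io.File

// Stand-ins for Spark's FileSegment and ShuffleFileGroup, just for this sketch.
case class Segment(file: File, offset: Long, length: Long)

case class FileGroup(segments: Map[(Int, Int), Segment]) {
  def segmentFor(mapId: Int, reduceId: Int): Option[Segment] = segments.get((mapId, reduceId))
}

def blockLocation(
    consolidate: Boolean,
    groups: Seq[FileGroup],
    blockFile: File,
    mapId: Int,
    reduceId: Int): Segment = {
  if (consolidate) {
    // Consolidated layout: several map outputs share each file, so search the groups.
    groups.flatMap(_.segmentFor(mapId, reduceId)).headOption.getOrElse(
      throw new IllegalStateException(s"Failed to find shuffle block: $mapId/$reduceId"))
  } else {
    // One file per block: the block is the whole file.
    Segment(blockFile, 0, blockFile.length())
  }
}
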
New file: class IndexShuffleBlockManager (package org.apache.spark.shuffle)

Lines changed: 121 additions & 0 deletions

@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.io._
+import java.nio.ByteBuffer
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.storage._
+
+/**
+ * Creates and maintains the mapping between logical shuffle blocks and physical file locations.
+ * Data for shuffle blocks from the same map task is stored in a single consolidated data file,
+ * and the offsets of the data blocks in the data file are stored in a separate index file.
+ *
+ * We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0, adding ".data"
+ * as the filename suffix for the data file and ".index" for the index file.
+ */
+private[spark]
+class IndexShuffleBlockManager extends ShuffleBlockManager {
+
+  private lazy val blockManager = SparkEnv.get.blockManager
+
+  /**
+   * Mapping to a single shuffleBlockId with reduce ID 0.
+   */
+  def consolidateId(shuffleId: Int, mapId: Int): ShuffleBlockId = {
+    ShuffleBlockId(shuffleId, mapId, 0)
+  }
+
+  def getDataFile(shuffleId: Int, mapId: Int): File = {
+    blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0))
+  }
+
+  private def getIndexFile(shuffleId: Int, mapId: Int): File = {
+    blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0))
+  }
+
+  /**
+   * Remove the data file and index file that contain the output data from one map.
+   */
+  def removeDataByMap(shuffleId: Int, mapId: Int): Unit = {
+    var file = getDataFile(shuffleId, mapId)
+    if (file.exists()) {
+      file.delete()
+    }
+
+    file = getIndexFile(shuffleId, mapId)
+    if (file.exists()) {
+      file.delete()
+    }
+  }
+
+  /**
+   * Write an index file with the offsets of each block, plus a final offset at the end for the
+   * end of the output file. This will be used by getBlockLocation to figure out where each block
+   * begins and ends.
+   */
+  def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]) = {
+    val indexFile = getIndexFile(shuffleId, mapId)
+    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
+    try {
+      // We take in lengths of each block, need to convert it to offsets.
+      var offset = 0L
+      out.writeLong(offset)
+
+      for (length <- lengths) {
+        offset += length
+        out.writeLong(offset)
+      }
+    } finally {
+      out.close()
+    }
+  }
+
+  /**
+   * Get the location of a block in a map output file. Uses the index file we create for it.
+   */
+  private def getBlockLocation(blockId: ShuffleBlockId): FileSegment = {
+    // The block is actually going to be a range of a single map output file for this map, so
+    // find out the consolidated file, then the offset within that from our index.
+    val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
+
+    val in = new DataInputStream(new FileInputStream(indexFile))
+    try {
+      in.skip(blockId.reduceId * 8)
+      val offset = in.readLong()
+      val nextOffset = in.readLong()
+      new FileSegment(getDataFile(blockId.shuffleId, blockId.mapId), offset, nextOffset - offset)
+    } finally {
+      in.close()
+    }
+  }
+
+  override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = {
+    val segment = getBlockLocation(blockId)
+    blockManager.diskStore.getBytes(segment)
+  }
+
+  override def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, ByteBuffer] = {
+    Left(getBlockLocation(blockId.asInstanceOf[ShuffleBlockId]))
+  }
+
+  override def stop() = {}
+}
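
The index-file format above is the heart of this new class, so here is a self-contained sketch of it using plain JDK I/O and no Spark types (the object and file names are made up for the demo). For block lengths [10, 20, 30] the index file holds the four offsets [0, 10, 30, 60]; the segment for reduce partition r is bounded by the longs at positions r and r + 1:

import java.io._

object IndexFormatSketch {
  def main(args: Array[String]): Unit = {
    val lengths = Array(10L, 20L, 30L)
    val indexFile = File.createTempFile("shuffle_0_0_0", ".index")

    // Write offsets exactly the way writeIndexFile does: a leading 0, then a running sum.
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
    try {
      var offset = 0L
      out.writeLong(offset)
      for (length <- lengths) {
        offset += length
        out.writeLong(offset)
      }
    } finally {
      out.close()
    }

    // Read back the segment for reduceId = 1 the way getBlockLocation does:
    // skip reduceId * 8 bytes, then read the start and end offsets.
    val reduceId = 1
    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      in.skip(reduceId * 8)
      val offset = in.readLong()
      val nextOffset = in.readLong()
      println(s"reduce $reduceId -> offset $offset, length ${nextOffset - offset}")
      // Prints: reduce 1 -> offset 10, length 20
    } finally {
      in.close()
    }
    indexFile.delete()
  }
}
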
New file: trait ShuffleBlockManager (package org.apache.spark.shuffle)

Lines changed: 38 additions & 0 deletions

@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.nio.ByteBuffer
+
+import org.apache.spark.storage.{FileSegment, ShuffleBlockId}
+
+private[spark]
+trait ShuffleBlockManager {
+  type ShuffleId = Int
+
+  /**
+   * Get shuffle block data managed by the local ShuffleBlockManager.
+   * @return Some(ByteBuffer) if the block is found, otherwise None.
+   */
+  def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer]
+
+  def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, ByteBuffer]
+
+  def stop(): Unit
+}
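
The trait offers two read paths: getBytes materializes the block into a buffer, while getBlockData can hand back a FileSegment for file-based transfer. To make the contract concrete, here is a hypothetical in-memory implementation; it is not part of the commit, and it assumes it is compiled inside the org.apache.spark.shuffle package (the trait is private[spark]):

import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.storage.{FileSegment, ShuffleBlockId}

// Hypothetical toy implementation that serves blocks from in-memory buffers.
private[spark] class InMemoryShuffleBlockManager extends ShuffleBlockManager {
  private val blocks = new ConcurrentHashMap[ShuffleBlockId, ByteBuffer]()

  // Helper for tests; not part of the ShuffleBlockManager trait.
  def putBytes(blockId: ShuffleBlockId, bytes: ByteBuffer): Unit = blocks.put(blockId, bytes)

  override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] =
    Option(blocks.get(blockId))

  // No backing file, so always answer with in-memory bytes (the Right side of the Either).
  override def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, ByteBuffer] =
    getBytes(blockId).map(Right(_)).getOrElse(
      throw new IllegalStateException("Failed to find shuffle block: " + blockId))

  override def stop(): Unit = blocks.clear()
}
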

core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala

Lines changed: 7 additions & 2 deletions
@@ -49,8 +49,13 @@ private[spark] trait ShuffleManager {
       endPartition: Int,
       context: TaskContext): ShuffleReader[K, C]
 
-  /** Remove a shuffle's metadata from the ShuffleManager. */
-  def unregisterShuffle(shuffleId: Int)
+  /**
+   * Remove a shuffle's metadata from the ShuffleManager.
+   * @return true if the metadata was removed successfully, otherwise false.
+   */
+  def unregisterShuffle(shuffleId: Int): Boolean
+
+  def shuffleBlockManager: ShuffleBlockManager
 
   /** Shut down this ShuffleManager. */
   def stop(): Unit
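
The new @return contract is the substance of this change: unregisterShuffle now reports whether any metadata was actually removed, so callers can tell a successful cleanup from a no-op. A minimal standalone mirror of just those semantics (UnregisterContract and the sample IDs are hypothetical, not from the patch):

trait UnregisterContract {
  def unregisterShuffle(shuffleId: Int): Boolean
}

val contract = new UnregisterContract {
  private val registered = scala.collection.mutable.Set(0, 1, 2)
  // True when metadata existed and was removed; false for an unknown shuffle.
  override def unregisterShuffle(shuffleId: Int): Boolean = registered.remove(shuffleId)
}

assert(contract.unregisterShuffle(1))  // removed: returns true
assert(!contract.unregisterShuffle(1)) // already gone: returns false
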

core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala

Lines changed: 15 additions & 3 deletions
@@ -25,6 +25,9 @@ import org.apache.spark.shuffle._
  * mapper (possibly reusing these across waves of tasks).
  */
 private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager {
+
+  private val fileShuffleBlockManager = new FileShuffleBlockManager(conf)
+
   /* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
   override def registerShuffle[K, V, C](
       shuffleId: Int,
@@ -49,12 +52,21 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager
   /** Get a writer for a given partition. Called on executors by map tasks. */
   override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
       : ShuffleWriter[K, V] = {
-    new HashShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
+    new HashShuffleWriter(
+      shuffleBlockManager, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
   }
 
   /** Remove a shuffle's metadata from the ShuffleManager. */
-  override def unregisterShuffle(shuffleId: Int): Unit = {}
+  override def unregisterShuffle(shuffleId: Int): Boolean = {
+    shuffleBlockManager.removeShuffle(shuffleId)
+  }
+
+  override def shuffleBlockManager: FileShuffleBlockManager = {
+    fileShuffleBlockManager
+  }
 
   /** Shut down this ShuffleManager. */
-  override def stop(): Unit = {}
+  override def stop(): Unit = {
+    shuffleBlockManager.stop()
+  }
 }
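
One lifecycle note on this diff: HashShuffleManager now owns the FileShuffleBlockManager it creates and tears it down in stop(). A hedged usage sketch, illustrative only (HashShuffleManager is private[spark], and FileShuffleBlockManager lazily resolves SparkEnv.get.blockManager, so this assumes code running inside Spark with a live SparkEnv):

import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.shuffle.consolidateFiles", "true")
val manager = new HashShuffleManager(conf)
try {
  // registerShuffle / getWriter / getReader calls would be driven by the scheduler here.
} finally {
  manager.stop() // also stops the FileShuffleBlockManager created in the constructor
}
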

core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala

Lines changed: 5 additions & 5 deletions
@@ -17,14 +17,15 @@
 
 package org.apache.spark.shuffle.hash
 
-import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleWriter}
-import org.apache.spark.{Logging, MapOutputTracker, SparkEnv, TaskContext}
-import org.apache.spark.storage.{BlockObjectWriter}
-import org.apache.spark.serializer.Serializer
+import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle._
+import org.apache.spark.storage.BlockObjectWriter
 
 private[spark] class HashShuffleWriter[K, V](
+    shuffleBlockManager: FileShuffleBlockManager,
     handle: BaseShuffleHandle[K, V, _],
     mapId: Int,
     context: TaskContext)
@@ -43,7 +44,6 @@ private[spark] class HashShuffleWriter[K, V](
   metrics.shuffleWriteMetrics = Some(writeMetrics)
 
   private val blockManager = SparkEnv.get.blockManager
-  private val shuffleBlockManager = blockManager.shuffleBlockManager
   private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
   private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
     writeMetrics)
