Commit 9400d9a

Merge branch 'master' into SPARK-28704

2 parents: b9dd1e4 + cf74901

84 files changed: +2324 additions, -1414 deletions


core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 15 additions & 0 deletions
@@ -23,6 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleHandle, ShuffleWriteProcessor}
+import org.apache.spark.storage.BlockManagerId

 /**
  * :: DeveloperApi ::
@@ -95,6 +96,20 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
     shuffleId, this)

+  /**
+   * Stores the location of the list of chosen external shuffle services for handling the
+   * shuffle merge requests from mappers in this shuffle map stage.
+   */
+  private[spark] var mergerLocs: Seq[BlockManagerId] = Nil
+
+  def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
+    if (mergerLocs != null) {
+      this.mergerLocs = mergerLocs
+    }
+  }
+
+  def getMergerLocs: Seq[BlockManagerId] = mergerLocs
+
   _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
   _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
 }
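
The accessor pair above defines the driver-side contract: merger locations are assigned once per shuffle dependency, and the null guard in setMergerLocs keeps mergerLocs at its Nil default if a null list is passed. A minimal, hypothetical sketch of that flow (dep and candidates are illustrative stand-ins, not part of this commit):

import org.apache.spark.ShuffleDependency
import org.apache.spark.storage.BlockManagerId

// Illustrative only: wires scheduler-chosen mergers into a dependency.
def assignMergers(
    dep: ShuffleDependency[_, _, _],
    candidates: Seq[BlockManagerId]): Unit = {
  dep.setMergerLocs(candidates) // a null argument is silently ignored
  // Mappers in this stage would now push blocks to dep.getMergerLocs.
}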

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 47 additions & 0 deletions
@@ -1945,4 +1945,51 @@ package object config {
       .version("3.0.1")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
+    ConfigBuilder("spark.shuffle.push.enabled")
+      .doc("Set to 'true' to enable push-based shuffle on the client side and this works in " +
+        "conjunction with the server side flag spark.shuffle.server.mergedShuffleFileManagerImpl " +
+        "which needs to be set with the appropriate " +
+        "org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based " +
+        "shuffle to be enabled")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS =
+    ConfigBuilder("spark.shuffle.push.maxRetainedMergerLocations")
+      .doc("Maximum number of shuffle push merger locations cached for push based shuffle. " +
+        "Currently, shuffle push merger locations are nothing but external shuffle services " +
+        "which are responsible for handling pushed blocks and merging them and serving " +
+        "merged blocks for later shuffle fetch.")
+      .version("3.1.0")
+      .intConf
+      .createWithDefault(500)
+
+  private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO =
+    ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio")
+      .doc("The minimum number of shuffle merger locations required to enable push based " +
+        "shuffle for a stage. This is specified as a ratio of the number of partitions in " +
+        "the child stage. For example, a reduce stage which has 100 partitions and uses the " +
+        "default value 0.05 requires at least 5 unique merger locations to enable push based " +
+        "shuffle. Merger locations are currently defined as external shuffle services.")
+      .version("3.1.0")
+      .doubleConf
+      .createWithDefault(0.05)
+
+  private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD =
+    ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold")
+      .doc(s"The static threshold for number of shuffle push merger locations should be " +
+        "available in order to enable push based shuffle for a stage. Note this config " +
+        s"works in conjunction with ${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key}. " +
+        "Maximum of spark.shuffle.push.mergersMinStaticThreshold and " +
+        s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} ratio number of mergers needed to " +
+        "enable push based shuffle for a stage. For eg: with 1000 partitions for the child " +
+        "stage with spark.shuffle.push.mergersMinStaticThreshold as 5 and " +
+        s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} set to 0.05, we would need " +
+        "at least 50 mergers to enable push based shuffle for that stage.")
+      .version("3.1.0")
+      .doubleConf
+      .createWithDefault(5)
 }
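
Taken together, these four keys give one client-side switch plus three sizing knobs. A hedged configuration sketch follows; the concrete MergedShuffleFileManager implementation class is an assumed placeholder, since this commit only names the interface:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Client-side switch; only effective with the server-side flag below.
  .set("spark.shuffle.push.enabled", "true")
  // Server-side flag from the doc string; the value here is an assumed
  // placeholder, not a class this commit defines.
  .set("spark.shuffle.server.mergedShuffleFileManagerImpl",
    "org.apache.spark.network.shuffle.MergedShuffleFileManagerImpl")
  // Sizing knobs, shown at the defaults introduced above.
  .set("spark.shuffle.push.maxRetainedMergerLocations", "500")
  .set("spark.shuffle.push.mergersMinThresholdRatio", "0.05")
  .set("spark.shuffle.push.mergersMinStaticThreshold", "5")

With these defaults, a child stage with N partitions needs max(5, 0.05 * N) distinct merger hosts before push based shuffle turns on for it, e.g. 50 hosts for 1000 partitions.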

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 40 additions & 0 deletions
@@ -249,6 +249,8 @@ private[spark] class DAGScheduler(
   private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
   taskScheduler.setDAGScheduler(this)

+  private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf)
+
   /**
    * Called by the TaskSetManager to report task's starting.
    */
@@ -1252,6 +1254,33 @@
     execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
   }

+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for the given
+   * shuffle map stage for block push/merge.
+   *
+   * Even with dynamic resource allocation kicking in and significantly reducing the number
+   * of available active executors, we would still be able to get sufficient shuffle service
+   * locations for block push/merge by getting the historical locations of past executors.
+   */
+  private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
+    // TODO(SPARK-32920) Handle stage reuse/retry cases separately as without finalize
+    // TODO changes we cannot disable shuffle merge for the retry/reuse cases
+    val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
+      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+
+    if (mergerLocs.nonEmpty) {
+      stage.shuffleDep.setMergerLocs(mergerLocs)
+      logInfo(s"Push-based shuffle enabled for $stage (${stage.name}) with" +
+        s" ${stage.shuffleDep.getMergerLocs.size} merger locations")
+
+      logDebug("List of shuffle push merger locations " +
+        s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
+    } else {
+      logInfo("No available merger locations." +
+        s" Push-based shuffle disabled for $stage (${stage.name})")
+    }
+  }
+
   /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
     logDebug("submitMissingTasks(" + stage + ")")
@@ -1281,6 +1310,12 @@
     stage match {
       case s: ShuffleMapStage =>
         outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
+        // Only generate merger location for a given shuffle dependency once. This way, even if
+        // this stage gets retried, it would still be merging blocks using the same set of
+        // shuffle services.
+        if (pushBasedShuffleEnabled) {
+          prepareShuffleServicesForShuffleMapStage(s)
+        }
       case s: ResultStage =>
         outputCommitCoordinator.stageStart(
           stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
@@ -2027,6 +2062,11 @@
     if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) {
       executorFailureEpoch(execId) = currentEpoch
       logInfo(s"Executor lost: $execId (epoch $currentEpoch)")
+      if (pushBasedShuffleEnabled) {
+        // Remove fetchFailed host in the shuffle push merger list for push based shuffle
+        hostToUnregisterOutputs.foreach(
+          host => blockManagerMaster.removeShufflePushMergerLocation(host))
+      }
       blockManagerMaster.removeExecutor(execId)
       clearCacheLocs()
     }
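
Both DAGScheduler and BlockManagerMasterEndpoint (below) gate their new code paths on Utils.isPushBasedShuffleEnabled, which this diff does not show. As a rough, assumed approximation it amounts to checking the new client-side flag together with the external shuffle service, since merger locations are external shuffle services; the real helper may check more conditions:

import org.apache.spark.SparkConf

// Assumed approximation of Utils.isPushBasedShuffleEnabled (not in this diff).
def isPushBasedShuffleEnabledApprox(conf: SparkConf): Boolean =
  conf.getBoolean("spark.shuffle.push.enabled", defaultValue = false) &&
    conf.getBoolean("spark.shuffle.service.enabled", defaultValue = false)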

core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala

Lines changed: 13 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler

 import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.storage.BlockManagerId

 /**
  * A backend interface for scheduling systems that allows plugging in different ones under
@@ -92,4 +93,16 @@ private[spark] trait SchedulerBackend {
   */
  def maxNumConcurrentTasks(rp: ResourceProfile): Int

+  /**
+   * Get the list of host locations for push based shuffle
+   *
+   * Currently push based shuffle is disabled for both stage retry and stage reuse cases
+   * (for eg: in the case where few partitions are lost due to failure). Hence this method
+   * should be invoked only once for a ShuffleDependency.
+   * @return List of external shuffle services locations
+   */
+  def getShufflePushMergerLocations(
+      numPartitions: Int,
+      resourceProfileId: Int): Seq[BlockManagerId] = Nil
+
 }
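
Because the default implementation returns Nil, push based shuffle stays off unless a backend overrides this method. A hedged sketch of such an override, sizing the request as max(static threshold, ratio x partitions) per the config doc strings above; the conf and blockManagerMaster handles are assumed wiring, and a real backend (e.g. on YARN) may size the request differently:

import org.apache.spark.SparkConf
import org.apache.spark.internal.config
import org.apache.spark.storage.{BlockManagerId, BlockManagerMaster}

// Illustrative mixin: assumes the backend exposes a SparkConf and a
// BlockManagerMaster, as concrete backends do through their constructors.
trait PushMergerSelection extends SchedulerBackend {
  def conf: SparkConf
  def blockManagerMaster: BlockManagerMaster

  override def getShufflePushMergerLocations(
      numPartitions: Int,
      resourceProfileId: Int): Seq[BlockManagerId] = {
    val ratio = conf.get(config.SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO)
    val static = conf.get(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD)
    // Larger of the static floor and the ratio-based count, per the doc strings.
    val needed = math.max(static, numPartitions * ratio).toInt
    val found = blockManagerMaster.getShufflePushMergerLocations(needed, Set.empty)
    // Returning Nil keeps push based shuffle disabled for the stage
    // (see prepareShuffleServicesForShuffleMapStage above).
    if (found.size >= needed) found else Nil
  }
}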

core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala

Lines changed: 2 additions & 0 deletions
@@ -145,4 +145,6 @@ private[spark] object BlockManagerId {
   def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
     blockManagerIdCache.get(id)
   }
+
+  private[spark] val SHUFFLE_MERGER_IDENTIFIER = "shuffle-push-merger"
 }

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 20 additions & 0 deletions
@@ -125,6 +125,26 @@ class BlockManagerMaster(
     driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
   }

+  /**
+   * Get a list of unique shuffle service locations where an executor is successfully
+   * registered in the past for block push/merge with push based shuffle.
+   */
+  def getShufflePushMergerLocations(
+      numMergersNeeded: Int,
+      hostsToFilter: Set[String]): Seq[BlockManagerId] = {
+    driverEndpoint.askSync[Seq[BlockManagerId]](
+      GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter))
+  }
+
+  /**
+   * Remove the host from the candidate list of shuffle push mergers. This can be
+   * triggered if there is a FetchFailedException on the host
+   * @param host
+   */
+  def removeShufflePushMergerLocation(host: String): Unit = {
+    driverEndpoint.askSync[Seq[BlockManagerId]](RemoveShufflePushMergerLocation(host))
+  }
+
   def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
     driverEndpoint.askSync[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId))
   }

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 65 additions & 0 deletions
@@ -74,6 +74,14 @@ class BlockManagerMasterEndpoint(
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

+  // Mapping from host name to shuffle (mergers) services where the current app
+  // registered an executor in the past. Older hosts are removed when the
+  // maxRetainedMergerLocations size is reached in favor of newer locations.
+  private val shuffleMergerLocations = new mutable.LinkedHashMap[String, BlockManagerId]()
+
+  // Maximum number of merger locations to cache
+  private val maxRetainedMergerLocations = conf.get(config.SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS)
+
   private val askThreadPool =
     ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool", 100)
   private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)
@@ -92,6 +100,8 @@ class BlockManagerMasterEndpoint(

   val defaultRpcTimeout = RpcUtils.askRpcTimeout(conf)

+  private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(conf)
+
   logInfo("BlockManagerMasterEndpoint up")
   // same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)
   // && conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)`
@@ -139,6 +149,12 @@
     case GetBlockStatus(blockId, askStorageEndpoints) =>
       context.reply(blockStatus(blockId, askStorageEndpoints))

+    case GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter) =>
+      context.reply(getShufflePushMergerLocations(numMergersNeeded, hostsToFilter))
+
+    case RemoveShufflePushMergerLocation(host) =>
+      context.reply(removeShufflePushMergerLocation(host))
+
     case IsExecutorAlive(executorId) =>
       context.reply(blockManagerIdByExecutor.contains(executorId))

@@ -360,6 +376,17 @@

   }

+  private def addMergerLocation(blockManagerId: BlockManagerId): Unit = {
+    if (!blockManagerId.isDriver && !shuffleMergerLocations.contains(blockManagerId.host)) {
+      val shuffleServerId = BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER,
+        blockManagerId.host, externalShuffleServicePort)
+      if (shuffleMergerLocations.size >= maxRetainedMergerLocations) {
+        shuffleMergerLocations -= shuffleMergerLocations.head._1
+      }
+      shuffleMergerLocations(shuffleServerId.host) = shuffleServerId
+    }
+  }
+
   private def removeExecutor(execId: String): Unit = {
     logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
     blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
@@ -526,6 +553,10 @@

       blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(),
         maxOnHeapMemSize, maxOffHeapMemSize, storageEndpoint, externalShuffleServiceBlockStatus)
+
+      if (pushBasedShuffleEnabled) {
+        addMergerLocation(id)
+      }
     }
     listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
       Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
@@ -657,6 +688,40 @@
     }
   }

+  private def getShufflePushMergerLocations(
+      numMergersNeeded: Int,
+      hostsToFilter: Set[String]): Seq[BlockManagerId] = {
+    val blockManagerHosts = blockManagerIdByExecutor.values.map(_.host).toSet
+    val filteredBlockManagerHosts = blockManagerHosts.filterNot(hostsToFilter.contains(_))
+    val filteredMergersWithExecutors = filteredBlockManagerHosts.map(
+      BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, _, externalShuffleServicePort))
+    // Enough mergers are available as part of active executors list
+    if (filteredMergersWithExecutors.size >= numMergersNeeded) {
+      filteredMergersWithExecutors.toSeq
+    } else {
+      // Delta mergers added from inactive mergers list to the active mergers list
+      val filteredMergersWithExecutorsHosts = filteredMergersWithExecutors.map(_.host)
+      val filteredMergersWithoutExecutors = shuffleMergerLocations.values
+        .filterNot(x => hostsToFilter.contains(x.host))
+        .filterNot(x => filteredMergersWithExecutorsHosts.contains(x.host))
+      val randomFilteredMergersLocations =
+        if (filteredMergersWithoutExecutors.size >
+          numMergersNeeded - filteredMergersWithExecutors.size) {
+          Utils.randomize(filteredMergersWithoutExecutors)
+            .take(numMergersNeeded - filteredMergersWithExecutors.size)
+        } else {
+          filteredMergersWithoutExecutors
+        }
+      filteredMergersWithExecutors.toSeq ++ randomFilteredMergersLocations
+    }
+  }
+
+  private def removeShufflePushMergerLocation(host: String): Unit = {
+    if (shuffleMergerLocations.contains(host)) {
+      shuffleMergerLocations.remove(host)
+    }
+  }
+
   /**
    * Returns an [[RpcEndpointRef]] of the [[BlockManagerReplicaEndpoint]] for sending RPC messages.
    */
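
The selection above is two-tiered: hosts that currently run executors are preferred, and only the shortfall is filled from the retained historical locations, randomized so the extra merge load spreads across old hosts. A self-contained toy version over plain host strings (names illustrative) that mirrors the tiering without the RPC plumbing:

import scala.util.Random

// Toy model of getShufflePushMergerLocations using host names only.
def chooseMergers(
    activeHosts: Set[String],    // hosts with live executors
    retainedHosts: Seq[String],  // historical merger locations, insertion order
    excluded: Set[String],       // e.g. hosts with fetch failures
    needed: Int): Seq[String] = {
  val active = (activeHosts -- excluded).toSeq
  if (active.size >= needed) {
    active
  } else {
    // Top up from retained hosts not excluded and not already chosen, in
    // random order (mirroring Utils.randomize in the real implementation).
    val backfill = Random.shuffle(
      retainedHosts.filterNot(excluded).filterNot(active.contains))
    active ++ backfill.take(needed - active.size)
  }
}

// chooseMergers(Set("h1", "h2"), Seq("h2", "h3", "h4"), Set("h4"), 3)
// => h1 and h2 from the active tier, plus h3 as randomized backfill.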

core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala

Lines changed: 6 additions & 0 deletions
@@ -141,4 +141,10 @@ private[spark] object BlockManagerMessages {
   case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

   case class IsExecutorAlive(executorId: String) extends ToBlockManagerMaster
+
+  case class GetShufflePushMergerLocations(numMergersNeeded: Int, hostsToFilter: Set[String])
+    extends ToBlockManagerMaster
+
+  case class RemoveShufflePushMergerLocation(host: String) extends ToBlockManagerMaster
+
 }
