
Commit 09e83b7

xingchaozh authored and GitHub Enterprise committed
[CARMEL-6387] Backport [SPARK-35447][SQL] Optimize skew join before coalescing shuffle partitions (#1153)
1 parent 4d6e245 commit 09e83b7

10 files changed: +1307 -306 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 22 additions & 0 deletions
@@ -613,6 +613,28 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)

+  val COALESCE_PARTITIONS_PARALLELISM_FIRST =
+    buildConf("spark.sql.adaptive.coalescePartitions.parallelismFirst")
+      .doc("When true, Spark does not respect the target size specified by " +
+        s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' (default 64MB) when coalescing contiguous " +
+        "shuffle partitions, but adaptively calculates the target size according to the default " +
+        "parallelism of the Spark cluster. The calculated size is usually smaller than the " +
+        "configured target size. This is to maximize the parallelism and avoid performance " +
+        "regression when enabling adaptive query execution. It's recommended to set this config " +
+        "to false and respect the configured target size.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  val COALESCE_PARTITIONS_MIN_PARTITION_SIZE =
+    buildConf("spark.sql.adaptive.coalescePartitions.minPartitionSize")
+      .doc("The minimum size of shuffle partitions after coalescing. This is useful when the " +
+        "adaptively calculated target size is too small during partition coalescing.")
+      .version("3.2.0")
+      .bytesConf(ByteUnit.BYTE)
+      .checkValue(_ >= 0, "minPartitionSize must be non-negative")
+      .createWithDefaultString("0")
+
   val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
     buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum")
       .doc("The minimum number of shuffle partitions after coalescing. If not set, the default " +

sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala

Lines changed: 12 additions & 4 deletions
@@ -32,8 +32,16 @@ sealed trait ShufflePartitionSpec
 // `endReducerIndex` (exclusive).
 case class CoalescedPartitionSpec(
     startReducerIndex: Int,
-    endReducerIndex: Int) extends ShufflePartitionSpec
-
+    endReducerIndex: Int,
+    @transient dataSize: Option[Long] = None) extends ShufflePartitionSpec
+
+object CoalescedPartitionSpec {
+  def apply(startReducerIndex: Int,
+      endReducerIndex: Int,
+      dataSize: Long): CoalescedPartitionSpec = {
+    CoalescedPartitionSpec(startReducerIndex, endReducerIndex, Some(dataSize))
+  }
+}

 // A partition that reads partial data of one reducer, from `startMapIndex` (inclusive) to
 // `endMapIndex` (exclusive).
 case class PartialReducerPartitionSpec(

@@ -162,7 +170,7 @@ class ShuffledRowRDD(
   override def getPreferredLocations(partition: Partition): Seq[String] = {
     val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
     partition.asInstanceOf[ShuffledRowRDDPartition].spec match {
-      case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+      case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
         // TODO order by partition size.
         startReducerIndex.until(endReducerIndex).flatMap { reducerIndex =>
           tracker.getPreferredLocationsForShuffle(dependency, reducerIndex)

@@ -182,7 +190,7 @@ class ShuffledRowRDD(
     // as well as the `tempMetrics` for basic shuffle metrics.
     val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
     val reader = split.asInstanceOf[ShuffledRowRDDPartition].spec match {
-      case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+      case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
         SparkEnv.get.shuffleManager.getReader(
           dependency.shuffleHandle,
           startReducerIndex,
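For reference, a small sketch (not from the commit) of the two construction paths the companion object enables; the reducer indices and size are made-up values:

// Case-class apply: dataSize defaults to None.
val withoutSize = CoalescedPartitionSpec(0, 4)
// New companion apply: a plain Long is wrapped into Some(...).
val withSize = CoalescedPartitionSpec(0, 4, 1024L)
assert(withSize.dataSize.contains(1024L))

Since dataSize is marked @transient, it is dropped when the spec is serialized to executors; presumably it only needs to be visible to planning on the driver.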
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadRule.scala

Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.exchange.{ShuffleExchangeLike, ShuffleOrigin}
+
+/**
+ * A rule that may create [[AQEShuffleReadExec]] on top of [[ShuffleQueryStageExec]] and change the
+ * plan output partitioning. The AQE framework will skip the rule if it leads to extra shuffles.
+ */
+trait AQEShuffleReadRule extends Rule[SparkPlan] {
+  /**
+   * Returns the list of [[ShuffleOrigin]]s supported by this rule.
+   */
+  protected def supportedShuffleOrigins: Seq[ShuffleOrigin]
+
+  protected def isSupported(shuffle: ShuffleExchangeLike): Boolean = {
+    supportedShuffleOrigins.contains(shuffle.shuffleOrigin)
+  }
+}
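As a hypothetical usage sketch (not part of the commit), a concrete rule extending the new trait only has to declare which shuffle origins it supports; ENSURE_REQUIREMENTS is an existing ShuffleOrigin in Spark, and the no-op rule body is a placeholder:

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleOrigin}

// Placeholder rule: accepts only shuffles inserted to satisfy distribution
// requirements, and returns the plan unchanged.
object NoopShuffleReadRule extends AQEShuffleReadRule {
  override protected def supportedShuffleOrigins: Seq[ShuffleOrigin] =
    Seq(ENSURE_REQUIREMENTS)

  override def apply(plan: SparkPlan): SparkPlan = plan
}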

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 2 additions & 1 deletion
@@ -109,10 +109,11 @@ case class AdaptiveSparkPlanExec(
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
     PlanAdaptiveDynamicPruningFilters(initialPlan),
     ReuseAdaptiveSubquery(context.subqueryCache),
+
+    OptimizeSkewedJoin,
     CoalesceShufflePartitions(context.session),
     // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
     // added by `CoalesceShufflePartitions`. So they must be executed after it.
-    OptimizeSkewedJoin,
     OptimizeSkewedRangePartition,
     OptimizeSkewedInsert,
     OptimizeLocalShuffleReader,
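To see the reordering end to end, a hypothetical query sketch (table and column names are assumptions): with both flags on, skewed join partitions are now split by OptimizeSkewedJoin before CoalesceShufflePartitions merges small ones, instead of after:

// Assumed tables and join key, for illustration only.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
val joined = spark.table("orders").join(spark.table("users"), "user_id")
joined.count()  // skew splitting and coalescing both apply to this shuffle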
