
Commit 39181ff

c21 authored and maropu committed
[SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable
### What changes were proposed in this pull request?

Based on a follow-up comment in #28123, we can coalesce buckets for shuffled hash join as well. Note that we only coalesce the buckets on the shuffled hash join stream side (i.e. the side not building the hash map), so we don't need to worry about OOM when coalescing multiple buckets into one task for building the hash map.

> If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.

Refactor the existing physical plan rule `CoalesceBucketsInSortMergeJoin` into `CoalesceBucketsInJoin`, so that it covers shuffled hash join as well. Refactor the existing unit test `CoalesceBucketsInSortMergeJoinSuite` into `CoalesceBucketsInJoinSuite`, likewise covering shuffled hash join.

### Why are the changes needed?

Avoiding a shuffle when joining bucketed tables with different numbers of buckets is also useful for shuffled hash join. In production we see users join bucketed tables with shuffled hash join (setting `spark.sql.join.preferSortMergeJoin=false` to avoid the sort), and this change avoids the shuffle when the bucket counts are not the same.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests in `CoalesceBucketsInJoinSuite` to verify the shuffled hash join physical plan.

### Performance number per request from maropu

I looked at TPCDS per maropu's suggestion, but most TPCDS queries do aggregation, only a few do joins, and none of the input tables are bucketed. So I tested a modified version of TPCDS `q93`:

```sql
SELECT ss_ticket_number, sr_ticket_number
FROM store_sales
JOIN store_returns
ON ss_ticket_number = sr_ticket_number
```

with `store_sales` and `store_returns` created as bucketed tables.

Physical query plan without coalescing:

```
ShuffledHashJoin [ss_ticket_number#109L], [sr_ticket_number#120L], Inner, BuildLeft
:- Exchange hashpartitioning(ss_ticket_number#109L, 4), true, [id=#67]
:  +- *(1) Project [ss_ticket_number#109L]
:     +- *(1) Filter isnotnull(ss_ticket_number#109L)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet default.store_sales[ss_ticket_number#109L] Batched: true, DataFilters: [isnotnull(ss_ticket_number#109L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_ticket_number)], ReadSchema: struct<ss_ticket_number:bigint>, SelectedBucketsCount: 2 out of 2
+- *(2) Project [sr_returned_date_sk#111L, sr_return_time_sk#112L, sr_item_sk#113L, sr_customer_sk#114L, sr_cdemo_sk#115L, sr_hdemo_sk#116L, sr_addr_sk#117L, sr_store_sk#118L, sr_reason_sk#119L, sr_ticket_number#120L, sr_return_quantity#121L, sr_return_amt#122, sr_return_tax#123, sr_return_amt_inc_tax#124, sr_fee#125, sr_return_ship_cost#126, sr_refunded_cash#127, sr_reversed_charge#128, sr_store_credit#129, sr_net_loss#130]
   +- *(2) Filter isnotnull(sr_ticket_number#120L)
      +- *(2) ColumnarToRow
         +- FileScan parquet default.store_returns[sr_returned_date_sk#111L,sr_return_time_sk#112L,sr_item_sk#113L,sr_customer_sk#114L,sr_cdemo_sk#115L,sr_hdemo_sk#116L,sr_addr_sk#117L,sr_store_sk#118L,sr_reason_sk#119L,sr_ticket_number#120L,sr_return_quantity#121L,sr_return_amt#122,sr_return_tax#123,sr_return_amt_inc_tax#124,sr_fee#125,sr_return_ship_cost#126,sr_refunded_cash#127,sr_reversed_charge#128,sr_store_credit#129,sr_net_loss#130] Batched: true, DataFilters: [isnotnull(sr_ticket_number#120L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_returns], PartitionFilters: [], PushedFilters: [IsNotNull(sr_ticket_number)], ReadSchema: struct<sr_returned_date_sk:bigint,sr_return_time_sk:bigint,sr_item_sk:bigint,sr_customer_sk:bigin..., SelectedBucketsCount: 4 out of 4
```

Physical query plan with coalescing:

```
ShuffledHashJoin [ss_ticket_number#109L], [sr_ticket_number#120L], Inner, BuildLeft
:- *(1) Project [ss_ticket_number#109L]
:  +- *(1) Filter isnotnull(ss_ticket_number#109L)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.store_sales[ss_ticket_number#109L] Batched: true, DataFilters: [isnotnull(ss_ticket_number#109L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_ticket_number)], ReadSchema: struct<ss_ticket_number:bigint>, SelectedBucketsCount: 2 out of 2
+- *(2) Project [sr_returned_date_sk#111L, sr_return_time_sk#112L, sr_item_sk#113L, sr_customer_sk#114L, sr_cdemo_sk#115L, sr_hdemo_sk#116L, sr_addr_sk#117L, sr_store_sk#118L, sr_reason_sk#119L, sr_ticket_number#120L, sr_return_quantity#121L, sr_return_amt#122, sr_return_tax#123, sr_return_amt_inc_tax#124, sr_fee#125, sr_return_ship_cost#126, sr_refunded_cash#127, sr_reversed_charge#128, sr_store_credit#129, sr_net_loss#130]
   +- *(2) Filter isnotnull(sr_ticket_number#120L)
      +- *(2) ColumnarToRow
         +- FileScan parquet default.store_returns[sr_returned_date_sk#111L,sr_return_time_sk#112L,sr_item_sk#113L,sr_customer_sk#114L,sr_cdemo_sk#115L,sr_hdemo_sk#116L,sr_addr_sk#117L,sr_store_sk#118L,sr_reason_sk#119L,sr_ticket_number#120L,sr_return_quantity#121L,sr_return_amt#122,sr_return_tax#123,sr_return_amt_inc_tax#124,sr_fee#125,sr_return_ship_cost#126,sr_refunded_cash#127,sr_reversed_charge#128,sr_store_credit#129,sr_net_loss#130] Batched: true, DataFilters: [isnotnull(sr_ticket_number#120L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_returns], PartitionFilters: [], PushedFilters: [IsNotNull(sr_ticket_number)], ReadSchema: struct<sr_returned_date_sk:bigint,sr_return_time_sk:bigint,sr_item_sk:bigint,sr_customer_sk:bigin..., SelectedBucketsCount: 4 out of 4 (Coalesced to 2)
```

Run time improvement of 50% in wall clock time:

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU 2.40GHz
shuffle hash join:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
shuffle hash join coalesce bucket off               1541           1664         106         1.9         535.1       1.0X
shuffle hash join coalesce bucket on                1060           1169          81         2.7         368.1       1.5X
```

Closes #29079 from c21/split-bucket.

Authored-by: Cheng Su <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
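For reference, a minimal spark-shell sketch of how a user might exercise this end to end (the table names, data, and bucket counts are illustrative, and whether the planner actually picks a shuffled hash join also depends on size estimates and join-selection settings, so the resulting plan should be checked with `explain()`):

```scala
// Illustrative walkthrough, not part of the patch. Assumes a `spark` SparkSession with
// warehouse support, as in spark-shell; tables t1/t2 and bucket counts are made up.
import spark.implicits._

// Two bucketed tables whose bucket counts differ by a divisible ratio (8 vs. 4).
Seq(1, 2, 3).toDF("i").write.bucketBy(8, "i").saveAsTable("t1")
Seq(2, 3, 4).toDF("i").write.bucketBy(4, "i").saveAsTable("t2")

// Disable broadcast joins, express a preference for shuffled hash join over
// sort-merge join, and enable the bucket coalescing introduced by this patch.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.enabled", "true")

// If coalescing applies (for either sort-merge or shuffled hash join), the 8-bucket
// scan reports something like "SelectedBucketsCount: 8 out of 8 (Coalesced to 4)"
// and no Exchange is added on either side of the join.
spark.table("t1").join(spark.table("t2"), "i").explain()
```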
1 parent 0432379 commit 39181ff

File tree

7 files changed: +309 -178 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 15 additions & 7 deletions
```diff
@@ -2638,21 +2638,24 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)

-  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED =
-    buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled")
+  val COALESCE_BUCKETS_IN_JOIN_ENABLED =
+    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled")
       .doc("When true, if two bucketed tables with the different number of buckets are joined, " +
         "the side with a bigger number of buckets will be coalesced to have the same number " +
-        "of buckets as the other side. Bucket coalescing is applied only to sort-merge joins " +
-        "and only when the bigger number of buckets is divisible by the smaller number of buckets.")
+        "of buckets as the other side. Bigger number of buckets is divisible by the smaller " +
+        "number of buckets. Bucket coalescing is applied to sort-merge joins and " +
+        "shuffled hash join. Note: Coalescing bucketed table can avoid unnecessary shuffling " +
+        "in join, but it also reduces parallelism and could possibly cause OOM for " +
+        "shuffled hash join.")
       .version("3.1.0")
       .booleanConf
       .createWithDefault(false)

-  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO =
-    buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
+  val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO =
+    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio")
       .doc("The ratio of the number of two buckets being coalesced should be less than or " +
         "equal to this value for bucket coalescing to be applied. This configuration only " +
-        s"has an effect when '${COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key}' is set to true.")
+        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
       .version("3.1.0")
       .intConf
       .checkValue(_ > 0, "The difference must be positive.")
@@ -3269,6 +3272,11 @@ class SQLConf extends Serializable with Logging {

   def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)

+  def coalesceBucketsInJoinEnabled: Boolean = getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED)
+
+  def coalesceBucketsInJoinMaxBucketRatio: Int =
+    getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO)
+
   /** ********************** SQLConf functionality methods ************ */

   /** Set Spark SQL configuration properties. */
```
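To make the renamed `maxBucketRatio` guard concrete, here is a small standalone sketch of the condition the rule ends up checking (the ratio limit of 4 used in the examples is illustrative, not necessarily the shipped default):

```scala
// Standalone sketch of the coalescing guard; mirrors the divisibility and ratio checks
// performed by CoalesceBucketsInJoin, with an illustrative ratio limit.
def coalescingAllowed(numLeftBuckets: Int, numRightBuckets: Int, maxBucketRatio: Int): Boolean = {
  val small = math.min(numLeftBuckets, numRightBuckets)
  val large = math.max(numLeftBuckets, numRightBuckets)
  // Bucket counts must differ, the larger must be divisible by the smaller,
  // and their ratio must stay within the configured limit.
  numLeftBuckets != numRightBuckets && large % small == 0 && large / small <= maxBucketRatio
}

coalescingAllowed(8, 4, maxBucketRatio = 4)   // true: divisible, ratio 2 <= 4
coalescingAllowed(32, 4, maxBucketRatio = 4)  // false: ratio 8 exceeds the limit
coalescingAllowed(8, 3, maxBucketRatio = 4)   // false: 8 is not divisible by 3
```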

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan}
-import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInSortMergeJoin
+import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInJoin
 import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
@@ -332,7 +332,7 @@ object QueryExecution {
     // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
     adaptiveExecutionRule.toSeq ++
     Seq(
-      CoalesceBucketsInSortMergeJoin(sparkSession.sessionState.conf),
+      CoalesceBucketsInJoin(sparkSession.sessionState.conf),
       PlanDynamicPruningFilters(sparkSession),
       PlanSubqueries(sparkSession),
       EnsureRequirements(sparkSession.sessionState.conf),
```
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala

Lines changed: 177 additions & 0 deletions (new file)
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.bucketing

import scala.annotation.tailrec

import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.internal.SQLConf

/**
 * This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin`
 * if the following conditions are met:
 *   - Two bucketed tables are joined.
 *   - Join keys match with output partition expressions on their respective sides.
 *   - The larger bucket number is divisible by the smaller bucket number.
 *   - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
 *   - The ratio of the number of buckets is less than the value set in
 *     COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.
 */
case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
  private def updateNumCoalescedBucketsInScan(
      plan: SparkPlan,
      numCoalescedBuckets: Int): SparkPlan = {
    plan transformUp {
      case f: FileSourceScanExec =>
        f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
    }
  }

  private def updateNumCoalescedBuckets(
      join: BaseJoinExec,
      numLeftBuckets: Int,
      numRightBucket: Int,
      numCoalescedBuckets: Int): BaseJoinExec = {
    if (numCoalescedBuckets != numLeftBuckets) {
      val leftCoalescedChild =
        updateNumCoalescedBucketsInScan(join.left, numCoalescedBuckets)
      join match {
        case j: SortMergeJoinExec => j.copy(left = leftCoalescedChild)
        case j: ShuffledHashJoinExec => j.copy(left = leftCoalescedChild)
      }
    } else {
      val rightCoalescedChild =
        updateNumCoalescedBucketsInScan(join.right, numCoalescedBuckets)
      join match {
        case j: SortMergeJoinExec => j.copy(right = rightCoalescedChild)
        case j: ShuffledHashJoinExec => j.copy(right = rightCoalescedChild)
      }
    }
  }

  private def isCoalesceSHJStreamSide(
      join: ShuffledHashJoinExec,
      numLeftBuckets: Int,
      numRightBucket: Int,
      numCoalescedBuckets: Int): Boolean = {
    if (numCoalescedBuckets == numLeftBuckets) {
      join.buildSide != BuildRight
    } else {
      join.buildSide != BuildLeft
    }
  }

  def apply(plan: SparkPlan): SparkPlan = {
    if (!conf.coalesceBucketsInJoinEnabled) {
      return plan
    }

    plan transform {
      case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets)
        if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <=
          conf.coalesceBucketsInJoinMaxBucketRatio =>
        val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
        join match {
          case j: SortMergeJoinExec =>
            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
          case j: ShuffledHashJoinExec
            // Only coalesce the buckets for shuffled hash join stream side,
            // to avoid OOM for build side.
            if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
          case other => other
        }
      case other => other
    }
  }
}

/**
 * An extractor that extracts `SortMergeJoinExec` and `ShuffledHashJoin`,
 * where both sides of the join have the bucketed tables,
 * are consisted of only the scan operation,
 * and numbers of buckets are not equal but divisible.
 */
object ExtractJoinWithBuckets {
  @tailrec
  private def hasScanOperation(plan: SparkPlan): Boolean = plan match {
    case f: FilterExec => hasScanOperation(f.child)
    case p: ProjectExec => hasScanOperation(p.child)
    case _: FileSourceScanExec => true
    case _ => false
  }

  private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = {
    plan.collectFirst {
      case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty &&
          f.optionalNumCoalescedBuckets.isEmpty =>
        f.relation.bucketSpec.get
    }
  }

  /**
   * The join keys should match with expressions for output partitioning. Note that
   * the ordering does not matter because it will be handled in `EnsureRequirements`.
   */
  private def satisfiesOutputPartitioning(
      keys: Seq[Expression],
      partitioning: Partitioning): Boolean = {
    partitioning match {
      case HashPartitioning(exprs, _) if exprs.length == keys.length =>
        exprs.forall(e => keys.exists(_.semanticEquals(e)))
      case _ => false
    }
  }

  private def isApplicable(j: BaseJoinExec): Boolean = {
    (j.isInstanceOf[SortMergeJoinExec] ||
      j.isInstanceOf[ShuffledHashJoinExec]) &&
      hasScanOperation(j.left) &&
      hasScanOperation(j.right) &&
      satisfiesOutputPartitioning(j.leftKeys, j.left.outputPartitioning) &&
      satisfiesOutputPartitioning(j.rightKeys, j.right.outputPartitioning)
  }

  private def isDivisible(numBuckets1: Int, numBuckets2: Int): Boolean = {
    val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
    // A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller
    // number of buckets because bucket id is calculated by modding the total number of buckets.
    numBuckets1 != numBuckets2 && large % small == 0
  }

  def unapply(plan: SparkPlan): Option[(BaseJoinExec, Int, Int)] = {
    plan match {
      case j: BaseJoinExec if isApplicable(j) =>
        val leftBucket = getBucketSpec(j.left)
        val rightBucket = getBucketSpec(j.right)
        if (leftBucket.isDefined && rightBucket.isDefined &&
            isDivisible(leftBucket.get.numBuckets, rightBucket.get.numBuckets)) {
          Some(j, leftBucket.get.numBuckets, rightBucket.get.numBuckets)
        } else {
          None
        }
      case _ => None
    }
  }
}
```
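A note on the shuffled-hash-join restriction above: the side being coalesced is always the one with more buckets, and `isCoalesceSHJStreamSide` only allows that side to be the stream side, since a coalesced build side would have to hold several buckets' worth of rows in a single in-memory hash map. A self-contained sketch of that decision (the `BuildSide` objects here are local stand-ins for the Catalyst ones):

```scala
// Self-contained sketch of the stream-side check; mirrors isCoalesceSHJStreamSide above.
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

def coalesceAllowedForSHJ(
    numLeftBuckets: Int,
    numRightBuckets: Int,
    buildSide: BuildSide): Boolean = {
  val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
  if (numCoalescedBuckets == numLeftBuckets) {
    // The right side would be coalesced, so it must be the stream side (not the build side).
    buildSide != BuildRight
  } else {
    // The left side would be coalesced, so it must be the stream side (not the build side).
    buildSide != BuildLeft
  }
}

coalesceAllowedForSHJ(8, 4, BuildRight) // true: the 8-bucket left side is coalesced and streams
coalesceAllowedForSHJ(8, 4, BuildLeft)  // false: the coalesced left side would build the hash map
```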

sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoin.scala

Lines changed: 0 additions & 132 deletions
This file was deleted.

sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -347,7 +347,7 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
   test("Coalesced bucket info should be a part of explain string") {
     withTable("t1", "t2") {
       withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
-        SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
+        SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
         Seq(1, 2).toDF("i").write.bucketBy(8, "i").saveAsTable("t1")
         Seq(2, 3).toDF("i").write.bucketBy(4, "i").saveAsTable("t2")
         val df1 = spark.table("t1")
```
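For context, the rest of this test body is unchanged by the diff and therefore not shown; it checks the explain string for the coalesced bucket info. A sketch of that kind of assertion, assuming the plan-string format shown in the commit message above, might look like:

```scala
// Sketch only, not the verbatim test body; assumes the "(Coalesced to N)" marker
// emitted by the FileScan node, as seen in the plans in the commit message.
val df2 = spark.table("t2")
val joined = df1.join(df2, df1("i") === df2("i"))
val planText = joined.queryExecution.executedPlan.toString
assert(planText.contains("SelectedBucketsCount: 8 out of 8 (Coalesced to 4)"))
```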
