Skip to content

Commit e133823

Browse files
committed
[SPARK-40040][SQL] Push local limit to both sides if join condition is empty
### What changes were proposed in this pull request? Similar to #31567. This PR enhances `LimitPushDown` to support push local limit to both sides if it is outer join and join condition is empty. It is safe to push down because without join condition is actually a cross join. ### Why are the changes needed? Improve query performance. For example: <img width="400" alt="image" src="https://user-images.githubusercontent.com/5399861/184052707-ebf50748-6870-4650-84c3-65d79b18ba9d.png"> ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #37475 from wangyum/SPARK-40040. Lead-authored-by: Yuming Wang <[email protected]> Co-authored-by: Yuming Wang <[email protected]> Signed-off-by: Yuming Wang <[email protected]>
1 parent 34754f0 commit e133823

File tree

3 files changed

+107
-51
lines changed

3 files changed

+107
-51
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -719,9 +719,11 @@ object LimitPushDown extends Rule[LogicalPlan] {
719719

720720
private def pushLocalLimitThroughJoin(limitExpr: Expression, join: Join): Join = {
721721
join.joinType match {
722-
case RightOuter => join.copy(right = maybePushLocalLimit(limitExpr, join.right))
723-
case LeftOuter => join.copy(left = maybePushLocalLimit(limitExpr, join.left))
724-
case _: InnerLike if join.condition.isEmpty =>
722+
case RightOuter if join.condition.nonEmpty =>
723+
join.copy(right = maybePushLocalLimit(limitExpr, join.right))
724+
case LeftOuter if join.condition.nonEmpty =>
725+
join.copy(left = maybePushLocalLimit(limitExpr, join.left))
726+
case _: InnerLike | RightOuter | LeftOuter | FullOuter if join.condition.isEmpty =>
725727
join.copy(
726728
left = maybePushLocalLimit(limitExpr, join.left),
727729
right = maybePushLocalLimit(limitExpr, join.right))
@@ -743,15 +745,15 @@ object LimitPushDown extends Rule[LogicalPlan] {
743745
LocalLimit(exp, u.copy(children = u.children.map(maybePushLocalLimit(exp, _))))
744746

745747
// Add extra limits below JOIN:
746-
// 1. For LEFT OUTER and RIGHT OUTER JOIN, we push limits to the left and right sides,
747-
// respectively.
748-
// 2. For INNER and CROSS JOIN, we push limits to both the left and right sides if join
749-
// condition is empty.
748+
// 1. For LEFT OUTER and RIGHT OUTER JOIN, we push limits to the left and right sides
749+
// respectively if join condition is not empty.
750+
// 2. For INNER, CROSS JOIN and OUTER JOIN, we push limits to both the left and right sides if
751+
// join condition is empty.
750752
// 3. For LEFT SEMI and LEFT ANTI JOIN, we push limits to the left side if join condition
751753
// is empty.
752-
// It's not safe to push limits below FULL OUTER JOIN in the general case without a more
753-
// invasive rewrite. We also need to ensure that this limit pushdown rule will not eventually
754-
// introduce limits on both sides if it is applied multiple times. Therefore:
754+
// It's not safe to push limits below FULL OUTER JOIN with join condition in the general case
755+
// without a more invasive rewrite. We also need to ensure that this limit pushdown rule will
756+
// not eventually introduce limits on both sides if it is applied multiple times. Therefore:
755757
// - If one side is already limited, stack another limit on top if the new limit is smaller.
756758
// The redundant limit will be collapsed by the CombineLimits rule.
757759
case LocalLimit(exp, join: Join) =>

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ class EliminateSortsSuite extends AnalysisTest {
372372
.limit(10)
373373
val optimized = Optimize.execute(joinPlan.analyze)
374374
val correctAnswer = LocalLimit(10, projectPlan)
375-
.join(projectPlanB, LeftOuter)
375+
.join(LocalLimit(10, projectPlanB), LeftOuter)
376376
.limit(10).analyze
377377
comparePlans(optimized, correctAnswer)
378378
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala

Lines changed: 94 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -96,45 +96,75 @@ class LimitPushdownSuite extends PlanTest {
9696
// Outer join ----------------------------------------------------------------------------------
9797

9898
test("left outer join") {
99-
val originalQuery = x.join(y, LeftOuter).limit(1)
100-
val optimized = Optimize.execute(originalQuery.analyze)
101-
val correctAnswer = Limit(1, LocalLimit(1, x).join(y, LeftOuter)).analyze
102-
comparePlans(optimized, correctAnswer)
99+
Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
100+
val originalQuery = x.join(y, LeftOuter, condition).limit(1).analyze
101+
val optimized = if (condition.isEmpty) {
102+
LocalLimit(1, x).join(LocalLimit(1, y), LeftOuter, condition).limit(1).analyze
103+
} else {
104+
LocalLimit(1, x).join(y, LeftOuter, condition).limit(1).analyze
105+
}
106+
comparePlans(Optimize.execute(originalQuery), optimized)
107+
}
103108
}
104109

105110
test("left outer join and left sides are limited") {
106-
val originalQuery = x.limit(2).join(y, LeftOuter).limit(1)
107-
val optimized = Optimize.execute(originalQuery.analyze)
108-
val correctAnswer = Limit(1, LocalLimit(1, x).join(y, LeftOuter)).analyze
109-
comparePlans(optimized, correctAnswer)
111+
Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
112+
val originalQuery = x.limit(2).join(y, LeftOuter, condition).limit(1).analyze
113+
val optimized = if (condition.isEmpty) {
114+
LocalLimit(1, x).join(LocalLimit(1, y), LeftOuter, condition).limit(1).analyze
115+
} else {
116+
LocalLimit(1, x).join(y, LeftOuter, condition).limit(1).analyze
117+
}
118+
comparePlans(Optimize.execute(originalQuery), optimized)
119+
}
110120
}
111121

112122
test("left outer join and right sides are limited") {
113-
val originalQuery = x.join(y.limit(2), LeftOuter).limit(1)
114-
val optimized = Optimize.execute(originalQuery.analyze)
115-
val correctAnswer = Limit(1, LocalLimit(1, x).join(Limit(2, y), LeftOuter)).analyze
116-
comparePlans(optimized, correctAnswer)
123+
Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
124+
val originalQuery = x.join(y.limit(2), LeftOuter, condition).limit(1).analyze
125+
val optimized = if (condition.isEmpty) {
126+
LocalLimit(1, x).join(LocalLimit(1, y), LeftOuter, condition).limit(1).analyze
127+
} else {
128+
LocalLimit(1, x).join(Limit(2, y), LeftOuter, condition).limit(1).analyze
129+
}
130+
comparePlans( Optimize.execute(originalQuery), optimized)
131+
}
117132
}
118133

119134
test("right outer join") {
120-
val originalQuery = x.join(y, RightOuter).limit(1)
121-
val optimized = Optimize.execute(originalQuery.analyze)
122-
val correctAnswer = Limit(1, x.join(LocalLimit(1, y), RightOuter)).analyze
123-
comparePlans(optimized, correctAnswer)
135+
Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
136+
val originalQuery = x.join(y, RightOuter, condition).limit(1).analyze
137+
val optimized = if (condition.isEmpty) {
138+
LocalLimit(1, x).join(LocalLimit(1, y), RightOuter, condition).limit(1).analyze
139+
} else {
140+
x.join(LocalLimit(1, y), RightOuter, condition).limit(1).analyze
141+
}
142+
comparePlans(Optimize.execute(originalQuery), optimized)
143+
}
124144
}
125145

126146
test("right outer join and right sides are limited") {
127-
val originalQuery = x.join(y.limit(2), RightOuter).limit(1)
128-
val optimized = Optimize.execute(originalQuery.analyze)
129-
val correctAnswer = Limit(1, x.join(LocalLimit(1, y), RightOuter)).analyze
130-
comparePlans(optimized, correctAnswer)
147+
Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
148+
val originalQuery = x.join(y.limit(2), RightOuter, condition).limit(1).analyze
149+
val optimized = if (condition.isEmpty) {
150+
LocalLimit(1, x).join(LocalLimit(1, y), RightOuter, condition).limit(1).analyze
151+
} else {
152+
x.join(LocalLimit(1, y), RightOuter, condition).limit(1).analyze
153+
}
154+
comparePlans(Optimize.execute(originalQuery), optimized)
155+
}
131156
}
132157

133158
test("right outer join and left sides are limited") {
134-
val originalQuery = x.limit(2).join(y, RightOuter).limit(1)
135-
val optimized = Optimize.execute(originalQuery.analyze)
136-
val correctAnswer = Limit(1, Limit(2, x).join(LocalLimit(1, y), RightOuter)).analyze
137-
comparePlans(optimized, correctAnswer)
159+
Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
160+
val originalQuery = x.limit(2).join(y, RightOuter, condition).limit(1).analyze
161+
val optimized = if (condition.isEmpty) {
162+
LocalLimit(1, x).join(LocalLimit(1, y), RightOuter, condition).limit(1).analyze
163+
} else {
164+
Limit(2, x).join(LocalLimit(1, y), RightOuter, condition).limit(1).analyze
165+
}
166+
comparePlans(Optimize.execute(originalQuery), optimized)
167+
}
138168
}
139169

140170
test("larger limits are not pushed on top of smaller ones in right outer join") {
@@ -146,35 +176,59 @@ class LimitPushdownSuite extends PlanTest {
146176

147177
test("full outer join where neither side is limited and both sides have same statistics") {
148178
assert(x.stats.sizeInBytes === y.stats.sizeInBytes)
149-
val originalQuery = x.join(y, FullOuter).limit(1).analyze
150-
val optimized = Optimize.execute(originalQuery)
151-
// No pushdown for FULL OUTER JOINS.
152-
comparePlans(optimized, originalQuery)
179+
Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
180+
val originalQuery = x.join(y, FullOuter, condition).limit(1).analyze
181+
val optimized = if (condition.isEmpty) {
182+
LocalLimit(1, x).join(LocalLimit(1, y), FullOuter, condition).limit(1).analyze
183+
} else {
184+
// No pushdown for FULL OUTER JOINS.
185+
originalQuery
186+
}
187+
comparePlans(Optimize.execute(originalQuery), optimized)
188+
}
153189
}
154190

155191
test("full outer join where neither side is limited and left side has larger statistics") {
156192
val xBig = testRelation.copy(data = Seq.fill(10)(null)).subquery("x")
157193
assert(xBig.stats.sizeInBytes > y.stats.sizeInBytes)
158-
val originalQuery = xBig.join(y, FullOuter).limit(1).analyze
159-
val optimized = Optimize.execute(originalQuery)
160-
// No pushdown for FULL OUTER JOINS.
161-
comparePlans(optimized, originalQuery)
194+
Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
195+
val originalQuery = xBig.join(y, FullOuter, condition).limit(1).analyze
196+
val optimized = if (condition.isEmpty) {
197+
LocalLimit(1, xBig).join(LocalLimit(1, y), FullOuter, condition).limit(1).analyze
198+
} else {
199+
// No pushdown for FULL OUTER JOINS.
200+
originalQuery
201+
}
202+
comparePlans(Optimize.execute(originalQuery), optimized)
203+
}
162204
}
163205

164206
test("full outer join where neither side is limited and right side has larger statistics") {
165207
val yBig = testRelation.copy(data = Seq.fill(10)(null)).subquery("y")
166208
assert(x.stats.sizeInBytes < yBig.stats.sizeInBytes)
167-
val originalQuery = x.join(yBig, FullOuter).limit(1).analyze
168-
val optimized = Optimize.execute(originalQuery)
169-
// No pushdown for FULL OUTER JOINS.
170-
comparePlans(optimized, originalQuery)
209+
Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
210+
val originalQuery = x.join(yBig, FullOuter, condition).limit(1).analyze
211+
val optimized = if (condition.isEmpty) {
212+
LocalLimit(1, x).join(LocalLimit(1, yBig), FullOuter, condition).limit(1).analyze
213+
} else {
214+
// No pushdown for FULL OUTER JOINS.
215+
originalQuery
216+
}
217+
comparePlans(Optimize.execute(originalQuery), optimized)
218+
}
171219
}
172220

173221
test("full outer join where both sides are limited") {
174-
val originalQuery = x.limit(2).join(y.limit(2), FullOuter).limit(1).analyze
175-
val optimized = Optimize.execute(originalQuery)
176-
// No pushdown for FULL OUTER JOINS.
177-
comparePlans(optimized, originalQuery)
222+
Seq(Some("x.a".attr === "y.b".attr), None).foreach { condition =>
223+
val originalQuery = x.limit(2).join(y.limit(2), FullOuter, condition).limit(1).analyze
224+
val optimized = if (condition.isEmpty) {
225+
LocalLimit(1, x).join(LocalLimit(1, y), FullOuter, condition).limit(1).analyze
226+
} else {
227+
// No pushdown for FULL OUTER JOINS.
228+
originalQuery
229+
}
230+
comparePlans(Optimize.execute(originalQuery), optimized)
231+
}
178232
}
179233

180234
test("SPARK-33433: Change Aggregate max rows to 1 if grouping is empty") {

0 commit comments

Comments
 (0)