[SPARK-36063][SQL] Optimize OneRowRelation subqueries #33284
Conversation
cc @cloud-fan
```scala
 * Rewrite a subquery expression into one or more expressions. The rewrite can only be done
 * if there are no nested subqueries in the subquery plan.
 */
private def rewrite(plan: LogicalPlan): LogicalPlan = plan.transformUpWithSubqueries {
```
Do we need to handle nested subqueries here? I think the rule `OptimizeSubqueries` will run this rule again to optimize nested subqueries.
The reason why we need to check subqueries is to deal with nested subqueries:

```
Project [scalar-subquery [a]]
:  +- Project [scalar-subquery [b]]   <-- collapsible if we transform nested subqueries first
:  :  +- Project [outer(b) + 1]
:  :     +- OneRowRelation
:  +- Project [outer(a) as b]
:     +- OneRowRelation
+- Relation [a]
```

A subquery's plan should only be rewritten if it doesn't contain another correlated subquery. If we do not transform the nested subqueries first, we will miss cases like the one above.
```scala
 */
private def rewrite(plan: LogicalPlan): LogicalPlan = plan.transformUpWithSubqueries {
  case LateralJoin(left, right @ LateralSubquery(OneRowSubquery(projectList), _, _, _), _, None)
      if right.plan.subqueriesAll.isEmpty && right.joinCond.isEmpty =>
```
I think `subqueries.isEmpty` is good enough?
```scala
case p: LogicalPlan => p.transformExpressionsUpWithPruning(
    _.containsPattern(SCALAR_SUBQUERY)) {
  case s @ ScalarSubquery(OneRowSubquery(projectList), _, _, _)
      if s.plan.subqueriesAll.isEmpty && s.joinCond.isEmpty =>
```
ditto
```scala
private def rewrite(plan: LogicalPlan): LogicalPlan = plan.transformUpWithSubqueries {
  case LateralJoin(left, right @ LateralSubquery(OneRowSubquery(projectList), _, _, _), _, None)
      if right.plan.subqueriesAll.isEmpty && right.joinCond.isEmpty =>
    Project(left.output ++ projectList, left)
```
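The rule collapses a lateral join over a one-row subquery into a plain projection. The sketch below is a hypothetical toy model in plain Python (not Catalyst code) showing why the two are equivalent: a correlated subquery over OneRowRelation yields exactly one row per outer row, so the inner lateral join degenerates to a per-row projection.

```python
def lateral_join(outer_rows, subquery):
    # INNER lateral join: pair each outer row with every row that its
    # correlated subquery produces for that row.
    out = []
    for r in outer_rows:
        for s in subquery(r):
            out.append({**r, **s})
    return out

def one_row_subquery(outer):
    # Subquery body: Project [outer(a) + 1 AS b] over OneRowRelation --
    # always exactly one row per outer row.
    return [{"b": outer["a"] + 1}]

def rewritten(outer_rows):
    # After the rewrite: Project [a, a + 1 AS b] over the outer relation.
    return [{**r, "b": r["a"] + 1} for r in outer_rows]

rows = [{"a": 1}, {"a": 2}]
assert lateral_join(rows, one_row_subquery) == rewritten(rows)
```

The equivalence holds precisely because the subquery body is a bare projection with no filter: nothing can change the one-row cardinality.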
If the lateral join has a condition, can we just add a `Filter` above the `Project`?
It should be fine for an inner join, but for a left outer join it's trickier. This also applies to subqueries after pulling out correlated filters as join conditions. Maybe this can be a separate optimization before `RewriteCorrelatedScalarSubqueries` / `RewriteLateralSubqueries`.
```scala
def decorrelateInnerQueryEnabled: Boolean = getConf(SQLConf.DECORRELATE_INNER_QUERY_ENABLED)

def optimizeOneRowRelationSubquery: Boolean = getConf(SQLConf.OPTIMIZE_ONE_ROW_RELATION_SUBQUERY)
```
nit: it's only called once, we can just call `conf.getConf(SQLConf.OPTIMIZE_ONE_ROW_RELATION_SUBQUERY)` in the new rule.
```scala
val correctAnswer = Project(Seq(x, y), DomainJoin(Seq(x, y), OneRowRelation()))
val innerPlan = Project(Seq(OuterReference(x).as("x1"), OuterReference(y).as("y1")), t0)
val correctAnswer = Project(
  Seq(x.as("x1"), y.as("y1"), x, y), DomainJoin(Seq(x, y), t0))
```
Will we optimize away the `DomainJoin` at the end?
For now, once the domain join is added, it will always be rewritten as an inner join because the join condition in the subquery might not be null: `select (select c1 where c1 = c2 + 1) from t`.
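The example above is the interesting case: the subquery body has a correlated filter, so it can return zero rows and the scalar subquery must evaluate to NULL. The following plain-Python toy (assumed semantics, not Spark code) illustrates why such a subquery cannot be inlined as a bare projection:

```python
# Toy semantics of: SELECT (SELECT c1 WHERE c1 = c2 + 1) FROM t
# When the predicate filters out the subquery's single row, the scalar
# subquery yields NULL (None here) rather than a projected value.
t = [{"c1": 1, "c2": 0}, {"c1": 5, "c2": 0}]

def scalar_subquery(row):
    # One row survives the filter iff c1 = c2 + 1; otherwise NULL.
    return row["c1"] if row["c1"] == row["c2"] + 1 else None

result = [scalar_subquery(r) for r in t]
assert result == [1, None]
```

This NULL-producing behavior is what the left-outer-join (or domain-join) rewrite preserves and what a simple `Filter` over `Project` would get wrong.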
```scala
    }
  }

  transformUp(g)
```
nit:

```scala
transformUp { case plan =>
  val transformed = plan transformExpressionsUp {
    case planExpression: PlanExpression[PlanType] =>
      val newPlan = planExpression.plan.transformUpWithSubqueries(f)
      planExpression.withNewPlan(newPlan)
  }
  f.applyOrElse[PlanType, PlanType](transformed, identity)
}
```
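The visit order of the suggested traversal can be illustrated on a toy tree. This is a plain-Python sketch of the same recursion (hypothetical `Node` class, not Catalyst): children are handled bottom-up by the outer recursion, each node's subquery plans are rewritten before the function touches the node itself.

```python
class Node:
    def __init__(self, name, children=None, subqueries=None):
        self.name = name
        self.children = children or []
        self.subqueries = subqueries or []

def transform_up_with_subqueries(node, f):
    # Bottom-up: children first...
    node.children = [transform_up_with_subqueries(c, f) for c in node.children]
    # ...then any subquery plans hanging off this node's expressions...
    node.subqueries = [transform_up_with_subqueries(s, f) for s in node.subqueries]
    # ...and finally the node itself.
    return f(node)

visited = []
def record(n):
    visited.append(n.name)
    return n

tree = Node("Project", children=[Node("Filter", subqueries=[Node("Subquery")])])
transform_up_with_subqueries(tree, record)
assert visited == ["Subquery", "Filter", "Project"]
```

In this sketch a node's subquery plan is fully rewritten before the node, which is exactly the property the nested-subquery example earlier in the thread relies on.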
```scala
  subqueries ++ subqueries.flatMap(_.subqueriesAll)
}

/**
 * Returns a copy of this node where the given partial function has been recursively applied
 * first to this node's children, then this node's subqueries, and finally this node itself
```
I think this doc is wrong. We apply the func to subqueries first, then children, then the node itself.
```scala
val inner = t0.select('a.as("a1"), 'b.as("b1")).select(('a1 + 'b1).as("c"))
val query = t1.select(ScalarSubquery(inner).as("sub"))
val optimized = Optimize.execute(query.analyze)
val correctAnswer = Project(Alias(Alias(a + b, "c")(), "sub")() :: Nil, t1)
```
nit: it's a bit weird that sometimes we use the DSL and sometimes we use `LogicalPlan` directly. Can we be consistent? I think this can be `t1.select(('a + 'b).as("c").as("sub"))`.
The analyzer will remove the extra aliases, which makes the correct answer differ from the optimized plan. I will add a clean-up-aliases rule in the test optimizer.
```scala
val inner = t0.select('a.as("b")).select(ScalarSubquery(t0.select('b)).as("s"))
val query = t1.select(ScalarSubquery(inner).as("sub"))
val optimized = Optimize.execute(query.analyze)
val correctAnswer = Project(Alias(Alias(a, "s")(), "sub")() :: Nil, t1)
```
ditto
```scala
  }
}

test("Should not optimize subquery with nested subqueries") {
```
I think we do support nested subqueries; the problem here is `WHERE a = 1`?
Yes. The test title is a bit confusing. Will update.
thanks, merging to master/3.2 (it's a very useful optimization for the 3.2 new feature LATERAL JOIN)
### What changes were proposed in this pull request?

This PR adds optimization for scalar and lateral subqueries with OneRowRelation as leaf nodes. It inlines such subqueries before decorrelation to avoid rewriting them as left outer joins. It also introduces a flag to turn on/off this optimization: `spark.sql.optimizer.optimizeOneRowRelationSubquery` (default: True).

For example:

```sql
select (select c1) from t
```

Analyzed plan:

```
Project [scalar-subquery#17 [c1#18] AS scalarsubquery(c1)#22]
:  +- Project [outer(c1#18)]
:     +- OneRowRelation
+- LocalRelation [c1#18, c2#19]
```

Optimized plan before this PR:

```
Project [c1#18#25 AS scalarsubquery(c1)#22]
+- Join LeftOuter, (c1#24 <=> c1#18)
   :- LocalRelation [c1#18]
   +- Aggregate [c1#18], [c1#18 AS c1#18#25, c1#18 AS c1#24]
      +- LocalRelation [c1#18]
```

Optimized plan after this PR:

```
LocalRelation [scalarsubquery(c1)#22]
```

### Why are the changes needed?

To optimize query plans.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added new unit tests.

Closes #33284 from allisonwang-db/spark-36063-optimize-subquery-one-row-relation.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit de8e4be)
Signed-off-by: Wenchen Fan <[email protected]>
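The before/after plans in the commit message can be sanity-checked with a plain-Python toy (assumed semantics, not Spark code): the old left-outer-join rewrite of `select (select c1) from t` and the new inlined projection produce the same column, because the join key always matches itself.

```python
t = [{"c1": 0, "c2": 10}, {"c1": 1, "c2": 11}]

def old_plan(rows):
    # Rough model of the pre-PR plan: left outer join t with
    # Aggregate [c1] of t on c1 <=> c1, then keep the joined c1.
    keys = {r["c1"] for r in rows}  # Aggregate [c1]
    return [r["c1"] if r["c1"] in keys else None for r in rows]

def new_plan(rows):
    # Post-PR plan: the subquery is inlined, i.e. SELECT c1 FROM t.
    return [r["c1"] for r in rows]

assert old_plan(t) == new_plan(t)
```

The `keys` lookup in `old_plan` never misses, which is why the join (and the extra aggregate and shuffle it implies in real Spark) is pure overhead that the new rule eliminates.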
The tests seem flaky:
@allisonwang-db would you mind taking a look please?

@HyukjinKwon Thanks for letting me know. The test failures are from branch-3.2 and I will fix them soon.
…lar subqueries

This PR cherry-picks #33235 to branch-3.2 to fix test failures introduced by #33284.

### What changes were proposed in this pull request?

This PR allows the `Project` node to host outer references in scalar subqueries when `decorrelateInnerQuery` is enabled. It is already supported by the new decorrelation framework and the `RewriteCorrelatedScalarSubquery` rule. Note currently by default all correlated subqueries will be decorrelated, which is not necessarily the most optimal approach. Consider `SELECT (SELECT c1) FROM t`. This should be optimized as `SELECT c1 FROM t` instead of rewriting it as a left outer join. This will be done in a separate PR to optimize correlated scalar/lateral subqueries with OneRowRelation.

### Why are the changes needed?

To allow more types of correlated scalar subqueries.

### Does this PR introduce _any_ user-facing change?

Yes. This PR allows outer query column references in the SELECT clause of a correlated scalar subquery. For example:

```sql
SELECT (SELECT c1) FROM t;
```

Before this change:

```
org.apache.spark.sql.AnalysisException: Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses
```

After this change:

```
+------------------+
|scalarsubquery(c1)|
+------------------+
|0                 |
|1                 |
+------------------+
```

### How was this patch tested?

Added unit tests and SQL tests.

(cherry picked from commit ca348e5)
Signed-off-by: allisonwang-db <allison.wangdatabricks.com>

Closes #33527 from allisonwang-db/spark-36028-3.2.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>