Skip to content

Commit f5e3302

Browse files
prakharjain09maropu
authored andcommitted
[SPARK-33399][SQL] Normalize output partitioning and sortorder with respect to aliases to avoid unneeded exchange/sort nodes
### What changes were proposed in this pull request? This pull request tries to remove unneeded exchanges/sorts by normalizing the output partitioning and sortorder information correctly with respect to aliases. Example: consider this join of three tables: |SELECT t2id, t3.id as t3id |FROM ( | SELECT t1.id as t1id, t2.id as t2id | FROM t1, t2 | WHERE t1.id = t2.id |) t12, t3 |WHERE t1id = t3.id The plan for this looks like: *(9) Project [t2id#1034L, id#1004L AS t3id#1035L] +- *(9) SortMergeJoin [t1id#1033L], [id#1004L], Inner :- *(6) Sort [t1id#1033L ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(t1id#1033L, 5), true, [id=#1343] <------------------------------ : +- *(5) Project [id#996L AS t1id#1033L, id#1000L AS t2id#1034L] : +- *(5) SortMergeJoin [id#996L], [id#1000L], Inner : :- *(2) Sort [id#996L ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(id#996L, 5), true, [id=#1329] : : +- *(1) Range (0, 10, step=1, splits=2) : +- *(4) Sort [id#1000L ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(id#1000L, 5), true, [id=#1335] : +- *(3) Range (0, 20, step=1, splits=2) +- *(8) Sort [id#1004L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(id#1004L, 5), true, [id=#1349] +- *(7) Range (0, 30, step=1, splits=2) In this plan, the marked exchange could have been avoided as the data is already partitioned on "t1.id". This happens because AliasAwareOutputPartitioning class handles aliases only related to HashPartitioning. This change normalizes all output partitioning based on aliasing happening in Project. ### Why are the changes needed? To remove unneeded exchanges. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New UT added. On TPCDS 1000 scale, this change improves the performance of query 95 from 330 seconds to 170 seconds by removing the extra Exchange. Closes #30300 from prakharjain09/SPARK-33399-outputpartitioning. Authored-by: Prakhar Jain <[email protected]> Signed-off-by: Takeshi Yamamuro <[email protected]>
1 parent 9ab0f82 commit f5e3302

File tree

10 files changed

+1718
-1486
lines changed

10 files changed

+1718
-1486
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,24 @@
1616
*/
1717
package org.apache.spark.sql.execution
1818

19-
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, NamedExpression, SortOrder}
20-
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
19+
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeMap, AttributeReference, Expression, NamedExpression, SortOrder}
20+
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
2121

2222
/**
2323
* A trait that provides functionality to handle aliases in the `outputExpressions`.
2424
*/
2525
trait AliasAwareOutputExpression extends UnaryExecNode {
2626
protected def outputExpressions: Seq[NamedExpression]
2727

28-
protected def hasAlias: Boolean = outputExpressions.collectFirst { case _: Alias => }.isDefined
28+
private lazy val aliasMap = AttributeMap(outputExpressions.collect {
29+
case a @ Alias(child: AttributeReference, _) => (child, a.toAttribute)
30+
})
2931

30-
protected def replaceAliases(exprs: Seq[Expression]): Seq[Expression] = {
31-
exprs.map {
32-
case a: AttributeReference => replaceAlias(a).getOrElse(a)
33-
case other => other
34-
}
35-
}
32+
protected def hasAlias: Boolean = aliasMap.nonEmpty
3633

37-
protected def replaceAlias(attr: AttributeReference): Option[Attribute] = {
38-
outputExpressions.collectFirst {
39-
case a @ Alias(child: AttributeReference, _) if child.semanticEquals(attr) =>
40-
a.toAttribute
34+
protected def normalizeExpression(exp: Expression): Expression = {
35+
exp.transform {
36+
case attr: AttributeReference => aliasMap.getOrElse(attr, attr)
4137
}
4238
}
4339
}
@@ -50,7 +46,8 @@ trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
5046
final override def outputPartitioning: Partitioning = {
5147
if (hasAlias) {
5248
child.outputPartitioning match {
53-
case h: HashPartitioning => h.copy(expressions = replaceAliases(h.expressions))
49+
case e: Expression =>
50+
normalizeExpression(e).asInstanceOf[Partitioning]
5451
case other => other
5552
}
5653
} else {
@@ -68,12 +65,7 @@ trait AliasAwareOutputOrdering extends AliasAwareOutputExpression {
6865

6966
final override def outputOrdering: Seq[SortOrder] = {
7067
if (hasAlias) {
71-
orderingExpressions.map { s =>
72-
s.child match {
73-
case a: AttributeReference => s.copy(child = replaceAlias(a).getOrElse(a))
74-
case _ => s
75-
}
76-
}
68+
orderingExpressions.map(normalizeExpression(_).asInstanceOf[SortOrder])
7769
} else {
7870
orderingExpressions
7971
}

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q2.sf100/explain.txt

Lines changed: 82 additions & 87 deletions
Large diffs are not rendered by default.

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q2.sf100/simplified.txt

Lines changed: 47 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,65 @@
1-
WholeStageCodegen (16)
1+
WholeStageCodegen (15)
22
Sort [d_week_seq1]
33
InputAdapter
44
Exchange [d_week_seq1] #1
5-
WholeStageCodegen (15)
5+
WholeStageCodegen (14)
66
Project [d_week_seq1,sun_sales1,sun_sales2,mon_sales1,mon_sales2,tue_sales1,tue_sales2,wed_sales1,wed_sales2,thu_sales1,thu_sales2,fri_sales1,fri_sales2,sat_sales1,sat_sales2]
77
SortMergeJoin [d_week_seq1,d_week_seq2]
88
InputAdapter
9-
WholeStageCodegen (7)
9+
WholeStageCodegen (6)
1010
Sort [d_week_seq1]
11-
InputAdapter
12-
Exchange [d_week_seq1] #2
13-
WholeStageCodegen (6)
14-
Project [d_week_seq,sun_sales,mon_sales,tue_sales,wed_sales,thu_sales,fri_sales,sat_sales]
15-
BroadcastHashJoin [d_week_seq,d_week_seq]
16-
HashAggregate [d_week_seq,sum,sum,sum,sum,sum,sum,sum] [sum(UnscaledValue(CASE WHEN (d_day_name = Sunday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Monday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Tuesday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Wednesday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Thursday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Friday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Saturday) THEN sales_price ELSE null END)),sun_sales,mon_sales,tue_sales,wed_sales,thu_sales,fri_sales,sat_sales,sum,sum,sum,sum,sum,sum,sum]
17-
InputAdapter
18-
Exchange [d_week_seq] #3
19-
WholeStageCodegen (4)
20-
HashAggregate [d_week_seq,d_day_name,sales_price] [sum,sum,sum,sum,sum,sum,sum,sum,sum,sum,sum,sum,sum,sum]
21-
Project [sales_price,d_week_seq,d_day_name]
22-
BroadcastHashJoin [sold_date_sk,d_date_sk]
23-
InputAdapter
24-
Union
25-
WholeStageCodegen (1)
26-
Project [ws_sold_date_sk,ws_ext_sales_price]
27-
Filter [ws_sold_date_sk]
28-
ColumnarToRow
29-
InputAdapter
30-
Scan parquet default.web_sales [ws_sold_date_sk,ws_ext_sales_price]
31-
WholeStageCodegen (2)
32-
Project [cs_sold_date_sk,cs_ext_sales_price]
33-
Filter [cs_sold_date_sk]
34-
ColumnarToRow
35-
InputAdapter
36-
Scan parquet default.catalog_sales [cs_sold_date_sk,cs_ext_sales_price]
37-
InputAdapter
38-
BroadcastExchange #4
39-
WholeStageCodegen (3)
40-
Filter [d_date_sk,d_week_seq]
41-
ColumnarToRow
42-
InputAdapter
43-
Scan parquet default.date_dim [d_date_sk,d_week_seq,d_day_name]
44-
InputAdapter
45-
BroadcastExchange #5
46-
WholeStageCodegen (5)
47-
Project [d_week_seq]
48-
Filter [d_year,d_week_seq]
49-
ColumnarToRow
50-
InputAdapter
51-
Scan parquet default.date_dim [d_week_seq,d_year]
11+
Project [d_week_seq,sun_sales,mon_sales,tue_sales,wed_sales,thu_sales,fri_sales,sat_sales]
12+
BroadcastHashJoin [d_week_seq,d_week_seq]
13+
HashAggregate [d_week_seq,sum,sum,sum,sum,sum,sum,sum] [sum(UnscaledValue(CASE WHEN (d_day_name = Sunday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Monday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Tuesday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Wednesday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Thursday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Friday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Saturday) THEN sales_price ELSE null END)),sun_sales,mon_sales,tue_sales,wed_sales,thu_sales,fri_sales,sat_sales,sum,sum,sum,sum,sum,sum,sum]
14+
InputAdapter
15+
Exchange [d_week_seq] #2
16+
WholeStageCodegen (4)
17+
HashAggregate [d_week_seq,d_day_name,sales_price] [sum,sum,sum,sum,sum,sum,sum,sum,sum,sum,sum,sum,sum,sum]
18+
Project [sales_price,d_week_seq,d_day_name]
19+
BroadcastHashJoin [sold_date_sk,d_date_sk]
20+
InputAdapter
21+
Union
22+
WholeStageCodegen (1)
23+
Project [ws_sold_date_sk,ws_ext_sales_price]
24+
Filter [ws_sold_date_sk]
25+
ColumnarToRow
26+
InputAdapter
27+
Scan parquet default.web_sales [ws_sold_date_sk,ws_ext_sales_price]
28+
WholeStageCodegen (2)
29+
Project [cs_sold_date_sk,cs_ext_sales_price]
30+
Filter [cs_sold_date_sk]
31+
ColumnarToRow
32+
InputAdapter
33+
Scan parquet default.catalog_sales [cs_sold_date_sk,cs_ext_sales_price]
34+
InputAdapter
35+
BroadcastExchange #3
36+
WholeStageCodegen (3)
37+
Filter [d_date_sk,d_week_seq]
38+
ColumnarToRow
39+
InputAdapter
40+
Scan parquet default.date_dim [d_date_sk,d_week_seq,d_day_name]
41+
InputAdapter
42+
BroadcastExchange #4
43+
WholeStageCodegen (5)
44+
Project [d_week_seq]
45+
Filter [d_year,d_week_seq]
46+
ColumnarToRow
47+
InputAdapter
48+
Scan parquet default.date_dim [d_week_seq,d_year]
5249
InputAdapter
53-
WholeStageCodegen (14)
50+
WholeStageCodegen (13)
5451
Sort [d_week_seq2]
5552
InputAdapter
56-
Exchange [d_week_seq2] #6
57-
WholeStageCodegen (13)
53+
Exchange [d_week_seq2] #5
54+
WholeStageCodegen (12)
5855
Project [d_week_seq,sun_sales,mon_sales,tue_sales,wed_sales,thu_sales,fri_sales,sat_sales]
5956
BroadcastHashJoin [d_week_seq,d_week_seq]
6057
HashAggregate [d_week_seq,sum,sum,sum,sum,sum,sum,sum] [sum(UnscaledValue(CASE WHEN (d_day_name = Sunday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Monday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Tuesday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Wednesday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Thursday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Friday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Saturday) THEN sales_price ELSE null END)),sun_sales,mon_sales,tue_sales,wed_sales,thu_sales,fri_sales,sat_sales,sum,sum,sum,sum,sum,sum,sum]
6158
InputAdapter
62-
ReusedExchange [d_week_seq,sum,sum,sum,sum,sum,sum,sum] #3
59+
ReusedExchange [d_week_seq,sum,sum,sum,sum,sum,sum,sum] #2
6360
InputAdapter
64-
BroadcastExchange #7
65-
WholeStageCodegen (12)
61+
BroadcastExchange #6
62+
WholeStageCodegen (11)
6663
Project [d_week_seq]
6764
Filter [d_year,d_week_seq]
6865
ColumnarToRow

0 commit comments

Comments
 (0)