Skip to content

Commit bd47570

Browse files
committed
normalize sameOrderExpressions in SortOrder node
1 parent e2c7bfc commit bd47570

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,11 @@ trait AliasAwareOutputOrdering extends AliasAwareOutputExpression {
6565

6666
final override def outputOrdering: Seq[SortOrder] = {
6767
if (hasAlias) {
68-
orderingExpressions.map(normalizeExpression(_).asInstanceOf[SortOrder])
68+
orderingExpressions.map { sortOrder =>
69+
val newSortOrder = normalizeExpression(sortOrder).asInstanceOf[SortOrder]
70+
val newSameOrderExpressions = newSortOrder.sameOrderExpressions.map(normalizeExpression)
71+
newSortOrder.copy(sameOrderExpressions = newSameOrderExpressions)
72+
}
6973
} else {
7074
orderingExpressions
7175
}

sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1059,6 +1059,37 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
10591059
}
10601060
}
10611061

1062+
// Regression test for SPARK-33400: after planning a nested join that aliases t2.id as t2id,
// some ProjectExec must report an outputOrdering whose sameOrderExpressions refer to the
// alias name "t2id".
test("SPARK-33400: Normalization of sortOrder should take care of sameOrderExprs") {
1063+
// Sets the auto-broadcast-join threshold to "-1" for the duration of this block
// (presumably so that broadcast joins are not chosen and sort nodes appear in the plan;
// TODO confirm).
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
1064+
withTempView("t1", "t2", "t3") {
1065+
// Three temp views over ranges of 10, 20 and 30 ids, each repartitioned by the id column.
spark.range(10).repartition($"id").createTempView("t1")
1066+
spark.range(20).repartition($"id").createTempView("t2")
1067+
spark.range(30).repartition($"id").createTempView("t3")
1068+
// Executed plan of a two-level join: the inner query joins t1 and t2 on id and aliases
// the ids as t1id / t2id; the outer query joins that result with t3 on t2id = t3.id.
val planned = sql(
1069+
"""
1070+
|SELECT t2id, t3.id as t3id
1071+
|FROM (
1072+
| SELECT t1.id as t1id, t2.id as t2id
1073+
| FROM t1, t2
1074+
| WHERE t1.id = t2.id
1075+
|) t12, t3
1076+
|WHERE t2id = t3.id
1077+
""".stripMargin).queryExecution.executedPlan
1078+
1079+
// The plan is expected to contain exactly three SortExec nodes.
val sortNodes = planned.collect { case s: SortExec => s }
1080+
assert(sortNodes.size == 3)
1081+
1082+
// At least one ProjectExec must expose a single ascending, nulls-first SortOrder whose
// only sameOrderExpression is an AttributeReference named "t2id".
val projects = planned.collect { case p: ProjectExec => p }
1083+
assert(projects.exists(_.outputOrdering match {
1084+
case Seq(SortOrder(_, Ascending, NullsFirst, sameOrderExprs)) =>
1085+
sameOrderExprs.size == 1 && sameOrderExprs.head.isInstanceOf[AttributeReference] &&
1086+
sameOrderExprs.head.asInstanceOf[AttributeReference].name == "t2id"
1087+
// Any other ordering shape does not satisfy the check.
case _ => false
1088+
}))
1089+
}
1090+
}
1091+
}
1092+
10621093
test("aliases to expressions should not be replaced") {
10631094
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
10641095
withTempView("df1", "df2") {

0 commit comments

Comments
 (0)