Skip to content

Commit eb123a1

Browse files
zml1206 cloud-fan
authored and committed
[SPARK-46941][SQL][3.5] Can't insert window group limit node for top-k computation if contains SizeBasedWindowFunction
### What changes were proposed in this pull request? This PR backports #44980 to branch-3.5. Don't insert a window group limit node for top-k computation if the window contains a `SizeBasedWindowFunction`. ### Why are the changes needed? Bug fix. Inserting a window group limit node for a top-k computation that contains a `SizeBasedWindowFunction` causes the `SizeBasedWindowFunction` to return a wrong result, because such functions (e.g. `percent_rank`) depend on the total number of rows in the partition, which the group limit truncates. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New UTs. Before this PR, the new UTs do not pass. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #51422 from zml1206/SPARK-46941-3.5. Authored-by: zml1206 <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 218d292 commit eb123a1

File tree

3 files changed

+50
-6
lines changed

3 files changed

+50
-6
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.catalyst.optimizer
1919

20-
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
20+
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentRow, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, IntegerLiteral, LessThan, LessThanOrEqual, Literal, NamedExpression, PredicateHelper, Rank, RowFrame, RowNumber, SizeBasedWindowFunction, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
2121
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Limit, LocalRelation, LogicalPlan, Window, WindowGroupLimit}
2222
import org.apache.spark.sql.catalyst.rules.Rule
2323
import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, WINDOW}
@@ -53,13 +53,14 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
5353
}
5454

5555
/**
56-
* All window expressions should use the same expanding window, so that
57-
* we can safely do the early stop.
56+
* All window expressions should use the same expanding window and must not contain
57+
* `SizeBasedWindowFunction`, so that we can safely do the early stop.
5858
*/
5959
private def isExpandingWindow(
6060
windowExpression: NamedExpression): Boolean = windowExpression match {
61-
case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
62-
SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
61+
case Alias(WindowExpression(windowFunction, WindowSpecDefinition(_, _,
62+
SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _)
63+
if !windowFunction.isInstanceOf[SizeBasedWindowFunction] => true
6364
case _ => false
6465
}
6566

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimitSuite.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
2020
import org.apache.spark.sql.Row
2121
import org.apache.spark.sql.catalyst.dsl.expressions._
2222
import org.apache.spark.sql.catalyst.dsl.plans._
23-
import org.apache.spark.sql.catalyst.expressions.{CurrentRow, DenseRank, Literal, NthValue, NTile, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding}
23+
import org.apache.spark.sql.catalyst.expressions.{CurrentRow, DenseRank, Literal, NthValue, NTile, PercentRank, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding}
2424
import org.apache.spark.sql.catalyst.plans.PlanTest
2525
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
2626
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -338,4 +338,20 @@ class InferWindowGroupLimitSuite extends PlanTest {
338338
WithoutOptimize.execute(correctAnswer1.analyze))
339339
}
340340
}
341+
342+
test("SPARK-46941: Can't Insert window group limit node for top-k computation if contains " +
343+
"SizeBasedWindowFunction") {
344+
val originalQuery =
345+
testRelation
346+
.select(a, b, c,
347+
windowExpr(Rank(c :: Nil),
348+
windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rank"),
349+
windowExpr(PercentRank(c :: Nil),
350+
windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("percent_rank"))
351+
.where(Symbol("rank") < 2)
352+
353+
comparePlans(
354+
Optimize.execute(originalQuery.analyze),
355+
WithoutOptimize.execute(originalQuery.analyze))
356+
}
341357
}

sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1637,4 +1637,31 @@ class DataFrameWindowFunctionsSuite extends QueryTest
16371637
}
16381638
}
16391639
}
1640+
1641+
test("SPARK-46941: Can't insert window group limit node for top-k computation if contains " +
1642+
"SizeBasedWindowFunction") {
1643+
val df = Seq(
1644+
(1, "Dave", 1, 2020),
1645+
(2, "Mark", 2, 2020),
1646+
(3, "Amy", 3, 2020),
1647+
(4, "Dave", 1, 2021),
1648+
(5, "Mark", 2, 2021),
1649+
(6, "Amy", 3, 2021),
1650+
(7, "John", 4, 2021)).toDF("id", "name", "score", "year")
1651+
1652+
val window = Window.partitionBy($"year").orderBy($"score".desc)
1653+
1654+
Seq(-1, 100).foreach { threshold =>
1655+
withSQLConf(SQLConf.WINDOW_GROUP_LIMIT_THRESHOLD.key -> threshold.toString) {
1656+
val df2 = df
1657+
.withColumn("rank", rank().over(window))
1658+
.withColumn("percent_rank", percent_rank().over(window))
1659+
.sort($"year")
1660+
checkAnswer(df2.filter("rank=2"), Seq(
1661+
Row(2, "Mark", 2, 2020, 2, 0.5),
1662+
Row(6, "Amy", 3, 2021, 2, 0.3333333333333333)
1663+
))
1664+
}
1665+
}
1666+
}
16401667
}

0 commit comments

Comments
 (0)