Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 6e63201

Browse files
dongjoon-hyunmarmbrus
authored andcommitted
[SPARK-14830][SQL] Add RemoveRepetitionFromGroupExpressions optimizer.
## What changes were proposed in this pull request? This PR aims to optimize GroupExpressions by removing repeating expressions. `RemoveRepetitionFromGroupExpressions` is added. **Before** ```scala scala> sql("select a+1 from values 1,2 T(a) group by a+1, 1+a, A+1, 1+A").explain() == Physical Plan == WholeStageCodegen : +- TungstenAggregate(key=[(a#0 + 1)#6,(1 + a#0)#7,(A#0 + 1)#8,(1 + A#0)#9], functions=[], output=[(a + 1)#5]) : +- INPUT +- Exchange hashpartitioning((a#0 + 1)#6, (1 + a#0)#7, (A#0 + 1)#8, (1 + A#0)#9, 200), None +- WholeStageCodegen : +- TungstenAggregate(key=[(a#0 + 1) AS (a#0 + 1)#6,(1 + a#0) AS (1 + a#0)#7,(A#0 + 1) AS (A#0 + 1)#8,(1 + A#0) AS (1 + A#0)#9], functions=[], output=[(a#0 + 1)#6,(1 + a#0)#7,(A#0 + 1)#8,(1 + A#0)#9]) : +- INPUT +- LocalTableScan [a#0], [[1],[2]] ``` **After** ```scala scala> sql("select a+1 from values 1,2 T(a) group by a+1, 1+a, A+1, 1+A").explain() == Physical Plan == WholeStageCodegen : +- TungstenAggregate(key=[(a#0 + 1)#6], functions=[], output=[(a + 1)#5]) : +- INPUT +- Exchange hashpartitioning((a#0 + 1)#6, 200), None +- WholeStageCodegen : +- TungstenAggregate(key=[(a#0 + 1) AS (a#0 + 1)#6], functions=[], output=[(a#0 + 1)#6]) : +- INPUT +- LocalTableScan [a#0], [[1],[2]] ``` ## How was this patch tested? Pass the Jenkins tests (with a new testcase) Author: Dongjoon Hyun <[email protected]> Closes apache#12590 from dongjoon-hyun/SPARK-14830.
1 parent a35a67a commit 6e63201

File tree

2 files changed

+33
-2
lines changed

2 files changed

+33
-2
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
6868
ReplaceExceptWithAntiJoin,
6969
ReplaceDistinctWithAggregate) ::
7070
Batch("Aggregate", fixedPoint,
71-
RemoveLiteralFromGroupExpressions) ::
71+
RemoveLiteralFromGroupExpressions,
72+
RemoveRepetitionFromGroupExpressions) ::
7273
Batch("Operator Optimizations", fixedPoint,
7374
// Operator push down
7475
SetOperationPushDown,
@@ -1439,6 +1440,18 @@ object RemoveLiteralFromGroupExpressions extends Rule[LogicalPlan] {
14391440
}
14401441
}
14411442

1443+
/**
1444+
* Removes repetition from group expressions in [[Aggregate]], as they have no effect to the result
1445+
* but only makes the grouping key bigger.
1446+
*/
1447+
object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
1448+
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
1449+
case a @ Aggregate(grouping, _, _) =>
1450+
val newGrouping = ExpressionSet(grouping).toSeq
1451+
a.copy(groupingExpressions = newGrouping)
1452+
}
1453+
}
1454+
14421455
/**
14431456
* Computes the current date and time to make sure we return the same result in a single query.
14441457
*/

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.spark.sql.catalyst.optimizer
1919

20+
import org.apache.spark.sql.catalyst.SimpleCatalystConf
21+
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
22+
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
2023
import org.apache.spark.sql.catalyst.dsl.expressions._
2124
import org.apache.spark.sql.catalyst.dsl.plans._
2225
import org.apache.spark.sql.catalyst.expressions.Literal
@@ -25,10 +28,14 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
2528
import org.apache.spark.sql.catalyst.rules.RuleExecutor
2629

2730
class AggregateOptimizeSuite extends PlanTest {
31+
val conf = new SimpleCatalystConf(caseSensitiveAnalysis = false)
32+
val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
33+
val analyzer = new Analyzer(catalog, conf)
2834

2935
object Optimize extends RuleExecutor[LogicalPlan] {
3036
val batches = Batch("Aggregate", FixedPoint(100),
31-
RemoveLiteralFromGroupExpressions) :: Nil
37+
RemoveLiteralFromGroupExpressions,
38+
RemoveRepetitionFromGroupExpressions) :: Nil
3239
}
3340

3441
test("remove literals in grouping expression") {
@@ -42,4 +49,15 @@ class AggregateOptimizeSuite extends PlanTest {
4249

4350
comparePlans(optimized, correctAnswer)
4451
}
52+
53+
test("remove repetition in grouping expression") {
54+
val input = LocalRelation('a.int, 'b.int, 'c.int)
55+
56+
val query = input.groupBy('a + 1, 'b + 2, Literal(1) + 'A, Literal(2) + 'B)(sum('c))
57+
val optimized = Optimize.execute(analyzer.execute(query))
58+
59+
val correctAnswer = analyzer.execute(input.groupBy('a + 1, 'b + 2)(sum('c)))
60+
61+
comparePlans(optimized, correctAnswer)
62+
}
4563
}

0 commit comments

Comments
 (0)