
Commit 7dfc376

Merge pull request #244 from markhamstra/branch-2.3-merge
[SPARK-24802][SQL] Add a new config for Optimization Rule Exclusion
2 parents 63b644b + 19041a4

File tree: 3 files changed (+163 lines, -1 line)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 52 additions & 1 deletion
@@ -46,7 +46,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)

   protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations)

-  def batches: Seq[Batch] = {
+  def defaultBatches: Seq[Batch] = {
     val operatorOptimizationRuleSet =
       Seq(
         // Operator push down
@@ -158,6 +158,22 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
         RemoveRedundantProject)
   }

+  def nonExcludableRules: Seq[String] =
+    EliminateDistinct.ruleName ::
+      EliminateSubqueryAliases.ruleName ::
+      EliminateView.ruleName ::
+      ReplaceExpressions.ruleName ::
+      ComputeCurrentTime.ruleName ::
+      GetCurrentDatabase(sessionCatalog).ruleName ::
+      RewriteDistinctAggregates.ruleName ::
+      ReplaceDeduplicateWithAggregate.ruleName ::
+      ReplaceIntersectWithSemiJoin.ruleName ::
+      ReplaceExceptWithFilter.ruleName ::
+      ReplaceExceptWithAntiJoin.ruleName ::
+      ReplaceDistinctWithAggregate.ruleName ::
+      PullupCorrelatedPredicates.ruleName ::
+      RewritePredicateSubquery.ruleName :: Nil
+
   /**
    * Optimize all the subqueries inside expression.
    */
@@ -173,6 +189,41 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
    * Override to provide additional rules for the operator optimization batch.
    */
   def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil
+
+  override def batches: Seq[Batch] = {
+    val excludedRulesConf =
+      SQLConf.get.optimizerExcludedRules.toSeq.flatMap(Utils.stringToSeq)
+    val excludedRules = excludedRulesConf.filter { ruleName =>
+      val nonExcludable = nonExcludableRules.contains(ruleName)
+      if (nonExcludable) {
+        logWarning(s"Optimization rule '${ruleName}' was not excluded from the optimizer " +
+          s"because this rule is a non-excludable rule.")
+      }
+      !nonExcludable
+    }
+    if (excludedRules.isEmpty) {
+      defaultBatches
+    } else {
+      defaultBatches.flatMap { batch =>
+        val filteredRules = batch.rules.filter { rule =>
+          val exclude = excludedRules.contains(rule.ruleName)
+          if (exclude) {
+            logInfo(s"Optimization rule '${rule.ruleName}' is excluded from the optimizer.")
+          }
+          !exclude
+        }
+        if (batch.rules == filteredRules) {
+          Some(batch)
+        } else if (filteredRules.nonEmpty) {
+          Some(Batch(batch.name, batch.strategy, filteredRules: _*))
+        } else {
+          logInfo(s"Optimization batch '${batch.name}' is excluded from the optimizer " +
+            s"as all enclosed rules have been excluded.")
+          None
+        }
+      }
+    }
+  }
 }

 /**
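
Not part of the diff, but for orientation: a minimal usage sketch of the new config from a user session, assuming a running SparkSession named spark. Rule names are the optimizer rules' fully-qualified class names (Rule.ruleName is derived from the class name), and anything listed in nonExcludableRules above is ignored with a warning rather than excluded.

// Sketch only, not part of this commit; assumes a SparkSession named `spark`.
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.CombineUnions")

// With CombineUnions excluded, nested unions stay nested in the optimized
// logical plan instead of being flattened into a single Union node.
val df = spark.range(3).union(spark.range(3).union(spark.range(3)))
df.explain(true)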

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions
@@ -109,6 +109,14 @@ object SQLConf {
    */
   def get: SQLConf = confGetter.get()()

+  val OPTIMIZER_EXCLUDED_RULES = buildConf("spark.sql.optimizer.excludedRules")
+    .doc("Configures a list of rules to be disabled in the optimizer, in which the rules are " +
+      "specified by their rule names and separated by comma. It is not guaranteed that all the " +
+      "rules in this configuration will eventually be excluded, as some rules are necessary " +
+      "for correctness. The optimizer will log the rules that have indeed been excluded.")
+    .stringConf
+    .createOptional
+
   val OPTIMIZER_MAX_ITERATIONS = buildConf("spark.sql.optimizer.maxIterations")
     .internal()
     .doc("The max number of iterations the optimizer and analyzer runs.")
@@ -1226,6 +1234,8 @@ class SQLConf extends Serializable with Logging {

   /** ************************ Spark SQL Params/Hints ******************* */

+  def optimizerExcludedRules: Option[String] = getConf(OPTIMIZER_EXCLUDED_RULES)
+
   def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)

   def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)
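
Because the entry above is created with createOptional, optimizerExcludedRules yields an Option[String]. A small self-contained sketch (sample values assumed) of how that raw string becomes individual rule names, mirroring the trim-and-drop-empties behavior of Utils.stringToSeq used in Optimizer above; this is also why the leading comma produced by the foldLeft in the test suite below is harmless.

// Sketch only: turning the optional, comma-separated conf value into rule names.
val raw: Option[String] = Some(",CombineUnions, ConvertToLocalRelation")
val excluded: Seq[String] =
  raw.toSeq.flatMap(_.split(",").map(_.trim).filter(_.nonEmpty))
// excluded == Seq("CombineUnions", "ConvertToLocalRelation")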
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala

Lines changed: 101 additions & 0 deletions
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_EXCLUDED_RULES
+
+
+class OptimizerRuleExclusionSuite extends PlanTest {
+
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+
+  private def verifyExcludedRules(excludedRuleNames: Seq[String]) {
+    val optimizer = new SimpleTestOptimizer()
+    // Batches whose rules are all to be excluded should be removed as a whole.
+    val excludedBatchNames = optimizer.batches
+      .filter(batch => batch.rules.forall(rule => excludedRuleNames.contains(rule.ruleName)))
+      .map(_.name)
+
+    withSQLConf(
+      OPTIMIZER_EXCLUDED_RULES.key -> excludedRuleNames.foldLeft("")((l, r) => l + "," + r)) {
+      val batches = optimizer.batches
+      assert(batches.forall(batch => !excludedBatchNames.contains(batch.name)))
+      assert(
+        batches
+          .forall(batch => batch.rules.forall(rule => !excludedRuleNames.contains(rule.ruleName))))
+    }
+  }
+
+  test("Exclude a single rule from multiple batches") {
+    verifyExcludedRules(
+      Seq(
+        PushPredicateThroughJoin.ruleName))
+  }
+
+  test("Exclude multiple rules from single or multiple batches") {
+    verifyExcludedRules(
+      Seq(
+        CombineUnions.ruleName,
+        RemoveLiteralFromGroupExpressions.ruleName,
+        RemoveRepetitionFromGroupExpressions.ruleName))
+  }
+
+  test("Exclude non-existent rule with other valid rules") {
+    verifyExcludedRules(
+      Seq(
+        LimitPushDown.ruleName,
+        InferFiltersFromConstraints.ruleName,
+        "DummyRuleName"))
+  }
+
+  test("Try to exclude a non-excludable rule") {
+    val excludedRules = Seq(
+      ReplaceIntersectWithSemiJoin.ruleName,
+      PullupCorrelatedPredicates.ruleName)
+
+    val optimizer = new SimpleTestOptimizer()
+
+    withSQLConf(
+      OPTIMIZER_EXCLUDED_RULES.key -> excludedRules.foldLeft("")((l, r) => l + "," + r)) {
+      excludedRules.foreach { excludedRule =>
+        assert(
+          optimizer.batches
+            .exists(batch => batch.rules.exists(rule => rule.ruleName == excludedRule)))
+      }
+    }
+  }
+
+  test("Verify optimized plan after excluding CombineUnions rule") {
+    val excludedRules = Seq(
+      ConvertToLocalRelation.ruleName,
+      PropagateEmptyRelation.ruleName,
+      CombineUnions.ruleName)
+
+    withSQLConf(
+      OPTIMIZER_EXCLUDED_RULES.key -> excludedRules.foldLeft("")((l, r) => l + "," + r)) {
+      val optimizer = new SimpleTestOptimizer()
+      val originalQuery = testRelation.union(testRelation.union(testRelation)).analyze
+      val optimized = optimizer.execute(originalQuery)
+      comparePlans(originalQuery, optimized)
+    }
+  }
+}
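
A note on the last test (my reading, not stated in the diff): testRelation is an empty LocalRelation, so rules such as PropagateEmptyRelation and ConvertToLocalRelation could collapse the unions on their own; excluding them as well keeps the comparePlans assertion focused on whether CombineUnions ran.

// Illustrative sketch of the expectation, not code from the commit:
// with the three rules excluded, the analyzed plan is left untouched:
//   Union(testRelation, Union(testRelation, testRelation))   // before == after
// with CombineUnions enabled, the optimizer would flatten it instead:
//   Union(testRelation, testRelation, testRelation)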
