Skip to content

Commit 98f7275

Browse files
committed
Remove any Filters with DynamicPruning that didn't filter on partition column
1 parent bcc81be commit 98f7275

File tree

4 files changed

+24
-3
lines changed

4 files changed

+24
-3
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,12 @@ class SparkOptimizer(
4444
Batch("PartitionPruning", Once,
4545
PartitionPruning,
4646
OptimizeSubqueries) :+
47-
Batch("Pushdown Filters from PartitionPruning", fixedPoint,
47+
Batch("Pushdown Filters from PartitionPruning before Inferring Filters", fixedPoint,
4848
PushDownPredicates) :+
4949
Batch("Infer Filters from PartitionPruning", Once,
5050
InferFiltersFromConstraints) :+
51+
Batch("Pushdown Filters from PartitionPruning after Inferring Filters", fixedPoint,
52+
PushDownPredicates) :+
5153
Batch("Cleanup filters that cannot be pushed down", Once,
5254
CleanupDynamicPruningFilters,
5355
PruneFilters)) ++

sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/CleanupDynamicPruningFilters.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.execution.dynamicpruning
1919

20-
import org.apache.spark.sql.catalyst.expressions.{DynamicPruning, PredicateHelper}
20+
import org.apache.spark.sql.catalyst.expressions.{DynamicPruning, DynamicPruningSubquery, Expression, PredicateHelper}
2121
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
2222
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
2323
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
@@ -32,12 +32,29 @@ import org.apache.spark.sql.internal.SQLConf
3232
*/
3333
object CleanupDynamicPruningFilters extends Rule[LogicalPlan] with PredicateHelper {
3434

35+
private def isFilterOnNonPartition(condition: Expression, child: LogicalPlan): Boolean = {
36+
splitConjunctivePredicates(condition).exists {
37+
case DynamicPruningSubquery(pruningKey, _, _, _, _, _) =>
38+
PartitionPruning.getPartitionTableScan(pruningKey, child).isEmpty
39+
case _ => false
40+
}
41+
}
42+
3543
override def apply(plan: LogicalPlan): LogicalPlan = {
3644
if (!SQLConf.get.dynamicPartitionPruningEnabled) {
3745
return plan
3846
}
3947

4048
plan.transform {
49+
// Remove any Filters with DynamicPruning that didn't filter on partition column`.
50+
// This is inferred by Infer Filters from PartitionPruning.
51+
case f @ Filter(condition, child) if isFilterOnNonPartition(condition, child) =>
52+
val newCondition = condition.transform {
53+
case DynamicPruningSubquery(pruningKey, _, _, _, _, _)
54+
if PartitionPruning.getPartitionTableScan(pruningKey, child).isEmpty =>
55+
TrueLiteral
56+
}
57+
f.copy(condition = newCondition)
4158
// pass through anything that is pushed down into PhysicalOperation
4259
case p @ PhysicalOperation(_, _, LogicalRelation(_: HadoopFsRelation, _, _, _)) => p
4360
// remove any Filters with DynamicPruning that didn't get pushed down to PhysicalOperation.

sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
147147
}
148148

149149
// the pruning overhead is the total size in bytes of all scan relations
150-
val overhead = otherPlan.collectLeaves().map(_.stats.sizeInBytes).sum.toFloat
150+
val overhead = otherPlan.collectLeaves().map(_.stats.sizeInBytes).sum
151151

152152
filterRatio * partPlan.stats.sizeInBytes.toFloat > overhead.toFloat
153153
}

sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1299,6 +1299,7 @@ abstract class DynamicPartitionPruningSuiteBase
12991299
assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 2)
13001300
checkDistinctSubqueries(df, 1)
13011301
checkPartitionPruningPredicate(df, false, true)
1302+
assert(!checkUnpushedFilters(df), "Inferred dynamic pruning expression has been pushed down.")
13021303

13031304
checkAnswer(df,
13041305
Row(2, 20, 2) ::
@@ -1324,6 +1325,7 @@ abstract class DynamicPartitionPruningSuiteBase
13241325

13251326
assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 1)
13261327
checkPartitionPruningPredicate(df, false, true)
1328+
assert(!checkUnpushedFilters(df), "Inferred dynamic pruning expression should be removed.")
13271329

13281330
checkAnswer(df,
13291331
Row(2, "NL", 2) ::

0 commit comments

Comments
 (0)