
Commit 36b0956

liancheng authored and marmbrus committed
[SPARK-4453][SPARK-4213][SQL] Simplifies Parquet filter generation code
While reviewing PR apache#3083 and apache#3161, I noticed that the Parquet record filter generation code can be simplified significantly, following the clue stated in [SPARK-4453](https://issues.apache.org/jira/browse/SPARK-4453). This PR addresses both SPARK-4453 and SPARK-4213 with that simplification.

While generating the `ParquetTableScan` operator, we need to remove all Catalyst predicates that have already been pushed down to Parquet. Originally, we first generated the record filter and then called `findExpression` to traverse the generated filter and find all pushed-down predicates [[1](https://github.com/apache/spark/blob/64c6b9bad559c21f25cd9fbe37c8813cdab939f2/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L213-L228)]. This forced us to introduce the `CatalystFilter` class hierarchy to bind each Catalyst predicate to its generated Parquet filter, which complicated the code base considerably.

The basic idea of this PR is that we don't need `findExpression` after filter generation at all, because we already know a predicate can be pushed down if we can successfully generate its corresponding Parquet filter. SPARK-4213 is fixed by returning `None` for any unsupported predicate type.

Author: Cheng Lian <[email protected]>

Closes apache#3317 from liancheng/simplify-parquet-filters and squashes the following commits:

d6a9499 [Cheng Lian] Fixes import styling issue
43760e8 [Cheng Lian] Simplifies Parquet filter generation logic
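To make the idea concrete, here is a minimal, self-contained sketch (the `Expr` and `ParquetFilter` types below are hypothetical stand-ins, not Spark's actual classes): a predicate is push-down-capable exactly when filter generation returns `Some`, so pruning reduces to a `collect` over the generation results and no `findExpression` traversal is needed.

```scala
object FilterPruningSketch {
  // Hypothetical stand-ins for Catalyst expressions and Parquet filters.
  sealed trait Expr
  case class Cmp(attr: String, value: Int) extends Expr      // "Attribute cmp Literal"
  case class And(l: Expr, r: Expr) extends Expr
  case class Unsupported(desc: String) extends Expr          // anything Parquet can't handle

  sealed trait ParquetFilter
  case class EqFilter(attr: String, value: Int) extends ParquetFilter
  case class AndFilter(l: ParquetFilter, r: ParquetFilter) extends ParquetFilter

  // Returning None for any unsupported predicate type is how SPARK-4213 is fixed.
  def createFilter(e: Expr): Option[ParquetFilter] = e match {
    case Cmp(a, v) => Some(EqFilter(a, v))
    case And(l, r) => for (lf <- createFilter(l); rf <- createFilter(r)) yield AndFilter(lf, rf)
    case _         => None
  }

  // No findExpression needed: keep only the predicates whose filter generation failed.
  def prune(predicates: Seq[Expr]): Seq[Expr] =
    predicates.map(p => p -> createFilter(p)).collect { case (p, None) => p }

  def main(args: Array[String]): Unit = {
    val kept = prune(Seq(Cmp("a", 1), Unsupported("a LIKE 'x%'"), And(Cmp("b", 2), Cmp("c", 3))))
    println(kept) // List(Unsupported(a LIKE 'x%'))
  }
}
```

Running `main` prints only the unsupported predicate: the other two produced Parquet filters, so they are dropped from the higher-level filter.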
1 parent ef7c464 · commit 36b0956

File tree

5 files changed: +161 −693 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.util.Metadata
 object NamedExpression {
   private val curId = new java.util.concurrent.atomic.AtomicLong()
   def newExprId = ExprId(curId.getAndIncrement())
+  def unapply(expr: NamedExpression): Option[(String, DataType)] = Some(expr.name, expr.dataType)
 }

 /**
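The new `unapply` turns the `NamedExpression` companion into an extractor, so downstream code (such as the simplified `ParquetFilters.createFilter`) can bind a column's name and data type in a single pattern. A small self-contained analogue, with a hypothetical `Named` trait standing in for `NamedExpression`:

```scala
object ExtractorSketch {
  sealed trait DataType
  case object IntType extends DataType
  case object StringType extends DataType

  // Hypothetical stand-in for NamedExpression: anything with a name and a data type.
  trait Named { def name: String; def dataType: DataType }
  case class AttrRef(name: String, dataType: DataType) extends Named

  object Named {
    // Mirrors the added NamedExpression.unapply: expose (name, dataType) to patterns.
    def unapply(e: Named): Option[(String, DataType)] = Some((e.name, e.dataType))
  }

  def describe(e: Named): String = e match {
    // One pattern binds both fields; no explicit accessor calls needed.
    case Named(name, IntType) => s"integer column '$name'"
    case Named(name, _)       => s"column '$name'"
  }

  def main(args: Array[String]): Unit = {
    println(describe(AttrRef("age", IntType)))     // integer column 'age'
    println(describe(AttrRef("city", StringType))) // column 'city'
  }
}
```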

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 9 additions & 16 deletions
@@ -209,22 +209,15 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
         val prunePushedDownFilters =
           if (sqlContext.parquetFilterPushDown) {
-            (filters: Seq[Expression]) => {
-              filters.filter { filter =>
-                // Note: filters cannot be pushed down to Parquet if they contain more complex
-                // expressions than simple "Attribute cmp Literal" comparisons. Here we remove
-                // all filters that have been pushed down. Note that a predicate such as
-                // "(A AND B) OR C" can result in "A OR C" being pushed down.
-                val recordFilter = ParquetFilters.createFilter(filter)
-                if (!recordFilter.isDefined) {
-                  // First case: the pushdown did not result in any record filter.
-                  true
-                } else {
-                  // Second case: a record filter was created; here we are conservative in
-                  // the sense that even if "A" was pushed and we check for "A AND B" we
-                  // still want to keep "A AND B" in the higher-level filter, not just "B".
-                  !ParquetFilters.findExpression(recordFilter.get, filter).isDefined
-                }
+            (predicates: Seq[Expression]) => {
+              // Note: filters cannot be pushed down to Parquet if they contain more complex
+              // expressions than simple "Attribute cmp Literal" comparisons. Here we remove all
+              // filters that have been pushed down. Note that a predicate such as "(A AND B) OR C"
+              // can result in "A OR C" being pushed down. Here we are conservative in the sense
+              // that even if "A" was pushed and we check for "A AND B" we still want to keep
+              // "A AND B" in the higher-level filter, not just "B".
+              predicates.map(p => p -> ParquetFilters.createFilter(p)).collect {
+                case (predicate, None) => predicate
               }
             }
           } else {