Skip to content

Commit 43760e8

Browse files
committed
Simplifies Parquet filter generation logic
1 parent 64c6b9b commit 43760e8

File tree

5 files changed

+162
-693
lines changed

5 files changed

+162
-693
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.util.Metadata
2626
object NamedExpression {
2727
private val curId = new java.util.concurrent.atomic.AtomicLong()
2828
/**
 * Builds an [[ExprId]] from the next value of the `curId` counter.
 * Every call advances the counter by one (atomic get-and-increment).
 */
def newExprId: ExprId = ExprId(curId.getAndIncrement())
29+
/**
 * Extractor that exposes a [[NamedExpression]] as a `(name, dataType)` pair so it can be
 * destructured in pattern matches. Always returns `Some`.
 *
 * @param expr the named expression to take apart
 * @return the expression's name and data type, wrapped in `Some`
 */
def unapply(expr: NamedExpression): Option[(String, DataType)] =
  // Build the tuple explicitly: `Some(a, b)` only compiles through argument adaptation
  // (auto-tupling), which the compiler warns about and later Scala versions deprecate.
  Some((expr.name, expr.dataType))
2930
}
3031

3132
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 9 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -209,22 +209,15 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
209209
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
210210
val prunePushedDownFilters =
211211
if (sqlContext.parquetFilterPushDown) {
212-
(filters: Seq[Expression]) => {
213-
filters.filter { filter =>
214-
// Note: filters cannot be pushed down to Parquet if they contain more complex
215-
// expressions than simple "Attribute cmp Literal" comparisons. Here we remove
216-
// all filters that have been pushed down. Note that a predicate such as
217-
// "(A AND B) OR C" can result in "A OR C" being pushed down.
218-
val recordFilter = ParquetFilters.createFilter(filter)
219-
if (!recordFilter.isDefined) {
220-
// First case: the pushdown did not result in any record filter.
221-
true
222-
} else {
223-
// Second case: a record filter was created; here we are conservative in
224-
// the sense that even if "A" was pushed and we check for "A AND B" we
225-
// still want to keep "A AND B" in the higher-level filter, not just "B".
226-
!ParquetFilters.findExpression(recordFilter.get, filter).isDefined
227-
}
212+
(predicates: Seq[Expression]) => {
213+
// Note: filters cannot be pushed down to Parquet if they contain more complex
214+
// expressions than simple "Attribute cmp Literal" comparisons. Here we remove all
215+
// filters that have been pushed down. Note that a predicate such as "(A AND B) OR C"
216+
// can result in "A OR C" being pushed down. Here we are conservative in the sense
217+
// that even if "A" was pushed and we check for "A AND B" we still want to keep
218+
// "A AND B" in the higher-level filter, not just "B".
219+
predicates.map(p => p -> ParquetFilters.createFilter(p)).collect {
220+
case (predicate, None) => predicate
228221
}
229222
}
230223
} else {

0 commit comments

Comments
 (0)