@@ -140,12 +140,35 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         InsertIntoParquetTable(relation, planLater(child), overwrite = true)(sparkContext) :: Nil
       case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
         InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
-      case PhysicalOperation(projectList, filters, relation: ParquetRelation) =>
-        // TODO: Should be pushing down filters as well.
+      case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => {
+        val remainingFilters =
+          if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
+            filters.filter {
+              // Note: filters cannot be pushed down to Parquet if they contain more complex
+              // expressions than simple "Attribute cmp Literal" comparisons. Here we remove
+              // all filters that have been pushed down. Note that a predicate such as
+              // "(A AND B) OR C" can result in "A OR C" being pushed down.
+              filter =>
+                val recordFilter = ParquetFilters.createFilter(filter)
+                if (!recordFilter.isDefined) {
+                  // First case: the pushdown did not result in any record filter.
+                  true
+                } else {
+                  // Second case: a record filter was created; here we are conservative in
+                  // the sense that even if "A" was pushed and we check for "A AND B" we
+                  // still want to keep "A AND B" in the higher-level filter, not just "B".
+                  !ParquetFilters.findExpression(recordFilter.get, filter).isDefined
+                }
+            }
+          } else {
+            filters
+          }
         pruneFilterProject(
           projectList,
-          filters,
-          ParquetTableScan(_, relation, None)(sparkContext)) :: Nil
+          remainingFilters,
+          ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil
+      }
+
       case _ => Nil
     }
   }
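
A minimal, self-contained Scala sketch of the split described in the comments above. The Expr ADT and the pushable helper are illustrative stand-ins, not Spark's Catalyst Expression classes or the actual ParquetFilters.createFilter logic: only "Attribute cmp Literal" leaves are assumed pushable, an AND may drop an unpushable side (the pushed filter then accepts a superset of rows, which is safe), while an OR can be pushed only if both of its sides can.

object PushdownSketch extends App {
  sealed trait Expr
  case class Cmp(attr: String, lit: Int) extends Expr  // pushable leaf: "Attribute cmp Literal"
  case class And(l: Expr, r: Expr) extends Expr
  case class Or(l: Expr, r: Expr) extends Expr
  case class Udf(name: String) extends Expr            // stands in for anything unpushable

  // Returns the pushable weakening of a predicate, if one exists.
  def pushable(e: Expr): Option[Expr] = e match {
    case c: Cmp => Some(c)
    case And(l, r) => (pushable(l), pushable(r)) match {
      case (Some(a), Some(b)) => Some(And(a, b))
      case (Some(a), None)    => Some(a)   // dropping a conjunct only widens the filter
      case (None, Some(b))    => Some(b)
      case (None, None)       => None
    }
    // Dropping a disjunct would wrongly discard rows, so both sides must push.
    case Or(l, r) => for (a <- pushable(l); b <- pushable(r)) yield Or(a, b)
    case _: Udf   => None
  }

  // "(A AND B) OR C" with an unpushable B yields "A OR C", as in the comment above.
  val pred = Or(And(Cmp("A", 1), Udf("B")), Cmp("C", 2))
  println(pushable(pred)) // Some(Or(Cmp(A,1),Cmp(C,2)))
}

Because only the weakened "A OR C" reaches Parquet, the scan does not fully evaluate the original "(A AND B) OR C"; this is why the strategy conservatively keeps such a predicate in remainingFilters and re-applies it above ParquetTableScan.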