Skip to content

Commit 85fea2d

Browse files
Adding SparkConf setting to disable filter predicate pushdown
1 parent f0ad3cf commit 85fea2d

File tree

3 files changed

+12
-4
lines changed

3 files changed

+12
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ import org.apache.spark.sql.execution.SparkSqlSerializer
3131

3232
object ParquetFilters {
3333
val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
34+
// set this to false if pushdown should be disabled
35+
// Note: a "spark.hadoop." prefix would be needed for SparkConf to copy this
36+
// key into the Hadoop configuration, but the key defined below has no such
// prefix and is read directly from SparkConf — confirm which is intended
37+
val PARQUET_FILTER_PUSHDOWN_ENABLED = "org.apache.spark.sql.parquet.filter.pushdown"
3438

3539
def createFilter(filterExpressions: Seq[Expression]): UnboundRecordFilter = {
3640
def createEqualityFilter(name: String, literal: Literal) = literal.dataType match {

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,13 @@ case class ParquetTableScan(
7171
ParquetTypesConverter.convertFromAttributes(output).toString)
7272

7373
// Store record filtering predicate in `Configuration`
74-
// Note: the input format ignores all predicates that cannot be expressed
74+
// Note 1: the input format ignores all predicates that cannot be expressed
7575
// as simple column predicate filters in Parquet. Here we just record
7676
// the whole pruning predicate.
77-
if (columnPruningPred.isDefined) {
77+
// Note 2: you can disable filter predicate pushdown by setting
78+
// "org.apache.spark.sql.parquet.filter.pushdown" to false inside SparkConf.
79+
if (columnPruningPred.isDefined &&
80+
sc.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
7881
ParquetFilters.serializeFilterExpressions(columnPruningPred.get, conf)
7982
}
8083

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,14 +130,15 @@ private[sql] object ParquetTestData {
130130
writer.close()
131131
}
132132

133-
def writeFilterFile() = {
133+
def writeFilterFile(records: Int = 200) = {
134+
// for microbenchmark use: records = 300000000
134135
testFilterDir.delete
135136
val path: Path = new Path(new Path(testFilterDir.toURI), new Path("part-r-0.parquet"))
136137
val schema: MessageType = MessageTypeParser.parseMessageType(testFilterSchema)
137138
val writeSupport = new TestGroupWriteSupport(schema)
138139
val writer = new ParquetWriter[Group](path, writeSupport)
139140

140-
for(i <- 0 to 200) {
141+
for(i <- 0 to records) {
141142
val record = new SimpleGroup(schema)
142143
if (i % 4 == 0) {
143144
record.add(0, true)

0 commit comments

Comments (0)