Commit ded7fee

Fix for ineligible filters, use compressed block size (apache#70)
1 parent 9c256a4

File tree

2 files changed: +35 -8 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileSplitter.scala

Lines changed: 9 additions & 8 deletions
@@ -78,9 +78,9 @@ class ParquetMetadataFileSplitter(
       (applied, unapplied, filteredBlocks)
     }
 
-    val eligible = parquetFilter(unapplied, filteredBlocks).map { bmd =>
+    val eligible = applyParquetFilter(unapplied, filteredBlocks).map { bmd =>
       val blockPath = new Path(root, bmd.getPath)
-      new FileSplit(blockPath, bmd.getStartingPos, bmd.getTotalByteSize, Array.empty)
+      new FileSplit(blockPath, bmd.getStartingPos, bmd.getCompressedSize, Array.empty)
     }
 
     val statFilter: (FileStatus => Seq[FileSplit]) = { stat =>
@@ -95,19 +95,20 @@ class ParquetMetadataFileSplitter(
     statFilter
   }
 
-  private def parquetFilter(
+  private def applyParquetFilter(
       filters: Seq[Filter],
       blocks: Seq[BlockMetaData]): Seq[BlockMetaData] = {
-    if (filters.nonEmpty) {
+    val predicates = filters.flatMap {
+      ParquetFilters.createFilter(schema, _)
+    }
+    if (predicates.nonEmpty) {
      // Asynchronously build bitmaps
      Future {
        buildFilterBitMaps(filters)
      }(ParquetMetadataFileSplitter.executionContext)
 
-      val predicate = filters.flatMap {
-        ParquetFilters.createFilter(schema, _)
-      }.reduce(FilterApi.and)
-      blocks.filter(bmd => !StatisticsFilter.canDrop(predicate, bmd.getColumns))
+      val predicate = predicates.reduce(FilterApi.and)
+      blocks.filterNot(bmd => StatisticsFilter.canDrop(predicate, bmd.getColumns))
     } else {
       blocks
     }
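
For readers outside the diff context: ParquetFilters.createFilter returns an Option and silently drops filters it cannot translate into Parquet predicates. Guarding on filters.nonEmpty was therefore not enough, because a non-empty filter list can still produce an empty predicate list, and reduce(FilterApi.and) on an empty collection throws. A minimal sketch of the failure mode, using a stand-in convert function in place of ParquetFilters.createFilter (treating StringStartsWith as unconvertible is an assumption about this codebase, not something the diff states):

import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.spark.sql.sources.{Filter, StringStartsWith}

// Stand-in for ParquetFilters.createFilter(schema, _): filters with no
// Parquet-predicate equivalent convert to None and drop out of flatMap.
def convert(f: Filter): Option[FilterPredicate] = f match {
  case _: StringStartsWith => None // assumed unconvertible here
  case _                   => None // other conversions elided in this sketch
}

val filters: Seq[Filter] = Seq(StringStartsWith("x", "1000"))
val predicates = filters.flatMap(convert) // empty, although filters is non-empty

// Old guard: filters.nonEmpty is true, so the old code reached
// predicates.reduce(FilterApi.and) and threw UnsupportedOperationException.
// New guard: predicates.nonEmpty; a safe fold expresses the same idea:
val combined: Option[FilterPredicate] = predicates.reduceOption(FilterApi.and(_, _))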

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

Lines changed: 26 additions & 0 deletions
@@ -769,6 +769,32 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("Ensure file with multiple blocks splits properly with filters") {
+    withSQLConf(ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL",
+      SQLConf.FILES_MAX_PARTITION_BYTES.key -> "1024",
+      ParquetOutputFormat.BLOCK_SIZE -> "1") {
+      withTempPath { path =>
+        spark.sparkContext.parallelize((1 to 1000).map(x => x.toString), 1)
+          .toDF("x").write.parquet(path.getCanonicalPath)
+        val df = spark.read.parquet(path.getCanonicalPath)
+        val column: Column = df.col("x").isNotNull
+        assert(df.filter(column).count == df.count)
+      }
+    }
+  }
+
+  test("Ensure unconvertable filters don't break splitting") {
+    withSQLConf(ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL") {
+      withTempPath { path =>
+        spark.sparkContext.parallelize((1 to 1000).map(x => x.toString), 1)
+          .toDF("x").write.parquet(path.getCanonicalPath)
+        val df = spark.read.parquet(path.getCanonicalPath)
+        val column: Column = df.col("x").startsWith("1000")
+        assert(df.filter(column).count == 1)
+      }
+    }
+  }
 }
 
 class CountingFileSystem extends RawLocalFileSystem {
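
The two tests line up with the two fixes: the first forces many tiny row groups (BLOCK_SIZE = 1) and small partitions (FILES_MAX_PARTITION_BYTES = 1024) so per-row-group splits are actually generated, and the second filters with startsWith, exercising a predicate the splitter cannot convert. For the compressed-size change, a worked example with hypothetical sizes (the accessors are the real BlockMetaData methods from the patch; the numbers are made up):

// A FileSplit describes a byte range in the on-disk file, so its length must
// be on-disk (compressed) bytes. Hypothetical row-group metadata values:
val startingPos    = 4L     // bmd.getStartingPos: row group begins after "PAR1"
val totalByteSize  = 10000L // bmd.getTotalByteSize: uncompressed data size
val compressedSize = 2500L  // bmd.getCompressedSize: bytes occupied on disk

// Old split: claims bytes [4, 10004), overshooting a roughly 2600-byte file
// and distorting how FILES_MAX_PARTITION_BYTES packs splits into partitions.
val oldEnd = startingPos + totalByteSize
// Fixed split: claims bytes [4, 2504), the row group's actual extent on disk.
val newEnd = startingPos + compressedSize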
