
Commit d12ea49

marmbrus authored and pwendell committed
[SPARK-4753][SQL] Use catalyst for partition pruning in newParquet.
Author: Michael Armbrust <[email protected]>

Closes #3613 from marmbrus/parquetPartitionPruning and squashes the following commits:

4f138f8 [Michael Armbrust] Use catalyst for partition pruning in newParquet.

(cherry picked from commit f5801e8)
Signed-off-by: Patrick Wendell <[email protected]>
1 parent a8d8077 commit d12ea49

File tree

1 file changed: +28 -30 lines changed


sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 28 additions & 30 deletions
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job}
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate

 import parquet.hadoop.ParquetInputFormat
 import parquet.hadoop.util.ContextUtil
@@ -31,8 +32,8 @@ import org.apache.spark.{Partition => SparkPartition, Logging}
 import org.apache.spark.rdd.{NewHadoopPartition, RDD}

 import org.apache.spark.sql.{SQLConf, Row, SQLContext}
-import org.apache.spark.sql.catalyst.expressions.{SpecificMutableRow, And, Expression, Attribute}
-import org.apache.spark.sql.catalyst.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, StructField, StructType}
 import org.apache.spark.sql.sources._

 import scala.collection.JavaConversions._
@@ -151,44 +152,41 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
   override def buildScan(output: Seq[Attribute], predicates: Seq[Expression]): RDD[Row] = {
     // This is mostly a hack so that we can use the existing parquet filter code.
     val requiredColumns = output.map(_.name)
-    // TODO: Parquet filters should be based on data sources API, not catalyst expressions.
-    val filters = DataSourceStrategy.selectFilters(predicates)

     val job = new Job(sparkContext.hadoopConfiguration)
     ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
     val jobConf: Configuration = ContextUtil.getConfiguration(job)

     val requestedSchema = StructType(requiredColumns.map(schema(_)))

-    // TODO: Make folder based partitioning a first class citizen of the Data Sources API.
-    val partitionFilters = filters.collect {
-      case e @ EqualTo(attr, value) if partitionKeys.contains(attr) =>
-        logInfo(s"Parquet scan partition filter: $attr=$value")
-        (p: Partition) => p.partitionValues(attr) == value
-
-      case e @ In(attr, values) if partitionKeys.contains(attr) =>
-        logInfo(s"Parquet scan partition filter: $attr IN ${values.mkString("{", ",", "}")}")
-        val set = values.toSet
-        (p: Partition) => set.contains(p.partitionValues(attr))
-
-      case e @ GreaterThan(attr, value) if partitionKeys.contains(attr) =>
-        logInfo(s"Parquet scan partition filter: $attr > $value")
-        (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] > value.asInstanceOf[Int]
-
-      case e @ GreaterThanOrEqual(attr, value) if partitionKeys.contains(attr) =>
-        logInfo(s"Parquet scan partition filter: $attr >= $value")
-        (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] >= value.asInstanceOf[Int]
+    val partitionKeySet = partitionKeys.toSet
+    val rawPredicate =
+      predicates
+        .filter(_.references.map(_.name).toSet.subsetOf(partitionKeySet))
+        .reduceOption(And)
+        .getOrElse(Literal(true))
+
+    // Translate the predicate so that it reads from the information derived from the
+    // folder structure
+    val castedPredicate = rawPredicate transform {
+      case a: AttributeReference =>
+        val idx = partitionKeys.indexWhere(a.name == _)
+        BoundReference(idx, IntegerType, nullable = true)
+    }

-      case e @ LessThan(attr, value) if partitionKeys.contains(attr) =>
-        logInfo(s"Parquet scan partition filter: $attr < $value")
-        (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] < value.asInstanceOf[Int]
+    val inputData = new GenericMutableRow(partitionKeys.size)
+    val pruningCondition = InterpretedPredicate(castedPredicate)

-      case e @ LessThanOrEqual(attr, value) if partitionKeys.contains(attr) =>
-        logInfo(s"Parquet scan partition filter: $attr <= $value")
-        (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] <= value.asInstanceOf[Int]
-    }
+    val selectedPartitions =
+      if (partitionKeys.nonEmpty && predicates.nonEmpty) {
+        partitions.filter { part =>
+          inputData(0) = part.partitionValues.values.head
+          pruningCondition(inputData)
+        }
+      } else {
+        partitions
+      }

-    val selectedPartitions = partitions.filter(p => partitionFilters.forall(_(p)))
     val fs = FileSystem.get(new java.net.URI(path), sparkContext.hadoopConfiguration)
     val selectedFiles = selectedPartitions.flatMap(_.files).map(f => fs.makeQualified(f.getPath))
     // FileInputFormat cannot handle empty lists.
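To make the new approach easier to follow, here is a minimal, self-contained Scala sketch of the technique the diff switches to: keep only the predicates that reference partition columns, conjoin them, bind attribute names to their positions in the partition-key list, and evaluate the bound predicate against each partition's values. The names below (Expr, Attr, Lit, Conj, PartitionPruner) are illustrative stand-ins, not Spark's catalyst classes; in the diff above the same roles are played by Expression, AttributeReference, Literal, And, BoundReference, and InterpretedPredicate.

// Tiny expression model: just enough to show name-based references and evaluation.
sealed trait Expr { def references: Set[String] }
case class Attr(name: String) extends Expr { def references = Set(name) }
case class Lit(value: Any) extends Expr { def references = Set.empty[String] }
case class EqualTo(left: Expr, right: Expr) extends Expr {
  def references = left.references ++ right.references
}
case class Conj(left: Expr, right: Expr) extends Expr {
  def references = left.references ++ right.references
}

object PartitionPruner {
  // Evaluate an expression against one partition's values, resolving attribute names
  // to positions in `partitionKeys` (the role BoundReference plays in the diff above).
  def eval(e: Expr, partitionKeys: Seq[String], values: Seq[Any]): Any = e match {
    case Attr(name)    => values(partitionKeys.indexOf(name))
    case Lit(v)        => v
    case EqualTo(l, r) => eval(l, partitionKeys, values) == eval(r, partitionKeys, values)
    case Conj(l, r)    =>
      eval(l, partitionKeys, values).asInstanceOf[Boolean] &&
        eval(r, partitionKeys, values).asInstanceOf[Boolean]
  }

  // Keep only predicates that reference nothing but partition columns, AND them
  // together, and filter the partitions by evaluating the combined predicate.
  def prune(
      predicates: Seq[Expr],
      partitionKeys: Seq[String],
      partitions: Seq[Seq[Any]]): Seq[Seq[Any]] = {
    val keySet = partitionKeys.toSet
    val partitionOnly = predicates.filter(_.references.subsetOf(keySet))
    if (partitionOnly.isEmpty) partitions
    else {
      val condition = partitionOnly.reduce(Conj)
      partitions.filter(p => eval(condition, partitionKeys, p).asInstanceOf[Boolean])
    }
  }
}

// Example: three partitions keyed by "part"; the predicate part = 2 keeps only Seq(2).
// PartitionPruner.prune(Seq(EqualTo(Attr("part"), Lit(2))), Seq("part"),
//   Seq(Seq(1), Seq(2), Seq(3)))  // => Seq(Seq(2))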

0 commit comments