Commit 248067a

liancheng authored and marmbrus committed
[SPARK-2961][SQL] Use statistics to prune batches within cached partitions
This PR is based on #1883 authored by marmbrus. Key differences:

1. Batch pruning instead of partition pruning

   When #1883 was authored, batched column buffer building (#1880) hadn't been introduced. This PR combines the two and provides partition-batch-level pruning, which leads to smaller memory footprints and can generally skip more elements. The cost is that the pruning predicates are evaluated more frequently (partition count multiplied by the number of batches per partition).

2. More filters are supported

   Filter predicates consisting of `=`, `<`, `<=`, `>`, `>=`, and their conjunctions and disjunctions are supported.

Author: Cheng Lian <[email protected]>

Closes #2188 from liancheng/in-mem-batch-pruning and squashes the following commits:

68cf019 [Cheng Lian] Marked sqlContext as @transient
4254f6c [Cheng Lian] Enables in-memory partition pruning in PartitionBatchPruningSuite
3784105 [Cheng Lian] Overrides InMemoryColumnarTableScan.sqlContext
d2a1d66 [Cheng Lian] Disables in-memory partition pruning by default
062c315 [Cheng Lian] HiveCompatibilitySuite code cleanup
16b77bf [Cheng Lian] Fixed pruning predication conjunctions and disjunctions
16195c5 [Cheng Lian] Enabled both disjunction and conjunction
89950d0 [Cheng Lian] Worked around Scala style check
9c167f6 [Cheng Lian] Minor code cleanup
3c4d5c7 [Cheng Lian] Minor code cleanup
ea59ee5 [Cheng Lian] Renamed PartitionSkippingSuite to PartitionBatchPruningSuite
fc517d0 [Cheng Lian] More test cases
1868c18 [Cheng Lian] Code cleanup, bugfix, and adding tests
cb76da4 [Cheng Lian] Added more predicate filters, fixed table scan stats for testing purposes
385474a [Cheng Lian] Merge branch 'inMemStats' into in-mem-batch-pruning
1 parent f48420f commit 248067a
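
The core idea of the batch pruning described in the commit message can be illustrated with a standalone sketch. This is not the PR's code; the names and the `key > 10` predicate are invented for illustration:

// Each column batch carries statistics (e.g. min/max) gathered while the
// batch was built. A filter such as `key > 10` is rewritten into a predicate
// over those statistics, so whole batches can be skipped without scanning.
case class BatchStats(min: Int, max: Int)

// Keep a batch only if its stats admit at least one qualifying row.
def mayContainKeyGreaterThan10(stats: BatchStats): Boolean = stats.max > 10

val batches = Seq(BatchStats(0, 5), BatchStats(6, 20), BatchStats(21, 42))
val survivors = batches.filter(mayContainKeyGreaterThan10) // first batch skipped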

17 files changed, +446 −359 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala

Lines changed: 41 additions & 0 deletions (new file)
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+/**
+ * Builds a map that is keyed by an Attribute's expression id. Using the expression id allows values
+ * to be looked up even when the attributes used differ cosmetically (i.e., the capitalization
+ * of the name, or the expected nullability).
+ */
+object AttributeMap {
+  def apply[A](kvs: Seq[(Attribute, A)]) =
+    new AttributeMap(kvs.map(kv => (kv._1.exprId, (kv._1, kv._2))).toMap)
+}
+
+class AttributeMap[A](baseMap: Map[ExprId, (Attribute, A)])
+  extends Map[Attribute, A] with Serializable {
+
+  override def get(k: Attribute): Option[A] = baseMap.get(k.exprId).map(_._2)
+
+  override def + [B1 >: A](kv: (Attribute, B1)): Map[Attribute, B1] =
+    (baseMap.map(_._2) + kv).toMap
+
+  override def iterator: Iterator[(Attribute, A)] = baseMap.map(_._2).iterator
+
+  override def -(key: Attribute): Map[Attribute, A] = (baseMap.map(_._2) - key).toMap
+}
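
A minimal usage sketch of the new class. This is illustrative only: the attribute names are made up, and the import paths assume the Catalyst package layout at the time of this commit.

import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
import org.apache.spark.sql.catalyst.types.IntegerType

val id = AttributeReference("id", IntegerType, nullable = false)()

// A cosmetically different reference to the same attribute: same exprId,
// but different capitalization and nullability.
val idVariant = AttributeReference("ID", IntegerType, nullable = true)(exprId = id.exprId)

val map = AttributeMap(Seq(id -> 0))

// Lookup keys on exprId, so the cosmetic differences don't matter.
assert(map.get(idVariant) == Some(0))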

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala

Lines changed: 10 additions & 2 deletions
@@ -38,12 +38,20 @@ case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)
 }

 object BindReferences extends Logging {
-  def bindReference[A <: Expression](expression: A, input: Seq[Attribute]): A = {
+
+  def bindReference[A <: Expression](
+      expression: A,
+      input: Seq[Attribute],
+      allowFailures: Boolean = false): A = {
     expression.transform { case a: AttributeReference =>
       attachTree(a, "Binding attribute") {
         val ordinal = input.indexWhere(_.exprId == a.exprId)
         if (ordinal == -1) {
-          sys.error(s"Couldn't find $a in ${input.mkString("[", ",", "]")}")
+          if (allowFailures) {
+            a
+          } else {
+            sys.error(s"Couldn't find $a in ${input.mkString("[", ",", "]")}")
+          }
         } else {
           BoundReference(ordinal, a.dataType, a.nullable)
         }
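
With `allowFailures = true`, an attribute that can't be found in the input schema is left unbound rather than raising an error. A hypothetical sketch (attribute names invented, imports per the Catalyst layout of this era):

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.IntegerType

val key = AttributeReference("key", IntegerType, nullable = false)()
val missing = AttributeReference("other", IntegerType, nullable = false)()

// Strict binding (the old behavior) fails via sys.error:
//   BindReferences.bindReference[Expression](missing, Seq(key))

// Lenient binding leaves the unresolved attribute in place:
val bound = BindReferences.bindReference[Expression](missing, Seq(key), allowFailures = true)
assert(bound == missing)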

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 7 additions & 0 deletions
@@ -26,6 +26,7 @@ import java.util.Properties
 private[spark] object SQLConf {
   val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
   val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
+  val IN_MEMORY_PARTITION_PRUNING = "spark.sql.inMemoryColumnarStorage.partitionPruning"
   val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
   val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
   val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
@@ -124,6 +125,12 @@ trait SQLConf {
   private[spark] def isParquetBinaryAsString: Boolean =
     getConf(PARQUET_BINARY_AS_STRING, "false").toBoolean

+  /**
+   * When set to true, partition pruning for in-memory columnar tables is enabled.
+   */
+  private[spark] def inMemoryPartitionPruning: Boolean =
+    getConf(IN_MEMORY_PARTITION_PRUNING, "false").toBoolean
+
   /** ********************** SQLConf functionality methods ************ */

   /** Set Spark SQL configuration properties. */
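
Since the feature is disabled by default in this commit, turning it on requires setting the new property explicitly. A usage sketch (`sqlContext` is assumed to be an existing SQLContext):

// Enable in-memory batch pruning (disabled by default in this commit).
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.partitionPruning", "true")

// Or equivalently via SQL:
sqlContext.sql("SET spark.sql.inMemoryColumnarStorage.partitionPruning=true")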

sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala

Lines changed: 5 additions & 5 deletions
@@ -38,7 +38,7 @@ private[sql] trait ColumnBuilder {
   /**
    * Column statistics information
    */
-  def columnStats: ColumnStats[_, _]
+  def columnStats: ColumnStats

   /**
    * Returns the final columnar byte buffer.
@@ -47,7 +47,7 @@ private[sql] trait ColumnBuilder {
 }

 private[sql] class BasicColumnBuilder[T <: DataType, JvmType](
-    val columnStats: ColumnStats[T, JvmType],
+    val columnStats: ColumnStats,
     val columnType: ColumnType[T, JvmType])
   extends ColumnBuilder {

@@ -81,18 +81,18 @@ private[sql] class BasicColumnBuilder[T <: DataType, JvmType](

 private[sql] abstract class ComplexColumnBuilder[T <: DataType, JvmType](
     columnType: ColumnType[T, JvmType])
-  extends BasicColumnBuilder[T, JvmType](new NoopColumnStats[T, JvmType], columnType)
+  extends BasicColumnBuilder[T, JvmType](new NoopColumnStats, columnType)
   with NullableColumnBuilder

 private[sql] abstract class NativeColumnBuilder[T <: NativeType](
-    override val columnStats: NativeColumnStats[T],
+    override val columnStats: ColumnStats,
     override val columnType: NativeColumnType[T])
   extends BasicColumnBuilder[T, T#JvmType](columnStats, columnType)
   with NullableColumnBuilder
   with AllCompressionSchemes
   with CompressibleColumnBuilder[T]

-private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)
+private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new NoopColumnStats, BOOLEAN)

 private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT)
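The diff above de-parameterizes `columnStats` from `ColumnStats[T, JvmType]` to a plain `ColumnStats`, and switches `BooleanColumnBuilder` to `NoopColumnStats`. `ColumnStats.scala` itself is not shown on this page, so the following is only a hedged sketch of the interface shape this change implies: a type-agnostic collector that is fed rows while a batch is built and hands back a statistics row for pruning.

// Hedged sketch only; the real trait lives in ColumnStats.scala, which this
// page does not show. Names mirror the usage visible in the diff above.
// `Row` is Catalyst's org.apache.spark.sql.catalyst.expressions.Row.
trait ColumnStats extends Serializable {
  // Called once per appended value to update min/max/null bookkeeping.
  def gatherStats(row: Row, ordinal: Int): Unit
  // The collected statistics, packed into a Row consumed by batch pruning.
  def collectedStatistics: Row
}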