
Commit 3b395e1

liancheng authored and marmbrus committed
[SPARK-4798][SQL] A new set of Parquet testing API and test suites
This PR provides a set of Parquet testing APIs (see trait `ParquetTest`) that enables developers to write more concise test cases. A new set of Parquet test suites built upon this API is added, aiming to replace the old `ParquetQuerySuite`. To avoid potential merge conflicts, the old testing code is not removed yet. The following classes can be safely removed after most Parquet related PRs are handled:

- `ParquetQuerySuite`
- `ParquetTestData`

Author: Cheng Lian <[email protected]>

Closes #3644 from liancheng/parquet-tests and squashes the following commits:

800e745 [Cheng Lian] Enforces ordering of test output
3bb8731 [Cheng Lian] Refactors HiveParquetSuite
aa2cb2e [Cheng Lian] Decouples ParquetTest and TestSQLContext
7b43a68 [Cheng Lian] Updates ParquetTest Scaladoc
7f07af0 [Cheng Lian] Adds a new set of Parquet test suites
1 parent b85044e commit 3b395e1
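
For a sense of how the new API reads in practice, here is a minimal sketch of a test built on `ParquetTest` (hypothetical suite name; `checkAnswer` comes from `QueryTest`, and tuple columns surface as `_1`, `_2`):

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.TestSQLContext

class ExampleParquetSuite extends QueryTest with ParquetTest {
  val sqlContext = TestSQLContext
  import sqlContext._

  test("round-trips tuples through a temporary Parquet table") {
    // Writes the tuples to a temporary Parquet file, registers it as table "t",
    // runs the body, then drops the table and deletes the file automatically.
    withParquetTable((1 to 3).map(i => (i, i.toString)), "t") {
      checkAnswer(sql("SELECT _2 FROM t WHERE _1 = 2"), Seq(Row("2")))
    }
  }
}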

File tree

8 files changed: +989 −81 lines changed


sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 4 additions & 0 deletions
@@ -188,6 +188,10 @@ private[sql] trait SQLConf {
    */
   def getAllConfs: immutable.Map[String, String] = settings.synchronized { settings.toMap }
 
+  private[spark] def unsetConf(key: String) {
+    settings -= key
+  }
+
   private[spark] def clear() {
     settings.clear()
   }
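
The new `unsetConf` removes the entry outright instead of overwriting it with a sentinel value; `withSQLConf` in the `ParquetTest` trait below relies on this to restore keys that had no prior value. A minimal sketch of the intended behavior (hypothetical key name, run inside a `SQLContext`, which mixes in `SQLConf`):

// Assume "spark.sql.hypothetical.key" has never been set.
setConf("spark.sql.hypothetical.key", "v")
assert(getConf("spark.sql.hypothetical.key") == "v")

unsetConf("spark.sql.hypothetical.key")
// The key is gone again, so lookups fall back to the caller-supplied default.
assert(getConf("spark.sql.hypothetical.key", "fallback") == "fallback")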
ParquetTest.scala (new file, package org.apache.spark.sql.parquet)

Lines changed: 127 additions & 0 deletions
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.io.File
+
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
+import scala.util.Try
+
+import org.apache.spark.sql.{SQLContext, SchemaRDD}
+import org.apache.spark.sql.catalyst.util
+import org.apache.spark.util.Utils
+
+/**
+ * A helper trait that provides convenient facilities for Parquet testing.
+ *
+ * NOTE: Considering that classes `Tuple1` ... `Tuple22` all extend `Product`, it is more
+ * convenient to use tuples rather than special case classes when writing test cases/suites.
+ * In particular, `Tuple1.apply` can be used to easily wrap a single type/value.
+ */
+trait ParquetTest {
+  val sqlContext: SQLContext
+
+  import sqlContext._
+
+  protected def configuration = sparkContext.hadoopConfiguration
+
+  /**
+   * Sets all SQL configurations specified in `pairs`, calls `f`, and then restores all SQL
+   * configurations.
+   *
+   * @todo Probably this method should be moved to a more general place
+   */
+  protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+    val (keys, values) = pairs.unzip
+    val currentValues = keys.map(key => Try(getConf(key)).toOption)
+    (keys, values).zipped.foreach(setConf)
+    try f finally {
+      keys.zip(currentValues).foreach {
+        case (key, Some(value)) => setConf(key, value)
+        case (key, None) => unsetConf(key)
+      }
+    }
+  }
+
+  /**
+   * Generates a temporary path without creating the actual file/directory, then passes it to `f`.
+   * If a file/directory is created there by `f`, it will be deleted after `f` returns.
+   *
+   * @todo Probably this method should be moved to a more general place
+   */
+  protected def withTempPath(f: File => Unit): Unit = {
+    val file = util.getTempFilePath("parquetTest").getCanonicalFile
+    try f(file) finally if (file.exists()) Utils.deleteRecursively(file)
+  }
+
+  /**
+   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
+   * returns.
+   *
+   * @todo Probably this method should be moved to a more general place
+   */
+  protected def withTempDir(f: File => Unit): Unit = {
+    val dir = Utils.createTempDir().getCanonicalFile
+    try f(dir) finally Utils.deleteRecursively(dir)
+  }
+
+  /**
+   * Writes `data` to a Parquet file, which is then passed to `f` and will be deleted after `f`
+   * returns.
+   */
+  protected def withParquetFile[T <: Product: ClassTag: TypeTag]
+      (data: Seq[T])
+      (f: String => Unit): Unit = {
+    withTempPath { file =>
+      sparkContext.parallelize(data).saveAsParquetFile(file.getCanonicalPath)
+      f(file.getCanonicalPath)
+    }
+  }
+
+  /**
+   * Writes `data` to a Parquet file and reads it back as a SchemaRDD, which is then passed to `f`.
+   * The Parquet file will be deleted after `f` returns.
+   */
+  protected def withParquetRDD[T <: Product: ClassTag: TypeTag]
+      (data: Seq[T])
+      (f: SchemaRDD => Unit): Unit = {
+    withParquetFile(data)(path => f(parquetFile(path)))
+  }
+
+  /**
+   * Drops temporary table `tableName` after calling `f`.
+   */
+  protected def withTempTable(tableName: String)(f: => Unit): Unit = {
+    try f finally dropTempTable(tableName)
+  }
+
+  /**
+   * Writes `data` to a Parquet file, reads it back as a SchemaRDD and registers it as a temporary
+   * table named `tableName`, then calls `f`. The temporary table together with the Parquet file
+   * will be dropped/deleted after `f` returns.
+   */
+  protected def withParquetTable[T <: Product: ClassTag: TypeTag]
+      (data: Seq[T], tableName: String)
+      (f: => Unit): Unit = {
+    withParquetRDD(data) { rdd =>
+      rdd.registerTempTable(tableName)
+      withTempTable(tableName)(f)
+    }
+  }
+}
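
As the Scaladoc note above suggests, plain tuples keep test fixtures terse, with `Tuple1.apply` wrapping single-column data. The fixtures also nest; a hedged sketch of composing them inside a suite that mixes in this trait (the config key appears in the filter suite below; the assertion is illustrative):

withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
  withParquetFile((1 to 4).map(Tuple1.apply)) { path =>
    // `path` points at a temporary Parquet file holding one INT column (_1);
    // both the config override and the file go away once this block exits.
    val rdd = sqlContext.parquetFile(path)
    assert(rdd.count() === 4)
  }
}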
ParquetFilterSuite.scala (new file, package org.apache.spark.sql.parquet)

Lines changed: 253 additions & 0 deletions
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import parquet.filter2.predicate.Operators._
+import parquet.filter2.predicate.{FilterPredicate, Operators}
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate, Row}
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.{QueryTest, SQLConf, SchemaRDD}
+
+/**
+ * A test suite for the filter pushdown optimization based on the Parquet filter2 API.
+ *
+ * Notice that `!(a cmp b)` is always transformed to its negated form `a cmp' b` by the
+ * `BooleanSimplification` optimization rule whenever possible. As a result, predicate `!(a < 1)`
+ * results in a `GtEq` filter predicate rather than a `Not`.
+ *
+ * @todo Add test cases for `IsNull` and `IsNotNull` after merging PR #3367
+ */
+class ParquetFilterSuite extends QueryTest with ParquetTest {
+  val sqlContext = TestSQLContext
+
+  private def checkFilterPushdown(
+      rdd: SchemaRDD,
+      output: Seq[Symbol],
+      predicate: Predicate,
+      filterClass: Class[_ <: FilterPredicate],
+      checker: (SchemaRDD, Any) => Unit,
+      expectedResult: => Any): Unit = {
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+      val query = rdd.select(output.map(_.attr): _*).where(predicate)
+
+      val maybeAnalyzedPredicate = query.queryExecution.executedPlan.collect {
+        case plan: ParquetTableScan => plan.columnPruningPred
+      }.flatten.reduceOption(_ && _)
+
+      assert(maybeAnalyzedPredicate.isDefined)
+      maybeAnalyzedPredicate.foreach { pred =>
+        val maybeFilter = ParquetFilters.createFilter(pred)
+        assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
+        maybeFilter.foreach(f => assert(f.getClass === filterClass))
+      }
+
+      checker(query, expectedResult)
+    }
+  }
+
+  private def checkFilterPushdown
+      (rdd: SchemaRDD, output: Symbol*)
+      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate])
+      (expectedResult: => Any): Unit = {
+    checkFilterPushdown(rdd, output, predicate, filterClass, checkAnswer _, expectedResult)
+  }
+
+  def checkBinaryFilterPushdown
+      (rdd: SchemaRDD, output: Symbol*)
+      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate])
+      (expectedResult: => Any): Unit = {
+    def checkBinaryAnswer(rdd: SchemaRDD, result: Any): Unit = {
+      val actual = rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq
+      val expected = result match {
+        case s: Seq[_] => s.map(_.asInstanceOf[Row].getAs[Array[Byte]](0).mkString(","))
+        case s => Seq(s.asInstanceOf[Array[Byte]].mkString(","))
+      }
+      assert(actual.sorted === expected.sorted)
+    }
+    checkFilterPushdown(rdd, output, predicate, filterClass, checkBinaryAnswer _, expectedResult)
+  }
+
+  test("filter pushdown - boolean") {
+    withParquetRDD((true :: false :: Nil).map(Tuple1.apply)) { rdd =>
+      checkFilterPushdown(rdd, '_1)('_1 === true, classOf[Eq[java.lang.Boolean]])(true)
+      checkFilterPushdown(rdd, '_1)('_1 !== true, classOf[Operators.Not])(false)
+    }
+  }
+
+  test("filter pushdown - integer") {
+    withParquetRDD((1 to 4).map(Tuple1.apply)) { rdd =>
+      checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[Integer]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) {
+        (2 to 4).map(Row.apply(_))
+      }
+
+      checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [Integer]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [Integer]])(4)
+      checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[Integer]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[Integer]])(4)
+
+      checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq  [Integer]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt  [Integer]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt  [Integer]])(4)
+      checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[Integer]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[Integer]])(4)
+
+      checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[Integer]])(4)
+      checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
+      checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
+        Seq(Row(1), Row(4))
+      }
+    }
+  }
+
+  test("filter pushdown - long") {
+    withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toLong))) { rdd =>
+      checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Long]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) {
+        (2 to 4).map(Row.apply(_))
+      }
+
+      checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Long]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Long]])(4)
+      checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Long]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Long]])(4)
+
+      checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq  [Integer]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt  [java.lang.Long]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt  [java.lang.Long]])(4)
+      checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Long]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Long]])(4)
+
+      checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Long]])(4)
+      checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
+      checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
+        Seq(Row(1), Row(4))
+      }
+    }
+  }
+
+  test("filter pushdown - float") {
+    withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toFloat))) { rdd =>
+      checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Float]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) {
+        (2 to 4).map(Row.apply(_))
+      }
+
+      checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Float]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Float]])(4)
+      checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Float]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Float]])(4)
+
+      checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq  [Integer]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt  [java.lang.Float]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt  [java.lang.Float]])(4)
+      checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Float]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Float]])(4)
+
+      checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Float]])(4)
+      checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
+      checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
+        Seq(Row(1), Row(4))
+      }
+    }
+  }
+
+  test("filter pushdown - double") {
+    withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toDouble))) { rdd =>
+      checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Double]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) {
+        (2 to 4).map(Row.apply(_))
+      }
+
+      checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Double]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Double]])(4)
+      checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Double]])(1)
+      checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Double]])(4)
+
+      checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq[Integer]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt  [java.lang.Double]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt  [java.lang.Double]])(4)
+      checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Double]])(1)
+      checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Double]])(4)
+
+      checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Double]])(4)
+      checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
+      checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
+        Seq(Row(1), Row(4))
+      }
+    }
+  }
+
+  test("filter pushdown - string") {
+    withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toString))) { rdd =>
+      checkFilterPushdown(rdd, '_1)('_1 === "1", classOf[Eq[String]])("1")
+      checkFilterPushdown(rdd, '_1)('_1 !== "1", classOf[Operators.Not]) {
+        (2 to 4).map(i => Row.apply(i.toString))
+      }
+
+      checkFilterPushdown(rdd, '_1)('_1 < "2", classOf[Lt [java.lang.String]])("1")
+      checkFilterPushdown(rdd, '_1)('_1 > "3", classOf[Gt [java.lang.String]])("4")
+      checkFilterPushdown(rdd, '_1)('_1 <= "1", classOf[LtEq[java.lang.String]])("1")
+      checkFilterPushdown(rdd, '_1)('_1 >= "4", classOf[GtEq[java.lang.String]])("4")
+
+      checkFilterPushdown(rdd, '_1)(Literal("1") === '_1, classOf[Eq  [java.lang.String]])("1")
+      checkFilterPushdown(rdd, '_1)(Literal("2") > '_1, classOf[Lt  [java.lang.String]])("1")
+      checkFilterPushdown(rdd, '_1)(Literal("3") < '_1, classOf[Gt  [java.lang.String]])("4")
+      checkFilterPushdown(rdd, '_1)(Literal("1") >= '_1, classOf[LtEq[java.lang.String]])("1")
+      checkFilterPushdown(rdd, '_1)(Literal("4") <= '_1, classOf[GtEq[java.lang.String]])("4")
+
+      checkFilterPushdown(rdd, '_1)(!('_1 < "4"), classOf[Operators.GtEq[java.lang.String]])("4")
+      checkFilterPushdown(rdd, '_1)('_1 > "2" && '_1 < "4", classOf[Operators.And])("3")
+      checkFilterPushdown(rdd, '_1)('_1 < "2" || '_1 > "3", classOf[Operators.Or]) {
+        Seq(Row("1"), Row("4"))
+      }
+    }
+  }
+
+  test("filter pushdown - binary") {
+    implicit class IntToBinary(int: Int) {
+      def b: Array[Byte] = int.toString.getBytes("UTF-8")
+    }
+
+    withParquetRDD((1 to 4).map(i => Tuple1.apply(i.b))) { rdd =>
+      checkBinaryFilterPushdown(rdd, '_1)('_1 === 1.b, classOf[Eq[Array[Byte]]])(1.b)
+      checkBinaryFilterPushdown(rdd, '_1)('_1 !== 1.b, classOf[Operators.Not]) {
+        (2 to 4).map(i => Row.apply(i.b)).toSeq
+      }
+
+      checkBinaryFilterPushdown(rdd, '_1)('_1 < 2.b, classOf[Lt [Array[Byte]]])(1.b)
+      checkBinaryFilterPushdown(rdd, '_1)('_1 > 3.b, classOf[Gt [Array[Byte]]])(4.b)
+      checkBinaryFilterPushdown(rdd, '_1)('_1 <= 1.b, classOf[LtEq[Array[Byte]]])(1.b)
+      checkBinaryFilterPushdown(rdd, '_1)('_1 >= 4.b, classOf[GtEq[Array[Byte]]])(4.b)
+
+      checkBinaryFilterPushdown(rdd, '_1)(Literal(1.b) === '_1, classOf[Eq  [Array[Byte]]])(1.b)
+      checkBinaryFilterPushdown(rdd, '_1)(Literal(2.b) > '_1, classOf[Lt  [Array[Byte]]])(1.b)
+      checkBinaryFilterPushdown(rdd, '_1)(Literal(3.b) < '_1, classOf[Gt  [Array[Byte]]])(4.b)
+      checkBinaryFilterPushdown(rdd, '_1)(Literal(1.b) >= '_1, classOf[LtEq[Array[Byte]]])(1.b)
+      checkBinaryFilterPushdown(rdd, '_1)(Literal(4.b) <= '_1, classOf[GtEq[Array[Byte]]])(4.b)
+
+      checkBinaryFilterPushdown(rdd, '_1)(!('_1 < 4.b), classOf[Operators.GtEq[Array[Byte]]])(4.b)
+      checkBinaryFilterPushdown(rdd, '_1)('_1 > 2.b && '_1 < 4.b, classOf[Operators.And])(3.b)
+      checkBinaryFilterPushdown(rdd, '_1)('_1 < 2.b || '_1 > 3.b, classOf[Operators.Or]) {
+        Seq(Row(1.b), Row(4.b))
+      }
+    }
+  }
+}
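
One detail worth spelling out: the suite needs the dedicated `checkBinaryAnswer` path because JVM arrays compare by reference, so two rows holding equal bytes would not match under the plain `checkAnswer` comparison. A tiny self-contained illustration of the problem and the `mkString` workaround used above:

val a: Array[Byte] = "1".getBytes("UTF-8")
val b: Array[Byte] = "1".getBytes("UTF-8")

assert(a != b)                                // reference equality: distinct arrays
assert(a.mkString(",") == b.mkString(","))    // rendered element-wise: "49" == "49"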
