
Commit e080cc3

Anselme Vignon authored and marmbrus committed
[SPARK-5775] BugFix: GenericRow cannot be cast to SpecificMutableRow when nested data and partitioned table
The bug fixed here was due to a change in ParquetTableScan when reading a partitioned table:

- When a partition column is requested from a Parquet table, the table scan needs to add that column back to the output rows.
- To update the Row object created by ParquetTableScan, the row was first cast to SpecificMutableRow before being updated.
- This cast was unsafe, since there is no guarantee that the NewHadoopRDD used internally will instantiate the output rows as mutable rows. In particular, when reading a table with complex types (e.g. struct or array), the NewHadoopRDD uses a parquet.io.api.RecordMaterializer produced by org.apache.spark.sql.parquet.RowReadSupport. When complex types are involved, the org.apache.spark.sql.parquet.CatalystConverter.createRootConverter factory creates this consumer as an org.apache.spark.sql.parquet.CatalystGroupConverter (a) rather than an org.apache.spark.sql.parquet.CatalystPrimitiveRowConverter (b). Consumer (a) outputs GenericRow, while consumer (b) produces SpecificMutableRow. Therefore any query selecting a partition column plus a complex-typed column returns GenericRows and falls into the unsafe cast (see https://issues.apache.org/jira/browse/SPARK-5775 for an example).

The fix proposed here originally replaced the unsafe class cast with a match on the row type, updating the row if it is a mutable type and recreating a row otherwise. This PR now implements the solution updated by liancheng in aa39460: the fix checks whether every requested column is of primitive type, mirroring the check in org.apache.spark.sql.parquet.CatalystConverter.createRootConverter.

- If all columns are of primitive type, the row can safely be cast to a MutableRow.
- Otherwise, a new GenericMutableRow is created and the partition columns are written into this new row.

This fix is unit-tested in sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala.

Author: Anselme Vignon <[email protected]>
Author: Cheng Lian <[email protected]>

Closes #4697 from anselmevignon/local_dev and squashes the following commits:

6a4c53d [Anselme Vignon] style corrections
52f73fc [Cheng Lian] cherry-pick & merge from aa39460
8fc6a8c [Anselme Vignon] correcting tests on temporary tables
24928ea [Anselme Vignon] corrected mirror bug (see SPARK-5775) for newParquet
7c829cb [Anselme Vignon] bugfix, hopefully correct this time
005a7f8 [Anselme Vignon] added test cleanup
22cec52 [Anselme Vignon] lint compatible changes
ae48f7c [Anselme Vignon] unittesting SPARK-5775
f876dea [Anselme Vignon] starting to write tests
dbceaa3 [Anselme Vignon] cutting lines
4eb04e9 [Anselme Vignon] bugfix SPARK-5775
1 parent abdcec6 commit e080cc3
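
To make the failure mode and the fix easier to follow, the following is a minimal, self-contained sketch of the pattern the commit adopts. GenericLikeRow and MutableLikeRow are simplified stand-ins for Spark's GenericRow and SpecificMutableRow (not the actual classes); only the branching logic mirrors the change.

object PartitionFillSketch {
  sealed trait Row { def values: Array[Any] }
  final case class GenericLikeRow(values: Array[Any]) extends Row        // read-only, like GenericRow
  final case class MutableLikeRow(values: Array[Any]) extends Row {      // updatable, like SpecificMutableRow
    def update(i: Int, v: Any): Unit = values(i) = v
  }

  // Mirrors the shape of the fix: cast only when every requested column is primitive;
  // otherwise copy each produced row into a reusable mutable row before filling in the
  // partition column values.
  def fillPartitionColumns(
      rows: Iterator[Row],
      allColumnsPrimitive: Boolean,
      partitionOrdinals: Seq[Int],
      partitionValues: Seq[Any],
      outputSize: Int): Iterator[Row] = {
    if (allColumnsPrimitive) {
      rows.map { r =>
        val mutable = r.asInstanceOf[MutableLikeRow]  // safe only on this branch
        partitionOrdinals.zip(partitionValues).foreach { case (ord, v) => mutable.update(ord, v) }
        mutable
      }
    } else {
      val mutable = MutableLikeRow(new Array[Any](outputSize))
      rows.map { r =>
        var i = 0
        while (i < r.values.length) { mutable.update(i, r.values(i)); i += 1 }
        partitionOrdinals.zip(partitionValues).foreach { case (ord, v) => mutable.update(ord, v) }
        mutable
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Rows containing a complex column come back as read-only rows; the copy branch avoids
    // the ClassCastException reported in SPARK-5775.
    val rows: Iterator[Row] = Iterator(GenericLikeRow(Array(1, Seq(1, 2), null)))
    fillPartitionColumns(rows, allColumnsPrimitive = false, Seq(2), Seq(7), outputSize = 3)
      .foreach(r => println(r.values.mkString(", ")))  // prints: 1, List(1, 2), 7
  }
}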

File tree

3 files changed: +230 additions, -26 deletions

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 46 additions & 12 deletions
@@ -124,6 +124,13 @@ case class ParquetTableScan(
         conf)
 
     if (requestedPartitionOrdinals.nonEmpty) {
+      // This check is based on CatalystConverter.createRootConverter.
+      val primitiveRow = output.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
+      // Uses temporary variable to avoid the whole `ParquetTableScan` object being captured into
+      // the `mapPartitionsWithInputSplit` closure below.
+      val outputSize = output.size
+
       baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
         val partValue = "([^=]+)=([^=]+)".r
         val partValues =
@@ -141,19 +148,46 @@ case class ParquetTableScan(
           relation.partitioningAttributes
             .map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
 
-        new Iterator[Row] {
-          def hasNext = iter.hasNext
-          def next() = {
-            val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
-
-            // Parquet will leave partitioning columns empty, so we fill them in here.
-            var i = 0
-            while (i < requestedPartitionOrdinals.size) {
-              row(requestedPartitionOrdinals(i)._2) =
-                partitionRowValues(requestedPartitionOrdinals(i)._1)
-              i += 1
+        // Parquet will leave partitioning columns empty, so we fill them in here.
+        if (primitiveRow) {
+          new Iterator[Row] {
+            def hasNext = iter.hasNext
+            def next() = {
+              // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+              val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
+
+              var i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                row(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+              row
+            }
+          }
+        } else {
+          // Create a mutable row since we need to fill in values from partition columns.
+          val mutableRow = new GenericMutableRow(outputSize)
+          new Iterator[Row] {
+            def hasNext = iter.hasNext
+            def next() = {
+              // We are using CatalystGroupConverter and it returns a GenericRow.
+              // Since GenericRow is not mutable, we just cast it to a Row.
+              val row = iter.next()._2.asInstanceOf[Row]
+
+              var i = 0
+              while (i < row.size) {
+                mutableRow(i) = row(i)
+                i += 1
+              }
+              i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                mutableRow(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+              mutableRow
             }
-            row
           }
         }
       }
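
A note on the outputSize temporary added above: referring to output.size directly inside the mapPartitionsWithInputSplit closure would capture the whole ParquetTableScan object when the closure is serialized for the executors. The following is a minimal illustration of that general Scala pattern; TableScanLike is a made-up stand-in, not Spark code.

class TableScanLike(val output: Seq[String]) extends Serializable {
  // Captures `this` (the whole TableScanLike instance), because the closure body reads a member.
  def closureCapturingThis: Int => Int =
    i => i + output.size

  // Copies the needed value into a local first, so only that Int is captured by the closure.
  def closureCapturingValueOnly: Int => Int = {
    val outputSize = output.size
    i => i + outputSize
  }
}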

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 34 additions & 13 deletions
@@ -22,9 +22,8 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job}
-import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
 
-import parquet.hadoop.ParquetInputFormat
+import parquet.hadoop.{ParquetInputSplit, ParquetInputFormat}
 import parquet.hadoop.util.ContextUtil
 
 import org.apache.spark.annotation.DeveloperApi
@@ -40,7 +39,7 @@ import scala.collection.JavaConversions._
 
 /**
  * Allows creation of parquet based tables using the syntax
- * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
+ * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
  * required is `path`, which should be the location of a collection of, optionally partitioned,
  * parquet files.
  */
@@ -265,23 +264,45 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
     // When the data does not include the key and the key is requested then we must fill it in
     // based on information from the input split.
     if (!dataIncludesKey && partitionKeyLocation != -1) {
-      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
+      val primitiveRow =
+        requestedSchema.toAttributes.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
+      baseRDD.mapPartitionsWithInputSplit { case (split, iterator) =>
         val partValue = "([^=]+)=([^=]+)".r
         val partValues =
           split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
            .getPath
            .toString
            .split("/")
            .flatMap {
-             case partValue(key, value) => Some(key -> value)
-             case _ => None
-           }.toMap
-
-        val currentValue = partValues.values.head.toInt
-        iter.map { pair =>
-          val res = pair._2.asInstanceOf[SpecificMutableRow]
-          res.setInt(partitionKeyLocation, currentValue)
-          res
+             case partValue(key, value) => Some(key -> value)
+             case _ => None }
+           .toMap
+
+        if (primitiveRow) {
+          iterator.map { pair =>
+            // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+            val mutableRow = pair._2.asInstanceOf[SpecificMutableRow]
+            var i = 0
+            mutableRow.update(partitionKeyLocation, partValues.values.head.toInt)
+            mutableRow
+          }
+        } else {
+          // Create a mutable row since we need to fill in values from partition columns.
+          val mutableRow = new GenericMutableRow(requestedSchema.toAttributes.size)
+
+          iterator.map { pair =>
+            // We are using CatalystGroupConverter and it returns a GenericRow.
+            // Since GenericRow is not mutable, we just cast it to a Row.
+            val row = pair._2.asInstanceOf[Row]
+            var i = 0
+            while (i < row.size) {
+              mutableRow(i) = row(i)
+              i += 1
+            }
+            mutableRow.update(partitionKeyLocation, partValues.values.head.toInt)
+            mutableRow
+          }
         }
       }
     } else {
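
The partValue regex in the hunk above recovers the partition value from the path of the input split. Here is a standalone illustration of that mechanism in plain Scala (no Spark required; the sample path is hypothetical):

object PartitionPathDemo extends App {
  val partValue = "([^=]+)=([^=]+)".r
  val path = "/tmp/parquettests/table/p=1/part-00000.parquet"  // hypothetical split path
  val partValues = path.split("/").flatMap {
    case partValue(key, value) => Some(key -> value)           // keeps only "key=value" segments
    case _ => None
  }.toMap
  println(partValues)                                          // Map(p -> 1)
  println(partValues.values.head.toInt)                        // 1, the value written into the partition column
}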

sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala

Lines changed: 150 additions & 1 deletion
@@ -31,7 +31,20 @@ import org.apache.spark.sql.hive.test.TestHive._
 case class ParquetData(intField: Int, stringField: String)
 // The data that also includes the partitioning key
 case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+case class StructContainer(intStructField: Int, stringStructField: String)
 
+case class ParquetDataWithComplexTypes(
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])
+
+case class ParquetDataWithKeyAndComplexTypes(
+    p: Int,
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])
 
 /**
  * A suite to test the automatic conversion of metastore tables with parquet data to use the
@@ -69,6 +82,38 @@ class ParquetMetastoreSuite extends ParquetTest {
       location '${partitionedTableDirWithKey.getCanonicalPath}'
     """)
 
+    sql(s"""
+      create external table partitioned_parquet_with_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      location '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+    """)
+
+    sql(s"""
+      create external table partitioned_parquet_with_key_and_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      location '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+    """)
+
     sql(s"""
       create external table normal_parquet
       (
@@ -90,10 +135,24 @@ class ParquetMetastoreSuite extends ParquetTest {
       sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
     }
 
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_key_and_complextypes ADD PARTITION (p=$p)")
+    }
+
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_complextypes ADD PARTITION (p=$p)")
+    }
+
     setConf("spark.sql.hive.convertMetastoreParquet", "true")
   }
 
   override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS partitioned_parquet")
+    sql("DROP TABLE IF EXISTS partitioned_parquet_with_key")
+    sql("DROP TABLE IF EXISTS partitioned_parquet_with_complextypes")
+    sql("DROP TABLE IF EXISTS partitioned_parquet_with_key_and_complextypes")
+    sql("DROP TABLE IF EXISTS normal_parquet")
+
     setConf("spark.sql.hive.convertMetastoreParquet", "false")
   }
 
@@ -139,6 +198,22 @@ class ParquetSourceSuite extends ParquetTest {
         path '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
       )
     """)
+
+    sql( s"""
+      CREATE TEMPORARY TABLE partitioned_parquet_with_key_and_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+      )
+    """)
+
+    sql( s"""
+      CREATE TEMPORARY TABLE partitioned_parquet_with_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+      )
+    """)
   }
 }
 
@@ -147,7 +222,10 @@ class ParquetSourceSuite extends ParquetTest {
  */
 abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
   var partitionedTableDir: File = null
+  var normalTableDir: File = null
   var partitionedTableDirWithKey: File = null
+  var partitionedTableDirWithKeyAndComplexTypes: File = null
+  var partitionedTableDirWithComplexTypes: File = null
 
   override def beforeAll(): Unit = {
     partitionedTableDir = File.createTempFile("parquettests", "sparksql")
@@ -161,6 +239,15 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
 
+    normalTableDir = File.createTempFile("parquettests", "sparksql")
+    normalTableDir.delete()
+    normalTableDir.mkdir()
+
+    sparkContext
+      .makeRDD(1 to 10)
+      .map(i => ParquetData(i, s"part-1"))
+      .saveAsParquetFile(new File(normalTableDir, "normal").getCanonicalPath)
+
     partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
     partitionedTableDirWithKey.delete()
     partitionedTableDirWithKey.mkdir()
@@ -171,9 +258,46 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
         .map(i => ParquetDataWithKey(p, i, s"part-$p"))
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
+
+    partitionedTableDirWithKeyAndComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithKeyAndComplexTypes.delete()
+    partitionedTableDirWithKeyAndComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10)
+        .map(i => ParquetDataWithKeyAndComplexTypes(
+          p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i))
+        .saveAsParquetFile(partDir.getCanonicalPath)
+    }
+
+    partitionedTableDirWithComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithComplexTypes.delete()
+    partitionedTableDirWithComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10)
+        .map(i => ParquetDataWithComplexTypes(
+          i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i))
+        .saveAsParquetFile(partDir.getCanonicalPath)
+    }
   }
 
-  Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
+  override protected def afterAll(): Unit = {
+    //delete temporary files
+    partitionedTableDir.delete()
+    normalTableDir.delete()
+    partitionedTableDirWithKey.delete()
+    partitionedTableDirWithKeyAndComplexTypes.delete()
+    partitionedTableDirWithComplexTypes.delete()
+  }
+
+  Seq(
+    "partitioned_parquet",
+    "partitioned_parquet_with_key",
+    "partitioned_parquet_with_complextypes",
+    "partitioned_parquet_with_key_and_complextypes").foreach { table =>
     test(s"ordering of the partitioning columns $table") {
      checkAnswer(
        sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
@@ -186,6 +310,8 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
       )
     }
 
+
+
     test(s"project the partitioning column $table") {
       checkAnswer(
         sql(s"SELECT p, count(*) FROM $table group by p"),
@@ -263,6 +389,29 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  Seq(
+    "partitioned_parquet_with_key_and_complextypes",
+    "partitioned_parquet_with_complextypes").foreach { table =>
+    test(s"SPARK-5775 read structure from $table") {
+      checkAnswer(
+        sql(s"""
+          SELECT
+            p,
+            structField.intStructField,
+            structField.stringStructField
+          FROM $table
+          WHERE p = 1"""),
+        (1 to 10).map(i => Row(1, i, f"${i}_string")))
+    }
+
+    // Re-enable this after SPARK-5508 is fixed
+    ignore(s"SPARK-5775 read array from $table") {
+      checkAnswer(
+        sql(s"SELECT arrayField, p FROM $table WHERE p = 1"),
+        (1 to 10).map(i => Row(1 to i, 1)))
+    }
+  }
+
   test("non-part select(*)") {
     checkAnswer(
       sql("SELECT COUNT(*) FROM normal_parquet"),
