Commit ca6e038 (parent: f02394d)

Fixes SPARK-5775

3 files changed: +187 -22 lines


sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 28 additions & 9 deletions
@@ -143,19 +143,38 @@ private[sql] case class ParquetTableScan(
         relation.partitioningAttributes
           .map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))

+      val mutableRow = new GenericMutableRow(output.size)
+
       new Iterator[Row] {
         def hasNext = iter.hasNext
         def next() = {
-          val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
-
-          // Parquet will leave partitioning columns empty, so we fill them in here.
-          var i = 0
-          while (i < requestedPartitionOrdinals.size) {
-            row(requestedPartitionOrdinals(i)._2) =
-              partitionRowValues(requestedPartitionOrdinals(i)._1)
-            i += 1
+          iter.next() match {
+            case (_, row: SpecificMutableRow) =>
+              // Parquet will leave partitioning columns empty, so we fill them in here.
+              var i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                row(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+              row
+
+            case (_, row: Row) =>
+              var i = 0
+              while (i < row.size) {
+                mutableRow(i) = row(i)
+                i += 1
+              }
+
+              i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                mutableRow(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+
+              mutableRow
           }
-          row
         }
       }
     }
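The hunk above is the core of the fix. The old code cast every record produced by the Parquet record reader to SpecificMutableRow with asInstanceOf, which throws a ClassCastException whenever the reader hands back some other Row implementation; the complex-type tests added below suggest this is exactly what happens for schemas containing struct or array columns. The replacement matches on the row type and, for the generic case, copies the fields into a single preallocated GenericMutableRow before filling in the partition columns that Parquet leaves empty. A minimal, self-contained sketch of that copy-then-patch pattern, using hypothetical stand-in row types (FixedRow, BufferRow) rather than the Spark classes:

// Stand-in row types for illustration only; not the Spark Row/SpecificMutableRow/
// GenericMutableRow classes.
class FixedRow(val values: Array[Any]) {
  def size: Int = values.length
  def apply(i: Int): Any = values(i)
}

class BufferRow(width: Int) extends FixedRow(new Array[Any](width)) {
  def update(i: Int, v: Any): Unit = values(i) = v
}

def withPartitionColumns(
    row: FixedRow,
    buffer: BufferRow,               // allocated once per partition and reused
    partitionOrdinals: Array[Int],   // output positions the Parquet reader leaves empty
    partitionValues: Array[Any]): FixedRow = {

  def fillPartitionColumns(target: BufferRow): Unit = {
    var i = 0
    while (i < partitionOrdinals.length) {
      target(partitionOrdinals(i)) = partitionValues(i)
      i += 1
    }
  }

  row match {
    case mutable: BufferRow =>       // fast path: patch the row in place
      fillPartitionColumns(mutable)
      mutable
    case generic =>                  // e.g. a row materialized for a nested schema
      var i = 0
      while (i < generic.size) {     // copy the fields that were actually read
        buffer(i) = generic(i)
        i += 1
      }
      fillPartitionColumns(buffer)
      buffer
  }
}

Allocating the buffer once per partition and reusing it for every generic record keeps the per-row cost to a field copy rather than a fresh row allocation.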

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 28 additions & 10 deletions
@@ -482,17 +482,35 @@ private[sql] case class ParquetRelation2(
           }.get

           val requiredPartOrdinal = partitionKeyLocations.keys.toSeq
+          val mutableRow = new GenericMutableRow(requestedSchema.size)
+
+          iterator.map {
+            case (_, row: SpecificMutableRow) =>
+              var i = 0
+              while (i < requiredPartOrdinal.size) {
+                // TODO Avoids boxing cost here!
+                val partOrdinal = requiredPartOrdinal(i)
+                row.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
+                i += 1
+              }
+              row
+
+            case (_, row: Row) =>
+              var i = 0
+              while (i < row.size) {
+                // TODO Avoids boxing cost here!
+                mutableRow(i) = row(i)
+                i += 1
+              }

-          iterator.map { pair =>
-            val row = pair._2.asInstanceOf[SpecificMutableRow]
-            var i = 0
-            while (i < requiredPartOrdinal.size) {
-              // TODO Avoids boxing cost here!
-              val partOrdinal = requiredPartOrdinal(i)
-              row.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
-              i += 1
-            }
-            row
+              i = 0
+              while (i < requiredPartOrdinal.size) {
+                // TODO Avoids boxing cost here!
+                val partOrdinal = requiredPartOrdinal(i)
+                mutableRow.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
+                i += 1
+              }
+              mutableRow
           }
         }
       } else {
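newParquet.scala applies the same two-case treatment to ParquetRelation2, again allocating one GenericMutableRow per partition and reusing it for every row that is not already a SpecificMutableRow. A small sketch, not Spark code, of the property this relies on: handing the same mutable buffer back from iterator.map is fine as long as each row is consumed before the next one is produced, but any consumer that retains rows must copy them first.

// Illustrative only: one reused buffer means retained references all see the last row.
val buffer = new Array[Any](2)

def nextRow(values: Seq[Any]): Array[Any] = {
  var i = 0
  while (i < values.length) {
    buffer(i) = values(i)
    i += 1
  }
  buffer                                         // the same array instance every time
}

val input = Seq(Seq(1, "a"), Seq(2, "b"))
val unsafe = input.map(nextRow)                  // both elements point at one array,
                                                 // which now holds (2, "b")
val safe = input.map(r => nextRow(r).clone())    // copy before retaining: (1, "a"), (2, "b")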

sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala

Lines changed: 131 additions & 3 deletions
@@ -35,6 +35,20 @@ case class ParquetData(intField: Int, stringField: String)
 // The data that also includes the partitioning key
 case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)

+case class StructContainer(intStructField :Int, stringStructField: String)
+
+case class ParquetDataWithComplexTypes(
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])
+
+case class ParquetDataWithKeyAndComplexTypes(
+    p: Int,
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])

 /**
  * A suite to test the automatic conversion of metastore tables with parquet data to use the
@@ -85,6 +99,38 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
       location '${new File(normalTableDir, "normal").getCanonicalPath}'
     """)

+    sql(s"""
+      CREATE EXTERNAL TABLE partitioned_parquet_with_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      LOCATION '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+    """)
+
+    sql(s"""
+      CREATE EXTERNAL TABLE partitioned_parquet_with_key_and_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      LOCATION '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+    """)
+
     (1 to 10).foreach { p =>
       sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
     }
@@ -93,7 +139,15 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
       sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
     }

-    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_key_and_complextypes ADD PARTITION (p=$p)")
+    }
+
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_complextypes ADD PARTITION (p=$p)")
+    }
+
+    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
     jsonRDD(rdd1).registerTempTable("jt")
     val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
     jsonRDD(rdd2).registerTempTable("jt_array")
@@ -104,6 +158,8 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
   override def afterAll(): Unit = {
     sql("DROP TABLE partitioned_parquet")
     sql("DROP TABLE partitioned_parquet_with_key")
+    sql("DROP TABLE partitioned_parquet_with_complextypes")
+    sql("DROP TABLE partitioned_parquet_with_key_and_complextypes")
     sql("DROP TABLE normal_parquet")
     sql("DROP TABLE IF EXISTS jt")
     sql("DROP TABLE IF EXISTS jt_array")
@@ -408,6 +464,22 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
        path '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
      )
    """)
+
+    sql( s"""
+      CREATE TEMPORARY TABLE partitioned_parquet_with_key_and_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+      )
+    """)
+
+    sql( s"""
+      CREATE TEMPORARY TABLE partitioned_parquet_with_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+      )
+    """)
   }
 }

@@ -446,7 +518,8 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
   var partitionedTableDir: File = null
   var normalTableDir: File = null
   var partitionedTableDirWithKey: File = null
-
+  var partitionedTableDirWithComplexTypes: File = null
+  var partitionedTableDirWithKeyAndComplexTypes: File = null

   override def beforeAll(): Unit = {
     partitionedTableDir = File.createTempFile("parquettests", "sparksql")
@@ -482,9 +555,45 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
         .toDF()
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
+
+    partitionedTableDirWithKeyAndComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithKeyAndComplexTypes.delete()
+    partitionedTableDirWithKeyAndComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10).map { i =>
+        ParquetDataWithKeyAndComplexTypes(
+          p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
+      }.toDF().saveAsParquetFile(partDir.getCanonicalPath)
+    }
+
+    partitionedTableDirWithComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithComplexTypes.delete()
+    partitionedTableDirWithComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10).map { i =>
+        ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
+      }.toDF().saveAsParquetFile(partDir.getCanonicalPath)
+    }
+  }
+
+  override protected def afterAll(): Unit = {
+    partitionedTableDir.delete()
+    normalTableDir.delete()
+    partitionedTableDirWithKey.delete()
+    partitionedTableDirWithComplexTypes.delete()
+    partitionedTableDirWithKeyAndComplexTypes.delete()
   }

-  Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
+  Seq(
+    "partitioned_parquet",
+    "partitioned_parquet_with_key",
+    "partitioned_parquet_with_complextypes",
+    "partitioned_parquet_with_key_and_complextypes").foreach { table =>
+
     test(s"ordering of the partitioning columns $table") {
       checkAnswer(
         sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
@@ -574,6 +683,25 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
     }
   }

+  Seq(
+    "partitioned_parquet_with_key_and_complextypes",
+    "partitioned_parquet_with_complextypes").foreach { table =>
+
+    test(s"SPARK-5775 read struct from $table") {
+      checkAnswer(
+        sql(s"SELECT p, structField.intStructField, structField.stringStructField FROM $table WHERE p = 1"),
+        (1 to 10).map(i => Row(1, i, f"${i}_string")))
+    }
+
+    // Re-enable this after SPARK-5508 is fixed
+    ignore(s"SPARK-5775 read array from $table") {
+      checkAnswer(
+        sql(s"SELECT arrayField, p FROM $table WHERE p = 1"),
+        (1 to 10).map(i => Row(1 to i, 1)))
+    }
+  }
+
+
   test("non-part select(*)") {
     checkAnswer(
       sql("SELECT COUNT(*) FROM normal_parquet"),
