Skip to content

Commit a04930b

Browse files
fix bug of scan an empty parquet based table
1 parent 442ffe0 commit a04930b

File tree

3 files changed

+53
-11
lines changed

3 files changed

+53
-11
lines changed

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,11 @@ case class ParquetRelation2(
287287
}
288288
}
289289

290-
parquetSchema = maybeSchema.getOrElse(readSchema())
290+
try {
291+
parquetSchema = readSchema().getOrElse(maybeSchema.get)
292+
} catch {
293+
case e => throw new SparkException(s"Failed to find schema for ${paths.mkString(",")}", e)
294+
}
291295

292296
partitionKeysIncludedInParquetSchema =
293297
isPartitioned &&
@@ -308,7 +312,7 @@ case class ParquetRelation2(
308312
}
309313
}
310314

311-
private def readSchema(): StructType = {
315+
private def readSchema(): Option[StructType] = {
312316
// Sees which file(s) we need to touch in order to figure out the schema.
313317
val filesToTouch =
314318
// Always tries the summary files first if users don't require a merged schema. In this case,
@@ -611,8 +615,9 @@ object ParquetRelation2 {
611615
// internally.
612616
private[sql] val METASTORE_SCHEMA = "metastoreSchema"
613617

614-
private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = {
615-
footers.map { footer =>
618+
private[parquet] def readSchema(
619+
footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
620+
Option(footers.map { footer =>
616621
val metadata = footer.getParquetMetadata.getFileMetaData
617622
val parquetSchema = metadata.getSchema
618623
val maybeSparkSchema = metadata
@@ -630,11 +635,12 @@ object ParquetRelation2 {
630635
sqlContext.conf.isParquetBinaryAsString,
631636
sqlContext.conf.isParquetINT96AsTimestamp))
632637
}
633-
}.reduce { (left, right) =>
634-
try left.merge(right) catch { case e: Throwable =>
635-
throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
636-
}
637-
}
638+
}.foldLeft[StructType](null) {
639+
case (null, right) => right
640+
case (left, right) => try left.merge(right) catch { case e: Throwable =>
641+
throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
642+
}
643+
})
638644
}
639645

640646
/**

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,8 +212,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
212212
Some(partitionSpec))(hive))
213213
} else {
214214
val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
215-
LogicalRelation(
216-
ParquetRelation2(
215+
LogicalRelation(ParquetRelation2(
217216
paths,
218217
Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json),
219218
Some(metastoreSchema))(hive))

sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,13 +121,50 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
121121

122122
override def beforeAll(): Unit = {
123123
super.beforeAll()
124+
125+
sql(s"""
126+
create table test_parquet
127+
(
128+
intField INT,
129+
stringField STRING
130+
)
131+
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
132+
STORED AS
133+
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
134+
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
135+
""")
136+
137+
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
138+
jsonRDD(rdd).registerTempTable("jt")
139+
sql("""
140+
create table test ROW FORMAT
141+
| SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
142+
| STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
143+
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
144+
| AS select * from jt""".stripMargin)
145+
124146
conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
125147
}
126148

127149
override def afterAll(): Unit = {
128150
super.afterAll()
151+
sql("DROP TABLE test_parquet")
152+
sql("DROP TABLE test")
153+
sql("DROP TABLE jt")
154+
129155
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
130156
}
157+
158+
test("scan from an empty parquet table") {
159+
checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0))
160+
}
161+
162+
test("scan from an non empty parquet table") {
163+
checkAnswer(
164+
sql(s"SELECT a, b FROM jt WHERE a = '1'"),
165+
Seq(Row(1, "str1"))
166+
)
167+
}
131168
}
132169

133170
class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {

0 commit comments

Comments
 (0)