
Commit 117121a

yhuai authored and marmbrus committed

[SPARK-5852][SQL] Fail to convert a newly created empty metastore parquet table to a data source parquet table.

The problem is that after we create an empty Hive metastore parquet table (e.g. `CREATE TABLE test (a int) STORED AS PARQUET`), Hive creates an empty directory for it, which causes our data source `ParquetRelation2` to fail to get the schema of the table. See the JIRA for steps to reproduce the bug and the resulting exception. This PR is based on #4562 from chenghao-intel.

JIRA: https://issues.apache.org/jira/browse/SPARK-5852

Author: Yin Huai <[email protected]>
Author: Cheng Hao <[email protected]>

Closes #4655 from yhuai/CTASParquet and squashes the following commits:

b8b3450 [Yin Huai] Update tests.
2ac94f7 [Yin Huai] Update tests.
3db3d20 [Yin Huai] Minor update.
d7e2308 [Yin Huai] Revert changes in HiveMetastoreCatalog.scala.
36978d1 [Cheng Hao] Update the code as feedback.
a04930b [Cheng Hao] Fix bug of scanning an empty parquet-based table.
442ffe0 [Cheng Hao] Pass down the schema for Parquet File in HiveContext.

1 parent 4d4cc76 commit 117121a
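For context, a minimal reproduction sketch of the failure, following the JIRA description; the `hiveContext` value is illustrative (any `org.apache.spark.sql.hive.HiveContext` of this era), not part of the commit:

```scala
// Illustrative reproduction of SPARK-5852 (assumes `hiveContext` is an
// org.apache.spark.sql.hive.HiveContext). Hive creates an empty directory
// for the new table, so there are no parquet footers from which
// ParquetRelation2 could read a schema.
hiveContext.sql("CREATE TABLE test (a int) STORED AS PARQUET")

// Before this commit, this scan failed during schema inference; with the
// metastore-schema fallback introduced below, it returns a count of 0.
hiveContext.sql("SELECT count(*) FROM test").collect() // Array(Row(0))
```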

File tree

3 files changed, +164 -6 lines changed


sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 14 additions & 4 deletions

@@ -287,7 +287,16 @@ private[sql] case class ParquetRelation2(
         }
       }

-      parquetSchema = maybeSchema.getOrElse(readSchema())
+      // To get the schema, we first try the schema defined in maybeSchema.
+      // If maybeSchema is not defined, we try to get the schema from the existing parquet data
+      // (through readSchema). If the data does not exist, we try the schema defined in
+      // maybeMetastoreSchema (defined in the options of the data source).
+      // Finally, if we still could not get a schema, we throw an error.
+      parquetSchema =
+        maybeSchema
+          .orElse(readSchema())
+          .orElse(maybeMetastoreSchema)
+          .getOrElse(sys.error("Failed to get the schema."))

       partitionKeysIncludedInParquetSchema =
         isPartitioned &&
@@ -308,7 +317,7 @@
       }
     }

-    private def readSchema(): StructType = {
+    private def readSchema(): Option[StructType] = {
       // Sees which file(s) we need to touch in order to figure out the schema.
       val filesToTouch =
         // Always tries the summary files first if users don't require a merged schema. In this case,
@@ -611,7 +620,8 @@ private[sql] object ParquetRelation2 {
   // internally.
   private[sql] val METASTORE_SCHEMA = "metastoreSchema"

-  private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = {
+  private[parquet] def readSchema(
+      footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
     footers.map { footer =>
       val metadata = footer.getParquetMetadata.getFileMetaData
       val parquetSchema = metadata.getSchema
@@ -630,7 +640,7 @@
           sqlContext.conf.isParquetBinaryAsString,
           sqlContext.conf.isParquetINT96AsTimestamp))
       }
-    }.reduce { (left, right) =>
+    }.reduceOption { (left, right) =>
       try left.merge(right) catch { case e: Throwable =>
         throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
       }
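The last hunk is the heart of the fix: an empty table's directory holds no parquet footers, and Scala's `reduce` throws on an empty collection, while `reduceOption` returns `None` and lets the `orElse` chain in the first hunk fall back to the metastore schema. A standalone plain-Scala sketch of the difference (not Spark code):

```scala
// Standalone sketch of the reduce vs. reduceOption change.
val footers: Seq[String] = Seq.empty // an empty table has no parquet footers

// Old behavior: reduce on an empty collection throws
// java.lang.UnsupportedOperationException: empty.reduce.
// val merged = footers.reduce(_ + _)

// New behavior: reduceOption yields None on an empty collection...
val merged: Option[String] = footers.reduceOption(_ + _) // None

// ...so the fallback chain from the first hunk can kick in, in miniature:
val schema = merged
  .orElse(Some("schema from the metastore"))
  .getOrElse(sys.error("Failed to get the schema."))
println(schema) // schema from the metastore
```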

sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala

Lines changed: 38 additions & 0 deletions

@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive

 import java.io.File
+
 import org.scalatest.BeforeAndAfterEach

 import org.apache.commons.io.FileUtils
@@ -30,6 +31,8 @@ import org.apache.spark.util.Utils
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.sources.LogicalRelation

 /**
  * Tests for persisting tables created through the data sources API into the metastore.
@@ -553,4 +556,39 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     sql("DROP TABLE savedJsonTable")
     conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
   }
+
+  if (HiveShim.version == "0.13.1") {
+    test("scan a parquet table created through a CTAS statement") {
+      val originalConvertMetastore = getConf("spark.sql.hive.convertMetastoreParquet", "true")
+      val originalUseDataSource = getConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+      setConf("spark.sql.hive.convertMetastoreParquet", "true")
+      setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+
+      val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
+      jsonRDD(rdd).registerTempTable("jt")
+      sql(
+        """
+          |create table test_parquet_ctas STORED AS parquET
+          |AS select tmp.a from jt tmp where tmp.a < 5
+        """.stripMargin)
+
+      checkAnswer(
+        sql("SELECT a FROM test_parquet_ctas WHERE a > 2"),
+        Row(3) :: Row(4) :: Nil
+      )
+
+      table("test_parquet_ctas").queryExecution.analyzed match {
+        case LogicalRelation(p: ParquetRelation2) => // OK
+        case _ =>
+          fail(
+            s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}")
+      }
+
+      // Clean up and reset confs.
+      sql("DROP TABLE IF EXISTS jt")
+      sql("DROP TABLE IF EXISTS test_parquet_ctas")
+      setConf("spark.sql.hive.convertMetastoreParquet", originalConvertMetastore)
+      setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource)
+    }
+  }
 }
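One design note on the test above: the confs are restored at the end of the test body, so a failing assertion would leave them modified for later tests. A hedged sketch of a try/finally guard (the `getConf`/`setConf` calls mirror the suite's helpers; the `withSQLConf` name is ours, not part of this commit):

```scala
// Illustrative helper, not part of this commit: set a SQL conf for the
// duration of `body` and restore the original value even if `body` throws.
// Assumes getConf(key, default) and setConf(key, value) as used in the suite.
def withSQLConf[T](key: String, value: String)(body: => T): T = {
  val original = getConf(key, value)
  setConf(key, value)
  try body
  finally setConf(key, original)
}

// Usage in the spirit of the test above:
// withSQLConf("spark.sql.hive.convertMetastoreParquet", "true") {
//   checkAnswer(sql("SELECT a FROM test_parquet_ctas WHERE a > 2"),
//     Row(3) :: Row(4) :: Nil)
// }
```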

sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala

Lines changed: 112 additions & 2 deletions

@@ -20,15 +20,15 @@ package org.apache.spark.sql.parquet

 import java.io.File

-import org.apache.spark.sql.catalyst.expressions.Row
 import org.scalatest.BeforeAndAfterAll

 import org.apache.spark.sql.{SQLConf, QueryTest}
+import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.execution.PhysicalRDD
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-
+import org.apache.spark.sql.sources.LogicalRelation

 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
@@ -121,13 +121,123 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {

   override def beforeAll(): Unit = {
     super.beforeAll()
+
+    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
+    jsonRDD(rdd).registerTempTable("jt")
+
+    sql(
+      """
+        |create table test_parquet
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
     conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
   }

   override def afterAll(): Unit = {
     super.afterAll()
+    sql("DROP TABLE IF EXISTS jt")
+    sql("DROP TABLE IF EXISTS test_parquet")
+
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
+
+  test("scan an empty parquet table") {
+    checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0))
+  }
+
+  test("scan an empty parquet table with upper case") {
+    checkAnswer(sql("SELECT count(INTFIELD) FROM TEST_parquet"), Row(0))
+  }
+
+  test("insert into an empty parquet table") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    // Insert into an empty table.
+    sql("insert into table test_insert_parquet select a, b from jt where jt.a > 5")
+    checkAnswer(
+      sql("SELECT intField, stringField FROM test_insert_parquet WHERE intField < 8"),
+      Row(6, "str6") :: Row(7, "str7") :: Nil
+    )
+    // Insert overwrite.
+    sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5")
+    checkAnswer(
+      sql("SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"),
+      Row(3, "str3") :: Row(4, "str4") :: Nil
+    )
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+
+    // Create it again.
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+    // Insert overwrite an empty table.
+    sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5")
+    checkAnswer(
+      sql("SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"),
+      Row(3, "str3") :: Row(4, "str4") :: Nil
+    )
+    // Insert into the table.
+    sql("insert into table test_insert_parquet select a, b from jt")
+    checkAnswer(
+      sql("SELECT intField, stringField FROM test_insert_parquet"),
+      (1 to 10).map(i => Row(i, s"str$i")) ++ (1 to 4).map(i => Row(i, s"str$i"))
+    )
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
+
+  test("scan a parquet table created through a CTAS statement") {
+    sql(
+      """
+        |create table test_parquet_ctas ROW FORMAT
+        |SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+        |AS select * from jt
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT a, b FROM test_parquet_ctas WHERE a = 1"),
+      Seq(Row(1, "str1"))
+    )
+
+    table("test_parquet_ctas").queryExecution.analyzed match {
+      case LogicalRelation(p: ParquetRelation2) => // OK
+      case _ =>
+        fail(
+          s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}")
+    }
+
+    sql("DROP TABLE IF EXISTS test_parquet_ctas")
+  }
 }

 class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
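A side note on the DDL above: on Hive 0.13 and later, the verbose ROW FORMAT SERDE / INPUTFORMAT / OUTPUTFORMAT form is equivalent to the `STORED AS PARQUET` shorthand used in the commit message; these suites spell it out so they also run against Hive versions that predate the shorthand. A sketch of the equivalent short form (the table name `test_parquet_short` is hypothetical):

```scala
// Hypothetical shorthand equivalent (requires Hive 0.13+) of the verbose
// parquet DDL used throughout this suite; STORED AS PARQUET expands to the
// same SerDe and input/output format classes.
sql(
  """
    |create table test_parquet_short
    |(
    |  intField INT,
    |  stringField STRING
    |)
    |STORED AS PARQUET
  """.stripMargin)
```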
