From 442ffe01ca4abb3ba94ad4fe30604501befcb32b Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Thu, 12 Feb 2015 01:19:27 -0800
Subject: [PATCH 1/3] Pass down the schema for the Parquet file in HiveContext

---
 .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0e43faa8afdaf..23458430dfc44 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -208,14 +208,15 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         ParquetRelation2(
           paths,
           Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json),
-          None,
+          Some(metastoreSchema),
           Some(partitionSpec))(hive))
     } else {
       val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
       LogicalRelation(
         ParquetRelation2(
           paths,
-          Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json))(hive))
+          Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json),
+          Some(metastoreSchema))(hive))
     }
   }
 
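At this point in the series, ParquetRelation2 still resolves its schema as maybeSchema.getOrElse(readSchema()), so passing Some(metastoreSchema) where the catalog previously passed None means a metastore-backed table no longer depends on footer discovery. The following is a standalone sketch of that resolution order; the object and types are simplified stand-ins for illustration, not Spark's actual classes:

    // Standalone sketch (simplified stand-in types, not Spark source) of the
    // patch-1 resolution order: when Some(metastoreSchema) is passed down,
    // readSchema() is never consulted, which matters once a table directory
    // contains no Parquet files to read footers from.
    object PassdownSketch {
      case class StructType(json: String)

      def resolve(maybeSchema: Option[StructType], readSchema: () => StructType): StructType =
        maybeSchema.getOrElse(readSchema())

      def main(args: Array[String]): Unit = {
        val metastoreSchema = StructType("""{"type":"struct","fields":[]}""")
        val failingRead = () => throw new RuntimeException("no Parquet footers to read")
        // Before this patch the catalog passed None here, and resolve() would
        // fall through to readSchema(), which fails on an empty table.
        println(resolve(Some(metastoreSchema), failingRead))
      }
    }
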
From a04930badb291e55ba4e6ba79ce781a89f827932 Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Thu, 12 Feb 2015 19:06:47 -0800
Subject: [PATCH 2/3] Fix a bug when scanning an empty Parquet-based table

---
 .../apache/spark/sql/parquet/newParquet.scala | 24 ++++++++-----
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  3 +-
 .../spark/sql/parquet/parquetSuites.scala     | 37 +++++++++++++++++++
 3 files changed, 53 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 9bb34e2df9a26..6185306dd02aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -287,7 +287,11 @@ case class ParquetRelation2(
       }
     }
 
-    parquetSchema = maybeSchema.getOrElse(readSchema())
+    try {
+      parquetSchema = readSchema().getOrElse(maybeSchema.get)
+    } catch {
+      case e: Throwable => throw new SparkException(s"Failed to find schema for ${paths.mkString(",")}", e)
+    }
 
     partitionKeysIncludedInParquetSchema =
       isPartitioned &&
@@ -308,7 +312,7 @@ case class ParquetRelation2(
     }
   }
 
-  private def readSchema(): StructType = {
+  private def readSchema(): Option[StructType] = {
     // Sees which file(s) we need to touch in order to figure out the schema.
     val filesToTouch =
       // Always tries the summary files first if users don't require a merged schema. In this case,
@@ -611,8 +615,9 @@ object ParquetRelation2 {
   // internally.
   private[sql] val METASTORE_SCHEMA = "metastoreSchema"
 
-  private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = {
-    footers.map { footer =>
+  private[parquet] def readSchema(
+      footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
+    Option(footers.map { footer =>
       val metadata = footer.getParquetMetadata.getFileMetaData
       val parquetSchema = metadata.getSchema
       val maybeSparkSchema = metadata
@@ -630,11 +635,12 @@ object ParquetRelation2 {
         sqlContext.conf.isParquetBinaryAsString,
         sqlContext.conf.isParquetINT96AsTimestamp))
       }
-    }.reduce { (left, right) =>
-      try left.merge(right) catch { case e: Throwable =>
-        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
-      }
-    }
+    }.foldLeft[StructType](null) {
+      case (null, right) => right
+      case (left, right) => try left.merge(right) catch { case e: Throwable =>
+        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
+      }
+    })
   }
 
   /**

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 23458430dfc44..6805fdcc0a2c4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -212,8 +212,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
           Some(partitionSpec))(hive))
     } else {
       val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
-      LogicalRelation(
-        ParquetRelation2(
+      LogicalRelation(ParquetRelation2(
           paths,
           Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json),
           Some(metastoreSchema))(hive))

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 2acf1a7767c19..edff90f60e917 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -121,13 +121,50 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
+
+    sql("""
+      create table test_parquet
+      (
+        intField INT,
+        stringField STRING
+      )
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+        INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+    """)
+
+    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    jsonRDD(rdd).registerTempTable("jt")
+    sql("""
+      create table test ROW FORMAT
+      | SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      | STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      | AS select * from jt""".stripMargin)
+
     conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
   }
 
   override def afterAll(): Unit = {
     super.afterAll()
+    sql("DROP TABLE test_parquet")
+    sql("DROP TABLE test")
+    sql("DROP TABLE jt")
+
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
+
+  test("scan from an empty parquet table") {
+    checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0))
+  }
+
+  test("scan from a non-empty parquet table") {
+    checkAnswer(
+      sql("SELECT a, b FROM jt WHERE a = '1'"),
+      Seq(Row(1, "str1"))
+    )
+  }
 }
 
 class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
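The core of the fix above is the switch from reduce to a null-seeded foldLeft wrapped in Option: reduce throws on an empty collection, which is exactly the empty-table case (zero Parquet footers), while Option(null) collapses to None. A minimal self-contained sketch of the two behaviors, with plain strings standing in for StructType and string concatenation standing in for schema merging:

    // Minimal sketch (strings stand in for StructType; not Spark source) of
    // why readSchema now returns Option: reducing zero per-footer schemas
    // throws, while the null-seeded fold wrapped in Option yields None.
    object EmptyFooterSketch {
      def mergeReduce(schemas: Seq[String]): String =
        schemas.reduce((left, right) => s"$left+$right") // throws on Nil

      def mergeFold(schemas: Seq[String]): Option[String] =
        Option(schemas.foldLeft[String](null) {
          case (null, right) => right
          case (left, right) => s"$left+$right"
        }) // Option(null) == None, so an empty table produces None

      def main(args: Array[String]): Unit = {
        println(mergeFold(Nil))           // None
        println(mergeFold(Seq("a", "b"))) // Some(a+b)
        // mergeReduce(Nil) would throw UnsupportedOperationException
      }
    }

When the fold comes back None, the relation falls back to maybeSchema, which is why patch 1's pass-down of the metastore schema matters for an empty table.
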
a = '1'"), + Seq(Row(1, "str1")) + ) + } } class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase { From 36978d1835ab6e0266ad3787b33056b573fd59e8 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Mon, 16 Feb 2015 23:17:54 -0800 Subject: [PATCH 3/3] Update the code as feedback --- .../org/apache/spark/sql/parquet/newParquet.scala | 8 +++++--- .../org/apache/spark/sql/parquet/parquetSuites.scala | 12 ++++++++---- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 6185306dd02aa..c2410811ef688 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -288,7 +288,7 @@ case class ParquetRelation2( } try { - parquetSchema = readSchema().getOrElse(maybeSchema.get) + parquetSchema = readSchema().getOrElse(maybeSchema.getOrElse(maybeMetastoreSchema.get)) } catch { case e => throw new SparkException(s"Failed to find schema for ${paths.mkString(",")}", e) } @@ -617,7 +617,7 @@ object ParquetRelation2 { private[parquet] def readSchema( footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = { - Option(footers.map { footer => + val mergedSchema = footers.map { footer => val metadata = footer.getParquetMetadata.getFileMetaData val parquetSchema = metadata.getSchema val maybeSparkSchema = metadata @@ -640,7 +640,9 @@ object ParquetRelation2 { case (left, right) => try left.merge(right) catch { case e: Throwable => throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e) } - }) + } + + Option(mergedSchema) } /** diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala index edff90f60e917..d619704f4f767 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala @@ -137,7 +137,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) jsonRDD(rdd).registerTempTable("jt") sql(""" - create table test ROW FORMAT + create table test_parquet_jt ROW FORMAT | SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' | STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' @@ -149,8 +149,8 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { override def afterAll(): Unit = { super.afterAll() sql("DROP TABLE test_parquet") - sql("DROP TABLE test") sql("DROP TABLE jt") + sql("DROP TABLE test_parquet_jt") setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString) } @@ -159,9 +159,13 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0)) } - test("scan from an non empty parquet table") { + test("scan from an empty parquet table with upper case") { + checkAnswer(sql("SELECT count(INTFIELD) FROM TEST_parquet"), Row(0)) + } + + test("scan from an non empty parquet table #1") { checkAnswer( - sql(s"SELECT a, b FROM jt WHERE a = '1'"), + sql(s"SELECT a, b FROM test_parquet_jt WHERE a = '1'"), Seq(Row(1, "str1")) ) }