-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-5852] [SQL] Pass down the schema for Parquet File in HiveContext #4562
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -287,7 +287,11 @@ case class ParquetRelation2( | |
} | ||
} | ||
|
||
parquetSchema = maybeSchema.getOrElse(readSchema()) | ||
try { | ||
parquetSchema = readSchema().getOrElse(maybeSchema.getOrElse(maybeMetastoreSchema.get)) | ||
} catch { | ||
case e => throw new SparkException(s"Failed to find schema for ${paths.mkString(",")}", e) | ||
} | ||
|
||
partitionKeysIncludedInParquetSchema = | ||
isPartitioned && | ||
|
@@ -308,7 +312,7 @@ case class ParquetRelation2( | |
} | ||
} | ||
|
||
private def readSchema(): StructType = { | ||
private def readSchema(): Option[StructType] = { | ||
// Sees which file(s) we need to touch in order to figure out the schema. | ||
val filesToTouch = | ||
// Always tries the summary files first if users don't require a merged schema. In this case, | ||
|
@@ -611,8 +615,9 @@ object ParquetRelation2 { | |
// internally. | ||
private[sql] val METASTORE_SCHEMA = "metastoreSchema" | ||
|
||
private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = { | ||
footers.map { footer => | ||
private[parquet] def readSchema( | ||
footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = { | ||
val mergedSchema = footers.map { footer => | ||
val metadata = footer.getParquetMetadata.getFileMetaData | ||
val parquetSchema = metadata.getSchema | ||
val maybeSparkSchema = metadata | ||
|
@@ -630,11 +635,14 @@ object ParquetRelation2 { | |
sqlContext.conf.isParquetBinaryAsString, | ||
sqlContext.conf.isParquetINT96AsTimestamp)) | ||
} | ||
}.reduce { (left, right) => | ||
try left.merge(right) catch { case e: Throwable => | ||
throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e) | ||
} | ||
}.foldLeft[StructType](null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I was trying that also, but seems using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All right. Instead of putting a large code block in |
||
case (null, right) => right | ||
case (left, right) => try left.merge(right) catch { case e: Throwable => | ||
throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e) | ||
} | ||
} | ||
|
||
Option(mergedSchema) | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -208,14 +208,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with | |
ParquetRelation2( | ||
paths, | ||
Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json), | ||
None, | ||
Some(metastoreSchema), | ||
Some(partitionSpec))(hive)) | ||
} else { | ||
val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString) | ||
LogicalRelation( | ||
ParquetRelation2( | ||
LogicalRelation(ParquetRelation2( | ||
paths, | ||
Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json))(hive)) | ||
Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json), | ||
Some(metastoreSchema))(hive)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, we can leave this file unchanged. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, evil case insensitivity... |
||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -121,13 +121,54 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { | |
|
||
override def beforeAll(): Unit = { | ||
super.beforeAll() | ||
|
||
sql(s""" | ||
create table test_parquet | ||
( | ||
intField INT, | ||
stringField STRING | ||
) | ||
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' | ||
STORED AS | ||
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' | ||
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' | ||
""") | ||
|
||
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) | ||
jsonRDD(rdd).registerTempTable("jt") | ||
sql(""" | ||
create table test_parquet_jt ROW FORMAT | ||
| SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' | ||
| STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' | ||
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' | ||
| AS select * from jt""".stripMargin) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also add a test for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, i thought Let's put this test in another PR? |
||
conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true") | ||
} | ||
|
||
override def afterAll(): Unit = { | ||
super.afterAll() | ||
sql("DROP TABLE test_parquet") | ||
sql("DROP TABLE jt") | ||
sql("DROP TABLE test_parquet_jt") | ||
|
||
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString) | ||
} | ||
|
||
test("scan from an empty parquet table") { | ||
checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0)) | ||
} | ||
|
||
test("scan from an empty parquet table with upper case") { | ||
checkAnswer(sql("SELECT count(INTFIELD) FROM TEST_parquet"), Row(0)) | ||
} | ||
|
||
test("scan from an non empty parquet table #1") { | ||
checkAnswer( | ||
sql(s"SELECT a, b FROM test_parquet_jt WHERE a = '1'"), | ||
Seq(Row(1, "str1")) | ||
) | ||
} | ||
} | ||
|
||
class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about this:
We first check if maybeSchema is defined. If not, we read the schema from the existing data. If existing data does not exist, we are dealing with a newly created empty table, and we will use the maybeMetastoreSchema defined in the options.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, it seems we do not need the
try ... catch
here. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After reading the source code, I am wondering if the
maybeMetastoreSchema
is redundant, and whether it should always be converted into maybeSchema
when creating the ParquetRelation2
instance?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on Cheng's comment at https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L194, I think that it is better to keep
maybeMetastoreSchema
and we just fix the bug for now.