Commit 20a4d7d

[SPARK-8501] [SQL] Avoids reading schema from empty ORC files
ORC writes an empty schema (`struct<>`) to ORC files containing zero rows. This is OK for Hive, since the table schema is managed by the metastore, but it causes trouble when reading raw ORC files via Spark SQL, where the schema has to be discovered from the files themselves. Note that the ORC data source always avoids writing empty ORC files, but the problem remains when reading Hive tables that contain empty part-files.

Author: Cheng Lian <[email protected]>

Closes #7199 from liancheng/spark-8501 and squashes the following commits:

bb8cd95 [Cheng Lian] Addresses comments
a290221 [Cheng Lian] Avoids reading schema from empty ORC files
1 parent dfd8bac commit 20a4d7d
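
For context, a minimal sketch of how the problem surfaces on the user side, adapted from the regression test added in this commit (the table name, `path`, and the `empty` temp table are illustrative):

    // A Hive-managed ORC table whose directory will end up containing an empty part-file.
    sqlContext.sql(
      s"""CREATE TABLE empty_orc(key INT, value STRING)
         |STORED AS ORC
         |LOCATION '$path'
       """.stripMargin)

    // Inserting a zero-row result through the Hive ORC SerDe writes an empty ORC file
    // whose schema is just `struct<>`.
    sqlContext.sql("INSERT INTO TABLE empty_orc SELECT key, value FROM empty")

    // Reading the raw files forces Spark SQL to discover the schema from the files; with only
    // empty ORC files present, this now fails with an AnalysisException instead of silently
    // picking up the empty schema.
    sqlContext.read.format("orc").load(path)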

File tree

4 files changed: +135 -52 lines

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala

Lines changed: 50 additions & 10 deletions
@@ -24,30 +24,70 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.hive.HiveMetastoreTypes
 import org.apache.spark.sql.types.StructType
 
 private[orc] object OrcFileOperator extends Logging {
-  def getFileReader(pathStr: String, config: Option[Configuration] = None ): Reader = {
+  /**
+   * Retrieves an ORC file reader from a given path. The path can point to either a directory or a
+   * single ORC file. If it points to a directory, it picks any non-empty ORC file within that
+   * directory.
+   *
+   * The reader returned by this method is mainly used for two purposes:
+   *
+   * 1. Retrieving file metadata (schema and compression codecs, etc.)
+   * 2. Reading the actual file content (in this case, the given path should point to the target file)
+   *
+   * @note As recorded by SPARK-8501, ORC writes an empty schema (<code>struct&lt;&gt;</code>) to an
+   *       ORC file if the file contains zero rows. This is OK for Hive since the schema of the
+   *       table is managed by the metastore. But this becomes a problem when reading ORC files
+   *       directly from HDFS via Spark SQL, because we have to discover the schema from raw ORC
+   *       files. So this method always tries to find an ORC file whose schema is non-empty, and
+   *       creates the result reader from that file. If no such file is found, it returns `None`.
+   *
+   * @todo Needs to consider all files when schema evolution is taken into account.
+   */
+  def getFileReader(basePath: String, config: Option[Configuration] = None): Option[Reader] = {
+    def isWithNonEmptySchema(path: Path, reader: Reader): Boolean = {
+      reader.getObjectInspector match {
+        case oi: StructObjectInspector if oi.getAllStructFieldRefs.size() == 0 =>
+          logInfo(
+            s"ORC file $path has empty schema, it probably contains no rows. " +
+              "Trying to read another ORC file to figure out the schema.")
+          false
+        case _ => true
+      }
+    }
+
     val conf = config.getOrElse(new Configuration)
-    val fspath = new Path(pathStr)
-    val fs = fspath.getFileSystem(conf)
-    val orcFiles = listOrcFiles(pathStr, conf)
-    logDebug(s"Creating ORC Reader from ${orcFiles.head}")
-    // TODO Need to consider all files when schema evolution is taken into account.
-    OrcFile.createReader(fs, orcFiles.head)
+    val fs = {
+      val hdfsPath = new Path(basePath)
+      hdfsPath.getFileSystem(conf)
+    }
+
+    listOrcFiles(basePath, conf).iterator.map { path =>
+      path -> OrcFile.createReader(fs, path)
+    }.collectFirst {
+      case (path, reader) if isWithNonEmptySchema(path, reader) => reader
+    }
   }
 
   def readSchema(path: String, conf: Option[Configuration]): StructType = {
-    val reader = getFileReader(path, conf)
+    val reader = getFileReader(path, conf).getOrElse {
+      throw new AnalysisException(
+        s"Failed to discover schema from ORC files stored in $path. " +
+          "Probably there are either no ORC files or only empty ORC files.")
+    }
     val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
     val schema = readerInspector.getTypeName
     logDebug(s"Reading schema from file $path, got Hive schema string: $schema")
     HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
   }
 
-  def getObjectInspector(path: String, conf: Option[Configuration]): StructObjectInspector = {
-    getFileReader(path, conf).getObjectInspector.asInstanceOf[StructObjectInspector]
+  def getObjectInspector(
+      path: String, conf: Option[Configuration]): Option[StructObjectInspector] = {
+    getFileReader(path, conf).map(_.getObjectInspector.asInstanceOf[StructObjectInspector])
   }
 
   def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {

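With this change, a caller gets an `Option[Reader]` and has to handle the case where only empty ORC files exist. A minimal caller-side sketch, assuming an illustrative path and a Hadoop `Configuration` named `hadoopConf`:

    // Schema discovery now fails fast with AnalysisException when every ORC file under the path
    // has an empty schema (see readSchema above).
    val schema: StructType = OrcFileOperator.readSchema("/warehouse/some_table", Some(hadoopConf))

    // Metadata lookups go through the Option, making the "no usable file" case explicit.
    OrcFileOperator.getFileReader("/warehouse/some_table", Some(hadoopConf)) match {
      case Some(reader) => println(s"Compression: ${reader.getCompression}")
      case None => println("No non-empty ORC file found under the path")
    }
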
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala

Lines changed: 26 additions & 18 deletions
@@ -242,26 +242,34 @@ private[orc] case class OrcTableScan(
       nonPartitionKeyAttrs: Seq[(Attribute, Int)],
       mutableRow: MutableRow): Iterator[InternalRow] = {
     val deserializer = new OrcSerde
-    val soi = OrcFileOperator.getObjectInspector(path, Some(conf))
-    val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
-      case (attr, ordinal) =>
-        soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal
-    }.unzip
-    val unwrappers = fieldRefs.map(unwrapperFor)
-    // Map each tuple to a row object
-    iterator.map { value =>
-      val raw = deserializer.deserialize(value)
-      var i = 0
-      while (i < fieldRefs.length) {
-        val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
-        if (fieldValue == null) {
-          mutableRow.setNullAt(fieldOrdinals(i))
-        } else {
-          unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
+    val maybeStructOI = OrcFileOperator.getObjectInspector(path, Some(conf))
+
+    // SPARK-8501: ORC writes an empty schema ("struct<>") to an ORC file if the file contains zero
+    // rows, and thus couldn't give a proper ObjectInspector. In this case we just return an empty
+    // partition since we know that this file is empty.
+    maybeStructOI.map { soi =>
+      val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
+        case (attr, ordinal) =>
+          soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal
+      }.unzip
+      val unwrappers = fieldRefs.map(unwrapperFor)
+      // Map each tuple to a row object
+      iterator.map { value =>
+        val raw = deserializer.deserialize(value)
+        var i = 0
+        while (i < fieldRefs.length) {
+          val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
+          if (fieldValue == null) {
+            mutableRow.setNullAt(fieldOrdinals(i))
+          } else {
+            unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
+          }
+          i += 1
         }
-        i += 1
+        mutableRow: InternalRow
       }
-      mutableRow: InternalRow
+    }.getOrElse {
+      Iterator.empty
     }
   }

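The scan-side handling above reduces to mapping over the optional ObjectInspector and falling back to an empty partition. A stripped-down sketch of that pattern (the helper name is hypothetical, not part of the patch):

    // Hypothetical helper showing the Option-to-empty-partition shape used in OrcTableScan.
    def rowsOrEmpty[OI, Row](maybeOI: Option[OI])(build: OI => Iterator[Row]): Iterator[Row] =
      maybeOI.map(build).getOrElse(Iterator.empty)
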
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala

Lines changed: 48 additions & 7 deletions
@@ -23,10 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.io.orc.CompressionKind
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.InternalRow
-import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 
@@ -170,7 +167,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
   test("Default compression options for writing to an ORC file") {
     withOrcFile((1 to 100).map(i => (i, s"val_$i"))) { file =>
       assertResult(CompressionKind.ZLIB) {
-        OrcFileOperator.getFileReader(file).getCompression
+        OrcFileOperator.getFileReader(file).get.getCompression
       }
     }
   }
@@ -183,21 +180,21 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "SNAPPY")
     withOrcFile(data) { file =>
       assertResult(CompressionKind.SNAPPY) {
-        OrcFileOperator.getFileReader(file).getCompression
+        OrcFileOperator.getFileReader(file).get.getCompression
       }
     }
 
     conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "NONE")
     withOrcFile(data) { file =>
       assertResult(CompressionKind.NONE) {
-        OrcFileOperator.getFileReader(file).getCompression
+        OrcFileOperator.getFileReader(file).get.getCompression
       }
     }
 
     conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "LZO")
     withOrcFile(data) { file =>
       assertResult(CompressionKind.LZO) {
-        OrcFileOperator.getFileReader(file).getCompression
+        OrcFileOperator.getFileReader(file).get.getCompression
       }
     }
   }
@@ -289,4 +286,48 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       List(Row("same", "run_5", 100)))
     }
   }
+
+  test("SPARK-8501: Avoids discovery schema from empty ORC files") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      withTable("empty_orc") {
+        withTempTable("empty", "single") {
+          sqlContext.sql(
+            s"""CREATE TABLE empty_orc(key INT, value STRING)
+               |STORED AS ORC
+               |LOCATION '$path'
+             """.stripMargin)
+
+          val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1)
+          emptyDF.registerTempTable("empty")
+
+          // This creates 1 empty ORC file with the Hive ORC SerDe. We are using this trick because
+          // the Spark SQL ORC data source always avoids writing empty ORC files.
+          sqlContext.sql(
+            s"""INSERT INTO TABLE empty_orc
+               |SELECT key, value FROM empty
+             """.stripMargin)
+
+          val errorMessage = intercept[AnalysisException] {
+            sqlContext.read.format("orc").load(path)
+          }.getMessage
+
+          assert(errorMessage.contains("Failed to discover schema from ORC files"))
+
+          val singleRowDF = Seq((0, "foo")).toDF("key", "value").coalesce(1)
+          singleRowDF.registerTempTable("single")
+
+          sqlContext.sql(
+            s"""INSERT INTO TABLE empty_orc
+               |SELECT key, value FROM single
+             """.stripMargin)
+
+          val df = sqlContext.read.format("orc").load(path)
+          assert(df.schema === singleRowDF.schema.asNullable)
+          checkAnswer(df, singleRowDF)
+        }
+      }
+    }
+  }
 }

sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala

Lines changed: 11 additions & 17 deletions
@@ -43,14 +43,8 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
     orcTableDir.mkdir()
     import org.apache.spark.sql.hive.test.TestHive.implicits._
 
-    // Originally we were using a 10-row RDD for testing. However, when default parallelism is
-    // greater than 10 (e.g., running on a node with 32 cores), this RDD contains empty partitions,
-    // which result in empty ORC files. Unfortunately, ORC doesn't handle empty files properly and
-    // causes build failure on Jenkins, which happens to have 32 cores. Please refer to SPARK-8501
-    // for more details. To workaround this issue before fixing SPARK-8501, we simply increase row
-    // number in this RDD to avoid empty partitions.
     sparkContext
-      .makeRDD(1 to 100)
+      .makeRDD(1 to 10)
       .map(i => OrcData(i, s"part-$i"))
       .toDF()
       .registerTempTable(s"orc_temp_table")
@@ -76,43 +70,43 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("create temporary orc table") {
-    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(100))
+    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 100).map(i => Row(i, s"part-$i")))
+      (1 to 10).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source where intField > 5"),
-      (6 to 100).map(i => Row(i, s"part-$i")))
+      (6 to 10).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
      sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
-      (1 to 100).map(i => Row(1, s"part-$i")))
+      (1 to 10).map(i => Row(1, s"part-$i")))
   }
 
   test("create temporary orc table as") {
-    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(100))
+    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 100).map(i => Row(i, s"part-$i")))
+      (1 to 10).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source WHERE intField > 5"),
-      (6 to 100).map(i => Row(i, s"part-$i")))
+      (6 to 10).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
-      (1 to 100).map(i => Row(1, s"part-$i")))
+      (1 to 10).map(i => Row(1, s"part-$i")))
   }
 
   test("appending insert") {
     sql("INSERT INTO TABLE normal_orc_source SELECT * FROM orc_temp_table WHERE intField > 5")
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 100).flatMap { i =>
+      (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i =>
         Seq.fill(2)(Row(i, s"part-$i"))
       })
   }
@@ -125,7 +119,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_as_source"),
-      (6 to 100).map(i => Row(i, s"part-$i")))
+      (6 to 10).map(i => Row(i, s"part-$i")))
   }
 }
