Commit bb8cd95

Addresses comments
1 parent a290221 commit bb8cd95

Showing 2 changed files with 31 additions and 25 deletions.


sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala

Lines changed: 20 additions & 8 deletions
@@ -29,18 +29,34 @@ import org.apache.spark.sql.hive.HiveMetastoreTypes
 import org.apache.spark.sql.types.StructType
 
 private[orc] object OrcFileOperator extends Logging {
-  // TODO Needs to consider all files when schema evolution is taken into account.
+  /**
+   * Retrieves an ORC file reader from a given path. The path can point to either a directory or a
+   * single ORC file. If it points to a directory, it picks any non-empty ORC file within that
+   * directory.
+   *
+   * The reader returned by this method is mainly used for two purposes:
+   *
+   * 1. Retrieving file metadata (schema and compression codecs, etc.)
+   * 2. Reading the actual file content (in this case, the given path should point to the target file)
+   *
+   * @note As recorded by SPARK-8501, ORC writes an empty schema (<code>struct&lt;&gt;</code>) to an
+   *       ORC file if the file contains zero rows. This is OK for Hive since the schema of the
+   *       table is managed by the metastore. But this becomes a problem when reading ORC files
+   *       directly from HDFS via Spark SQL, because we have to discover the schema from raw ORC
+   *       files. So this method always tries to find an ORC file whose schema is non-empty, and
+   *       creates the result reader from that file. If no such file is found, it returns `None`.
+   *
+   * @todo Needs to consider all files when schema evolution is taken into account.
+   */
   def getFileReader(basePath: String, config: Option[Configuration] = None): Option[Reader] = {
     def isWithNonEmptySchema(path: Path, reader: Reader): Boolean = {
       reader.getObjectInspector match {
-        case oi: StructObjectInspector if oi.getAllStructFieldRefs.size() > 0 =>
-          true
         case oi: StructObjectInspector if oi.getAllStructFieldRefs.size() == 0 =>
           logInfo(
             s"ORC file $path has empty schema, it probably contains no rows. " +
               "Trying to read another ORC file to figure out the schema.")
           false
-        case _ => false
+        case _ => true
       }
     }
 
@@ -50,10 +66,6 @@ private[orc] object OrcFileOperator extends Logging {
       hdfsPath.getFileSystem(conf)
     }
 
-    // SPARK-8501: ORC writes an empty schema ("struct<>") to an ORC file if the file contains zero
-    // rows. This is OK for Hive since the schema of the table is managed by metastore. But this
-    // becomes a problem when reading ORC files directly from HDFS via Spark SQL, because we have
-    // to discover the schema from raw ORC files.
     listOrcFiles(basePath, conf).iterator.map { path =>
      path -> OrcFile.createReader(fs, path)
     }.collectFirst {
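
To make the new selection logic easy to follow outside the diff, here is a minimal standalone sketch of the same pattern: open readers lazily and keep the first one whose struct schema has at least one field. The names OrcSchemaProbe, hasNonEmptySchema, and firstNonEmptyReader are illustrative only (not part of the patch), and the candidate paths are taken as an input instead of going through listOrcFiles.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Reader}
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector

object OrcSchemaProbe {
  // True unless the reader exposes a struct schema with zero fields, which is
  // what ORC emits for zero-row files (SPARK-8501).
  def hasNonEmptySchema(reader: Reader): Boolean =
    reader.getObjectInspector match {
      case oi: StructObjectInspector => oi.getAllStructFieldRefs.size() > 0
      case _ => true
    }

  // Opens readers lazily and returns the first one backed by a usable schema,
  // mirroring the iterator/collectFirst pattern in getFileReader above.
  def firstNonEmptyReader(paths: Seq[Path], conf: Configuration): Option[Reader] =
    paths.iterator
      .map(path => OrcFile.createReader(path.getFileSystem(conf), path))
      .collectFirst { case reader if hasNonEmptySchema(reader) => reader }
}

Because the iterator is consumed lazily, collectFirst stops opening files as soon as one usable schema is found, so a directory full of empty part files costs at most one reader per file scanned.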

sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala

Lines changed: 11 additions & 17 deletions
@@ -43,14 +43,8 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
     orcTableDir.mkdir()
     import org.apache.spark.sql.hive.test.TestHive.implicits._
 
-    // Originally we were using a 10-row RDD for testing. However, when default parallelism is
-    // greater than 10 (e.g., running on a node with 32 cores), this RDD contains empty partitions,
-    // which result in empty ORC files. Unfortunately, ORC doesn't handle empty files properly and
-    // causes build failure on Jenkins, which happens to have 32 cores. Please refer to SPARK-8501
-    // for more details. To workaround this issue before fixing SPARK-8501, we simply increase row
-    // number in this RDD to avoid empty partitions.
     sparkContext
-      .makeRDD(1 to 100)
+      .makeRDD(1 to 10)
       .map(i => OrcData(i, s"part-$i"))
       .toDF()
       .registerTempTable(s"orc_temp_table")
@@ -76,43 +70,43 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("create temporary orc table") {
-    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(100))
+    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 100).map(i => Row(i, s"part-$i")))
+      (1 to 10).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source where intField > 5"),
-      (6 to 100).map(i => Row(i, s"part-$i")))
+      (6 to 10).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
-      (1 to 100).map(i => Row(1, s"part-$i")))
+      (1 to 10).map(i => Row(1, s"part-$i")))
   }
 
   test("create temporary orc table as") {
-    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(100))
+    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 100).map(i => Row(i, s"part-$i")))
+      (1 to 10).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source WHERE intField > 5"),
-      (6 to 100).map(i => Row(i, s"part-$i")))
+      (6 to 10).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
-      (1 to 100).map(i => Row(1, s"part-$i")))
+      (1 to 10).map(i => Row(1, s"part-$i")))
   }
 
   test("appending insert") {
     sql("INSERT INTO TABLE normal_orc_source SELECT * FROM orc_temp_table WHERE intField > 5")
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 100).flatMap { i =>
+      (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i =>
         Seq.fill(2)(Row(i, s"part-$i"))
       })
   }
@@ -125,7 +119,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_as_source"),
-      (6 to 100).map(i => Row(i, s"part-$i")))
+      (6 to 10).map(i => Row(i, s"part-$i")))
   }
 }
 
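
For context on why the 100-row workaround could be reverted here: when default parallelism exceeds the row count, makeRDD produces empty partitions, and each empty partition used to become a zero-row ORC part file carrying the empty schema struct<>. A hedged repro sketch of that situation, assuming a Spark 1.4-era TestHive context; the EmptyPartitionRepro name, the 32-slice count, and the output path are made up for the example.

import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive.implicits._

case class OrcData(intField: Int, stringField: String)

object EmptyPartitionRepro {
  def main(args: Array[String]): Unit = {
    // Force more partitions than rows; 32 slices mimics a 32-core Jenkins box.
    // At least 22 of the 32 partitions hold no rows, so the ORC output
    // directory gains zero-row part files whose schema is struct<>.
    val df = TestHive.sparkContext
      .makeRDD(1 to 10, numSlices = 32)
      .map(i => OrcData(i, s"part-$i"))
      .toDF()

    df.write.format("orc").save("/tmp/orc_empty_parts") // hypothetical path

    // With this commit, schema discovery succeeds anyway: getFileReader skips
    // the empty part files instead of surfacing the useless struct<> schema.
    println(TestHive.read.format("orc").load("/tmp/orc_empty_parts").schema)
  }
}

With getFileReader now skipping empty-schema files, the 10-row RDD is safe even under high parallelism, which is why the test can drop both the 100-row workaround and its explanatory comment.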

0 commit comments
