Skip to content

Commit 0605e08

Browse files
committed
[SPARK-8604] [SQL] HadoopFsRelation subclasses should set their output format class
`HadoopFsRelation` subclasses, especially `ParquetRelation2`, should set their own output format classes, so that the default output committer can be set up correctly when doing appending (where we ignore user-defined output committers). Author: Cheng Lian <[email protected]> Closes apache#6998 from liancheng/spark-8604 and squashes the following commits: 9be51d1 [Cheng Lian] Adds more comments 6db1368 [Cheng Lian] HadoopFsRelation subclasses should set their output format class (cherry picked from commit c337844) Signed-off-by: Cheng Lian <[email protected]>
1 parent 792ed7a commit 0605e08

File tree

4 files changed

+40
-1
lines changed

4 files changed

+40
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,12 @@ private[sql] class ParquetRelation2(
195195
committerClass,
196196
classOf[ParquetOutputCommitter])
197197

198+
// We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
199+
// it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why
200+
// we set it here is to setup the output committer class to `ParquetOutputCommitter`, which is
201+
// bundled with `ParquetOutputFormat[Row]`.
202+
job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
203+
198204
// TODO There's no need to use two kinds of WriteSupport
199205
// We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
200206
// complex types.

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSer
2727
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
2828
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
2929
import org.apache.hadoop.io.{NullWritable, Writable}
30-
import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, RecordWriter, Reporter}
30+
import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
3131
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
3232
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
3333

@@ -193,6 +193,16 @@ private[sql] class OrcRelation(
193193
}
194194

195195
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
196+
job.getConfiguration match {
197+
case conf: JobConf =>
198+
conf.setOutputFormat(classOf[OrcOutputFormat])
199+
case conf =>
200+
conf.setClass(
201+
"mapred.output.format.class",
202+
classOf[OrcOutputFormat],
203+
classOf[MapRedOutputFormat[_, _]])
204+
}
205+
196206
new OutputWriterFactory {
197207
override def newInstance(
198208
path: String,

sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ class SimpleTextRelation(
119119
}
120120

121121
override def prepareJobForWrite(job: Job): OutputWriterFactory = new OutputWriterFactory {
122+
job.setOutputFormatClass(classOf[TextOutputFormat[_, _]])
123+
122124
override def newInstance(
123125
path: String,
124126
dataSchema: StructType,

sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,4 +719,25 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
719719
}
720720
}
721721
}
722+
723+
test("SPARK-8604: Parquet data source should write summary file while doing appending") {
724+
withTempPath { dir =>
725+
val path = dir.getCanonicalPath
726+
val df = sqlContext.range(0, 5)
727+
df.write.mode(SaveMode.Overwrite).parquet(path)
728+
729+
val summaryPath = new Path(path, "_metadata")
730+
val commonSummaryPath = new Path(path, "_common_metadata")
731+
732+
val fs = summaryPath.getFileSystem(configuration)
733+
fs.delete(summaryPath, true)
734+
fs.delete(commonSummaryPath, true)
735+
736+
df.write.mode(SaveMode.Append).parquet(path)
737+
checkAnswer(sqlContext.read.parquet(path), df.unionAll(df))
738+
739+
assert(fs.exists(summaryPath))
740+
assert(fs.exists(commonSummaryPath))
741+
}
742+
}
722743
}

0 commit comments

Comments
 (0)