Skip to content

Commit 701a814

Browse files
baishuoliancheng
authored andcommitted
Update SparkHadoopWriter.scala
1 parent dc24c41 commit 701a814

File tree

1 file changed

+27
-0
lines changed

1 file changed

+27
-0
lines changed

sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,33 @@ private[hive] class SparkHiveHadoopWriter(
9393
null)
9494
}
9595

96+
def open(dynamicPartPath: String) {
97+
val numfmt = NumberFormat.getInstance()
98+
numfmt.setMinimumIntegerDigits(5)
99+
numfmt.setGroupingUsed(false)
100+
101+
val extension = Utilities.getFileExtension(
102+
conf.value,
103+
fileSinkConf.getCompressed,
104+
getOutputFormat())
105+
106+
val outputName = "part-" + numfmt.format(splitID) + extension
107+
val outputPath: Path = FileOutputFormat.getOutputPath(conf.value)
108+
if (outputPath == null) {
109+
throw new IOException("Undefined job output-path")
110+
}
111+
val workPath = new Path(outputPath, dynamicPartPath.substring(1))//remove "/"
112+
val path = new Path(workPath, outputName)
113+
getOutputCommitter().setupTask(getTaskContext())
114+
writer = HiveFileFormatUtils.getHiveRecordWriter(
115+
conf.value,
116+
fileSinkConf.getTableInfo,
117+
conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
118+
fileSinkConf,
119+
path,
120+
null)
121+
}
122+
96123
def write(value: Writable) {
97124
if (writer != null) {
98125
writer.write(value)

0 commit comments

Comments
 (0)