Skip to content

Commit 5004542

Browse files
committed
Minor refactoring
1 parent fae9eff commit 5004542

File tree

2 files changed: +15 additions, −27 deletions

sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import java.util.Date
2424
import scala.collection.mutable
2525

2626
import org.apache.hadoop.fs.Path
27+
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
2728
import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
2829
import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
2930
import org.apache.hadoop.hive.ql.plan.FileSinkDesc
@@ -159,11 +160,13 @@ private[hive] object SparkHiveWriterContainer {
159160
private[spark] class SparkHiveDynamicPartitionWriterContainer(
160161
@transient jobConf: JobConf,
161162
fileSinkConf: FileSinkDesc,
162-
dynamicPartColNames: Array[String],
163-
defaultPartName: String)
163+
dynamicPartColNames: Array[String])
164164
extends SparkHiveWriterContainer(jobConf, fileSinkConf) {
165165

166-
@transient var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
166+
private val defaultPartName = jobConf.get(
167+
ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)
168+
169+
@transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
167170

168171
override def open(): Unit = {
169172
writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter]

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -103,17 +103,16 @@ case class InsertIntoHiveTable(
103103
valueClass: Class[_],
104104
fileSinkConf: FileSinkDesc,
105105
conf: SerializableWritable[JobConf],
106-
isCompressed: Boolean,
107106
writerContainer: SparkHiveWriterContainer) {
108107
assert(valueClass != null, "Output value class not set")
109108
conf.value.setOutputValueClass(valueClass)
110109

111-
assert(fileSinkConf.getTableInfo.getOutputFileFormatClassName != null)
112-
// Doesn't work in Scala 2.9 due to what may be a generics bug
113-
// TODO: Should we uncomment this for Scala 2.10?
114-
// conf.setOutputFormat(outputFormatClass)
115-
conf.value.set(
116-
"mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
110+
val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
111+
assert(outputFileFormatClassName != null, "Output format class not set")
112+
conf.value.set("mapred.output.format.class", outputFileFormatClassName)
113+
114+
val isCompressed = conf.value.getBoolean(
115+
ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
117116

118117
if (isCompressed) {
119118
// Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
@@ -218,28 +217,14 @@ case class InsertIntoHiveTable(
218217
val jobConf = new JobConf(sc.hiveconf)
219218
val jobConfSer = new SerializableWritable(jobConf)
220219

221-
val defaultPartName = jobConf.get(
222-
ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)
223220
val writerContainer = if (numDynamicPartitions > 0) {
224-
new SparkHiveDynamicPartitionWriterContainer(
225-
jobConf,
226-
fileSinkConf,
227-
partitionColumnNames.takeRight(numDynamicPartitions),
228-
defaultPartName)
221+
val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
222+
new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames)
229223
} else {
230224
new SparkHiveWriterContainer(jobConf, fileSinkConf)
231225
}
232226

233-
val isCompressed = jobConf.getBoolean(
234-
ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
235-
236-
saveAsHiveFile(
237-
child.execute(),
238-
outputClass,
239-
fileSinkConf,
240-
jobConfSer,
241-
isCompressed,
242-
writerContainer)
227+
saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)
243228

244229
val outputPath = FileOutputFormat.getOutputPath(jobConf)
245230
// Have to construct the format of dbname.tablename.

0 commit comments

Comments (0)