@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.metastore.MetaStoreUtils
 import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.parse.SemanticException
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
 import org.apache.hadoop.hive.serde2.Serializer
 import org.apache.hadoop.hive.serde2.objectinspector._
@@ -104,91 +103,132 @@ case class InsertIntoHiveTable(
   }

   def saveAsHiveFile(
-      rdd: RDD[Writable],
+      rdd: RDD[(Writable, String)],
       valueClass: Class[_],
       fileSinkConf: FileSinkDesc,
-      conf: JobConf,
-      isCompressed: Boolean) {
+      conf: SerializableWritable[JobConf],
+      isCompressed: Boolean,
+      dynamicPartNum: Int) {
     if (valueClass == null) {
       throw new SparkException("Output value class not set")
     }
-    conf.setOutputValueClass(valueClass)
+    conf.value.setOutputValueClass(valueClass)
     if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
       throw new SparkException("Output format class not set")
     }
     // Doesn't work in Scala 2.9 due to what may be a generics bug
     // TODO: Should we uncomment this for Scala 2.10?
     // conf.setOutputFormat(outputFormatClass)
-    conf.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
+    conf.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
     if (isCompressed) {
       // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
       // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
       // to store compression information.
-      conf.set("mapred.output.compress", "true")
+      conf.value.set("mapred.output.compress", "true")
       fileSinkConf.setCompressed(true)
-      fileSinkConf.setCompressCodec(conf.get("mapred.output.compression.codec"))
-      fileSinkConf.setCompressType(conf.get("mapred.output.compression.type"))
+      fileSinkConf.setCompressCodec(conf.value.get("mapred.output.compression.codec"))
+      fileSinkConf.setCompressType(conf.value.get("mapred.output.compression.type"))
     }
-    conf.setOutputCommitter(classOf[FileOutputCommitter])
-    FileOutputFormat.setOutputPath(
-      conf,
-      SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf))
+    conf.value.setOutputCommitter(classOf[FileOutputCommitter])

+    FileOutputFormat.setOutputPath(
+      conf.value,
+      SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf.value))

     log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
+    var writer: SparkHiveHadoopWriter = null
+    // Map to store writers for dynamic partitions
+    var writerMap: scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter] = null
+    if (dynamicPartNum == 0) {
+      writer = new SparkHiveHadoopWriter(conf.value, fileSinkConf)
+      writer.preSetup()
+    } else {
+      writerMap = new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
+    }

-    val writer = new SparkHiveHadoopWriter(conf, fileSinkConf)
-    writer.preSetup()
-
-    def writeToFile(context: TaskContext, iter: Iterator[Writable]) {
-      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
-      // around by taking a mod. We expect that no task will be attempted 2 billion times.
-      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
-
+    def writeToFile(context: TaskContext, iter: Iterator[(Writable, String)]) {
+      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+      // around by taking a mod. We expect that no task will be attempted 2 billion times.
+      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+      // writer used when there are no dynamic partition columns
+      if (dynamicPartNum == 0) {
       writer.setup(context.stageId, context.partitionId, attemptNumber)
       writer.open()
+      } else {

-      var count = 0
-      while (iter.hasNext) {
-        val record = iter.next()
-        count += 1
-        writer.write(record)
-      }
-
-      writer.close()
-      writer.commit()
       }
-
-    sc.sparkContext.runJob(rdd, writeToFile _)
-    writer.commitJob()
-  }
-
-  def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int, jobConf: JobConf): String = {
-    dynamicPartNum2 match {
-      case 0 => ""
-      case i => {
-        val colsNum = tableInfo.getProperties.getProperty("columns").split("\\,").length
-        val partColStr = tableInfo.getProperties.getProperty("partition_columns")
-        val partCols = partColStr.split("/")
-        var buf = new StringBuffer()
-        if (partCols.length == dynamicPartNum2) {
-          for (j <- 0 until partCols.length) {
-            buf.append("/").append(partCols(j)).append("=").append(handleNull(row(colsNum + j), jobConf))
-          }
-        } else {
-          for (j <- 0 until dynamicPartNum2) {
-            buf.append("/").append(partCols(j + partCols.length - dynamicPartNum2)).append("=").append(handleNull(row(colsNum + j), jobConf))
+      var count = 0
+      // writer used for dynamic partitions
+      var writer2: SparkHiveHadoopWriter = null
+      while (iter.hasNext) {
+        val record = iter.next()
+        count += 1
+        if (record._2 == null) { // no dynamic partition column
+          writer.write(record._1)
+        } else { // dynamic partition: route the record by its partition path
+          val location = fileSinkConf.getDirName
+          val partLocation = location + record._2 // appending the partition path lets each writer target its own directory
+          writer2 = writerMap.get(record._2) match {
+            case Some(writer) => writer
+            case None => {
+              val tempWriter = new SparkHiveHadoopWriter(conf.value, new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
+              tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
+              tempWriter.open(record._2)
+              writerMap += (record._2 -> tempWriter)
+              tempWriter
+            }
+          }
+          writer2.write(record._1)
         }
       }
-        buf.toString
+      if (dynamicPartNum == 0) {
+        writer.close()
+        writer.commit()
+      } else {
+        for ((k,v) <- writerMap) {
+          v.close()
+          v.commit()
+        }
+      }
    }
+
+    sc.sparkContext.runJob(rdd, writeToFile _)
+    if (dynamicPartNum == 0) {
+      writer.commitJob()
+    } else {
+      for ((k,v) <- writerMap) {
+        v.commitJob()
+      }
+      writerMap.clear()
    }
-  }

-  def handleNull(obj: Any, jobConf: JobConf): String = {
-    if (obj == null || obj.toString.length == 0) {
-      jobConf.get("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__")
+
+
+  }
+  /*
+   * e.g.
+   * for sql: Insert.....tablename(part1,part2) select ....val1,val2 from ...
+   * return: /part1=val1/part2=val2
+   * for sql: Insert.....tablename(part1=val1,part2) select ....,val2 from ...
+   * return: /part2=val2
+   * for sql: Insert.....tablename(part1=val1,part2,part3) select ....,val2,val3 from ...
+   * return: /part2=val2/part3=val3
+   */
+  private def getDynamicPartDir(partCols: Array[String], row: Row, dynamicPartNum: Int, defaultPartName: String): String = {
+    assert(dynamicPartNum > 0)
+    partCols
+      .takeRight(dynamicPartNum)
+      .zip(row.takeRight(dynamicPartNum))
+      .map { case (c, v) => s"/$c=${handleNull(v, defaultPartName)}" }
+      .mkString
+  }
+  /*
+   * If rowVal is null or "", return the value of hive.exec.default.partition.name (passed in as defaultPartName).
+   */
+  private def handleNull(rowVal: Any, defaultPartName: String): String = {
+    if (rowVal == null || String.valueOf(rowVal).length == 0) {
+      defaultPartName
     } else {
-      obj.toString
+      String.valueOf(rowVal)
     }
   }
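For reviewers who want to see the new path construction in isolation: a minimal, self-contained sketch of what `getDynamicPartDir` and `handleNull` compute, with `Row` approximated by a plain `Seq[Any]` (that simplification, the object name, and the hard-coded default partition name are illustrative assumptions, not part of the patch).

```scala
// Standalone sketch, not the patched class: Row is simplified to Seq[Any].
object DynamicPartDirSketch {
  // Substitute the configured default partition name for null or empty values.
  def handleNull(rowVal: Any, defaultPartName: String): String =
    if (rowVal == null || String.valueOf(rowVal).isEmpty) defaultPartName else String.valueOf(rowVal)

  // Pair the trailing dynamic partition columns with the trailing row values
  // and render them as "/col=value" path segments.
  def getDynamicPartDir(partCols: Array[String], row: Seq[Any],
      dynamicPartNum: Int, defaultPartName: String): String = {
    require(dynamicPartNum > 0)
    partCols.takeRight(dynamicPartNum)
      .zip(row.takeRight(dynamicPartNum))
      .map { case (c, v) => s"/$c=${handleNull(v, defaultPartName)}" }
      .mkString
  }

  def main(args: Array[String]): Unit = {
    // Two dynamic partition columns, values taken from the end of the row.
    println(getDynamicPartDir(Array("part1", "part2"), Seq("a", 1, "val1", "val2"), 2, "__HIVE_DEFAULT_PARTITION__"))
    // prints: /part1=val1/part2=val2
    // A null dynamic value falls back to the default partition name.
    println(getDynamicPartDir(Array("part1", "part2"), Seq("a", 1, "val1", null), 2, "__HIVE_DEFAULT_PARTITION__"))
    // prints: /part1=val1/part2=__HIVE_DEFAULT_PARTITION__
  }
}
```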
@@ -211,32 +251,32 @@ case class InsertIntoHiveTable(
     val tableLocation = table.hiveQlTable.getDataLocation
     val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
-    var dynamicPartNum = 0
+    var tmpDynamicPartNum = 0
     var numStaPart = 0
-    var dynamicPartPath = "";
     val partitionSpec = partition.map {
-      case (key, Some(value)) => { numStaPart += 1; key -> value }
-      case (key, None) => { dynamicPartNum += 1; key -> "" }
+      case (key, Some(value)) =>
+        numStaPart += 1
+        key -> value
+      case (key, None) =>
+        tmpDynamicPartNum += 1
+        key -> ""
     }
-    // ORC stores compression information in table properties. While, there are other formats
-    // (e.g. RCFile) that rely on hadoop configurations to store compression information.
+    val dynamicPartNum = tmpDynamicPartNum
     val jobConf = new JobConf(sc.hiveconf)
     val jobConfSer = new SerializableWritable(jobConf)
     // check if the partition spec is valid
     if (dynamicPartNum > 0) {
       if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
-        throw new SemanticException(
-          ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg())
+        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg())
       }
       if (numStaPart == 0 && sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
-        throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg());
+        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg())
       }
       // check if static partition appear after dynamic partitions
       for ((k,v) <- partitionSpec) {
         if (partitionSpec(k) == "") {
           if (numStaPart > 0) { // found a DP, but there exists ST as subpartition
-            throw new SemanticException(
-              ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg());
+            throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg())
           }
         } else {
           numStaPart -= 1
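A quick illustration of how the rewritten partition clause handling splits static from dynamic columns; the column names and the wrapping object are made up for the example, only the counting logic mirrors the hunk above.

```scala
// Toy example: a Map stands in for the partition spec parsed from the INSERT statement.
object PartitionSpecSketch {
  def main(args: Array[String]): Unit = {
    // "part1" has a literal value (static), "part2" is left for dynamic resolution.
    val partition: Map[String, Option[String]] = Map("part1" -> Some("val1"), "part2" -> None)

    var numStaPart = 0        // number of static partition columns
    var tmpDynamicPartNum = 0 // number of dynamic partition columns

    val partitionSpec = partition.map {
      case (key, Some(value)) =>
        numStaPart += 1
        key -> value
      case (key, None) =>
        tmpDynamicPartNum += 1
        key -> ""
    }

    println(partitionSpec)     // part1 -> "val1", part2 -> "" (empty string marks a dynamic column)
    println(numStaPart)        // 1
    println(tmpDynamicPartNum) // 1
  }
}
```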
@@ -252,96 +292,40 @@ case class InsertIntoHiveTable(
           ObjectInspectorCopyOption.JAVA)
         .asInstanceOf[StructObjectInspector]

-
       val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
       val outputData = new Array[Any](fieldOIs.length)
+      val defaultPartName = jobConfSer.value.get("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__")
+      var partColStr: Array[String] = null
+      if (fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") != null) {
+        partColStr = fileSinkConf
+          .getTableInfo
+          .getProperties
+          .getProperty("partition_columns")
+          .split("/")
+      }
+
       iter.map { row =>
+        var dynamicPartPath: String = null
+        if (dynamicPartNum > 0) {
+          dynamicPartPath = getDynamicPartDir(partColStr, row, dynamicPartNum, defaultPartName)
+        }
         var i = 0
         while (i < fieldOIs.length) {
-          if (fieldOIs.length < row.length && row.length - fieldOIs.length == dynamicPartNum) {
-            dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, dynamicPartNum, jobConfSer.value)
-          }
           // Casts Strings to HiveVarchars when necessary.
           outputData(i) = wrap(row(i), fieldOIs(i))
           i += 1
         }

-        serializer.serialize(outputData, standardOI)
+        serializer.serialize(outputData, standardOI) -> dynamicPartPath
       }
     }
-
-    if (dynamicPartNum > 0) {
-      if (outputClass == null) {
-        throw new SparkException("Output value class not set")
-      }
-      jobConfSer.value.setOutputValueClass(outputClass)
-      if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
-        throw new SparkException("Output format class not set")
-      }
-      // Doesn't work in Scala 2.9 due to what may be a generics bug
-      // TODO: Should we uncomment this for Scala 2.10?
-      // conf.setOutputFormat(outputFormatClass)
-      jobConfSer.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
-      if (sc.hiveconf.getBoolean("hive.exec.compress.output", false)) {
-        // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
-        // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
-        // to store compression information.
-        jobConfSer.value.set("mapred.output.compress", "true")
-        fileSinkConf.setCompressed(true)
-        fileSinkConf.setCompressCodec(jobConfSer.value.get("mapred.output.compression.codec"))
-        fileSinkConf.setCompressType(jobConfSer.value.get("mapred.output.compression.type"))
-      }
-      jobConfSer.value.setOutputCommitter(classOf[FileOutputCommitter])
-
-      FileOutputFormat.setOutputPath(
-        jobConfSer.value,
-        SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, jobConfSer.value))
-
-      var writerMap = new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
-      def writeToFile2(context: TaskContext, iter: Iterator[Writable]) {
-        // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
-        // around by taking a mod. We expect that no task will be attempted 2 billion times.
-        val attemptNumber = (context.attemptId % Int.MaxValue).toInt
-        val serializer = newSerializer(fileSinkConf.getTableInfo)
-        var count = 0
-        var writer2: SparkHiveHadoopWriter = null
-        while (iter.hasNext) {
-          val record = iter.next();
-          val location = fileSinkConf.getDirName
-          val partLocation = location + dynamicPartPath
-          writer2 = writerMap.get(dynamicPartPath) match {
-            case Some(writer) => writer
-            case None => {
-              val tempWriter = new SparkHiveHadoopWriter(jobConfSer.value, new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
-              tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
-              tempWriter.open(dynamicPartPath);
-              writerMap += (dynamicPartPath -> tempWriter)
-              tempWriter
-            }
-          }
-          count += 1
-          writer2.write(record)
-        }
-        for ((k,v) <- writerMap) {
-          v.close()
-          v.commit()
-        }
-      }
-
-      sc.sparkContext.runJob(rdd, writeToFile2 _)
-
-      for ((k,v) <- writerMap) {
-        v.commitJob()
-      }
-      writerMap.clear()
-    } else {
     saveAsHiveFile(
       rdd,
       outputClass,
       fileSinkConf,
-      jobConf,
-      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
-    }
+      jobConfSer,
+      sc.hiveconf.getBoolean("hive.exec.compress.output", false),
+      dynamicPartNum)

     val outputPath = FileOutputFormat.getOutputPath(jobConf)
     // Have to construct the format of dbname.tablename.
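To make the writer-per-partition idea easier to follow outside the diff, here is a self-contained toy that routes records by their partition path the same way the new `writeToFile` does. `ToyWriter` and `writeAll` are invented stand-ins for `SparkHiveHadoopWriter` and the task-side loop, and the paths are made up; only the routing and close/commit pattern mirrors the patch.

```scala
import scala.collection.mutable

// Invented stand-in for SparkHiveHadoopWriter; the real one wraps a Hadoop RecordWriter.
class ToyWriter(val path: String) {
  def write(record: String): Unit = println(s"$path <- $record")
  def close(): Unit = ()
  def commit(): Unit = ()
}

object WriterRoutingSketch {
  // Each record carries a partition path; records with the same path share one writer.
  def writeAll(records: Iterator[(String, String)], baseDir: String): Unit = {
    val writerMap = mutable.HashMap.empty[String, ToyWriter]
    records.foreach { case (record, partPath) =>
      // In the patch a null path means "no dynamic partition" and a single
      // preconfigured writer is used; this sketch shows only the dynamic case.
      val writer = writerMap.getOrElseUpdate(partPath, new ToyWriter(baseDir + partPath))
      writer.write(record)
    }
    // Mirror the patch: close and commit every per-partition writer at the end of the task.
    writerMap.values.foreach { w => w.close(); w.commit() }
  }

  def main(args: Array[String]): Unit = {
    val rows = Iterator(
      "row1" -> "/part1=val1/part2=val2",
      "row2" -> "/part1=val1/part2=val2",
      "row3" -> "/part1=val1/part2=__HIVE_DEFAULT_PARTITION__")
    writeAll(rows, "/tmp/hive-staging")
  }
}
```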
@@ -358,13 +342,13 @@ case class InsertIntoHiveTable(
     val inheritTableSpecs = true
     // TODO: Correctly set isSkewedStoreAsSubdir.
     val isSkewedStoreAsSubdir = false
-    if (dynamicPartNum > 0) {
+    if (dynamicPartNum > 0) {
       db.loadDynamicPartitions(
         outputPath,
         qualifiedTableName,
         partitionSpec,
         overwrite,
-        dynamicPartNum/* dpCtx.getNumDPCols() */,
+        dynamicPartNum,
         holdDDLTime,
         isSkewedStoreAsSubdir
       )