@@ -50,7 +50,7 @@ case class InsertIntoHiveTable(
50
50
@ transient val sc : HiveContext = sqlContext.asInstanceOf [HiveContext ]
51
51
@ transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
52
52
@ transient private lazy val hiveContext = new Context (sc.hiveconf)
53
- @ transient private lazy val db = Hive .get( sc.hiveconf)
53
+ @ transient private lazy val catalog = sc.catalog
54
54
55
55
private def newSerializer (tableDesc : TableDesc ): Serializer = {
56
56
val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf [Serializer ]
@@ -199,38 +199,45 @@ case class InsertIntoHiveTable(
199
199
orderedPartitionSpec.put(entry.getName,partitionSpec.get(entry.getName).getOrElse(" " ))
200
200
}
201
201
val partVals = MetaStoreUtils .getPvals(table.hiveQlTable.getPartCols, partitionSpec)
202
- db.validatePartitionNameCharacters(partVals)
202
+ catalog.synchronized {
203
+ catalog.client.validatePartitionNameCharacters(partVals)
204
+ }
203
205
// inheritTableSpecs is set to true. It should be set to false for a IMPORT query
204
206
// which is currently considered as a Hive native command.
205
207
val inheritTableSpecs = true
206
208
// TODO: Correctly set isSkewedStoreAsSubdir.
207
209
val isSkewedStoreAsSubdir = false
208
210
if (numDynamicPartitions > 0 ) {
209
- db.loadDynamicPartitions(
210
- outputPath,
211
- qualifiedTableName,
212
- orderedPartitionSpec,
213
- overwrite,
214
- numDynamicPartitions,
215
- holdDDLTime,
216
- isSkewedStoreAsSubdir
217
- )
211
+ catalog.synchronized {
212
+ catalog.client.loadDynamicPartitions(
213
+ outputPath,
214
+ qualifiedTableName,
215
+ orderedPartitionSpec,
216
+ overwrite,
217
+ numDynamicPartitions,
218
+ holdDDLTime,
219
+ isSkewedStoreAsSubdir
220
+ }
218
221
} else {
219
- db.loadPartition(
222
+ catalog.synchronized {
223
+ catalog.client.loadPartition(
224
+ outputPath,
225
+ qualifiedTableName,
226
+ orderedPartitionSpec,
227
+ overwrite,
228
+ holdDDLTime,
229
+ inheritTableSpecs,
230
+ isSkewedStoreAsSubdir)
231
+ }
232
+ }
233
+ } else {
234
+ catalog.synchronized {
235
+ catalog.client.loadTable(
220
236
outputPath,
221
237
qualifiedTableName,
222
- orderedPartitionSpec,
223
238
overwrite,
224
- holdDDLTime,
225
- inheritTableSpecs,
226
- isSkewedStoreAsSubdir)
239
+ holdDDLTime)
227
240
}
228
- } else {
229
- db.loadTable(
230
- outputPath,
231
- qualifiedTableName,
232
- overwrite,
233
- holdDDLTime)
234
241
}
235
242
236
243
// Invalidate the cache.
0 commit comments