[SPARK-6618][SPARK-6669][SQL] Lock Hive metastore client correctly. #5333

Closed. Wants to merge 7 commits.
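Taken together, the diffs below route every call to the shared Hive metastore client through a single lock object (the `catalog`, i.e. the `HiveMetastoreCatalog`), instead of a mix of method-level `synchronized` blocks and unsynchronized calls. A minimal, self-contained sketch of that locking discipline, using illustrative names rather than the PR's actual classes:

```scala
// Sketch only: a non-thread-safe client guarded by one shared lock object.
object MetastoreLockingSketch {
  // Stands in for the Hive metastore client, which is not thread-safe.
  class Client {
    private var inCall = false
    def getTable(name: String): String = {
      require(!inCall, "concurrent use of a non-thread-safe client")
      inCall = true
      try s"table:$name" finally inCall = false
    }
  }

  class Catalog { val client = new Client }
  val catalog = new Catalog

  // Every caller takes the same lock (`catalog`), so client calls never
  // interleave, no matter which code path makes them.
  def lookup(name: String): String =
    catalog.synchronized { catalog.client.getTable(name) }

  def main(args: Array[String]): Unit = {
    val threads = (1 to 8).map { i =>
      new Thread(new Runnable {
        override def run(): Unit = { lookup(s"t$i"); () }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    println("all lookups were serialized")
  }
}
```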
@@ -181,7 +181,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       val tableFullName =
         relation.hiveQlTable.getDbName + "." + relation.hiveQlTable.getTableName

-      catalog.client.alterTable(tableFullName, new Table(hiveTTable))
+      catalog.synchronized {
+        catalog.client.alterTable(tableFullName, new Table(hiveTTable))
+      }
     }
   case otherRelation =>
     throw new NotImplementedError(
@@ -67,7 +67,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
     override def load(in: QualifiedTableName): LogicalPlan = {
       logDebug(s"Creating new cached data source for $in")
-      val table = synchronized {
+      val table = HiveMetastoreCatalog.this.synchronized {
         client.getTable(in.database, in.name)
       }
       val userSpecifiedSchema =
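This one-word change is easy to misread: inside the anonymous `CacheLoader`, a bare `synchronized { ... }` locks the `CacheLoader` instance itself, not the catalog, so it does not exclude threads that lock the `HiveMetastoreCatalog`. A small sketch of the distinction, with illustrative names:

```scala
// Inside an anonymous inner class, `synchronized` and
// `Enclosing.this.synchronized` lock different objects.
class Enclosing {
  val inner: Runnable = new Runnable {
    override def run(): Unit = {
      synchronized {
        // Locks the anonymous Runnable instance: useless if every
        // other caller locks the enclosing object.
      }
      Enclosing.this.synchronized {
        // Locks the Enclosing instance, i.e. the shared lock.
      }
    }
  }
}
```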
@@ -173,12 +173,16 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with

   def lookupRelation(
       tableIdentifier: Seq[String],
-      alias: Option[String]): LogicalPlan = synchronized {
+      alias: Option[String]): LogicalPlan = {
     val tableIdent = processTableIdentifier(tableIdentifier)
     val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
       hive.sessionState.getCurrentDatabase)
     val tblName = tableIdent.last
-    val table = try client.getTable(databaseName, tblName) catch {
+    val table = try {
+      synchronized {
+        client.getTable(databaseName, tblName)
+      }
+    } catch {
       case te: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
         throw new NoSuchTableException
     }
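The method-level lock on `lookupRelation` becomes a block around just the `client.getTable` call. Callers still serialize on metastore access, but the rest of the method, such as building the relation from the fetched table, now runs without the lock, which shortens the critical section and avoids holding the catalog lock across unrelated work. A generic sketch of this refactoring, under assumed names:

```scala
object LockScopeSketch {
  private val lock = new Object
  private def fetchFromClient(): Int = 42             // stands in for client.getTable
  private def buildRelation(raw: Int): Int = raw * 2  // post-processing

  // Before: the lock is held across the whole operation.
  def lookupBefore(): Int = lock.synchronized {
    buildRelation(fetchFromClient())
  }

  // After: the lock covers only the shared-client call; post-processing
  // runs concurrently across threads.
  def lookupAfter(): Int = {
    val raw = lock.synchronized { fetchFromClient() }
    buildRelation(raw)
  }
}
```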
@@ -200,7 +204,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     } else {
       val partitions: Seq[Partition] =
         if (table.isPartitioned) {
-          HiveShim.getAllPartitionsOf(client, table).toSeq
+          synchronized {
+            HiveShim.getAllPartitionsOf(client, table).toSeq
+          }
         } else {
           Nil
         }
@@ -50,7 +50,7 @@ case class InsertIntoHiveTable(
   @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
   @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
-  @transient private lazy val db = Hive.get(sc.hiveconf)
+  @transient private lazy val catalog = sc.catalog

   private def newSerializer(tableDesc: TableDesc): Serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]

Review comment (Contributor, on the `@transient val sc: HiveContext` line):

> Existing: Calling a HiveContext `sc` is pretty confusing. Also, since this is a convenience cast, it should be a `def` so as to not take up extra space in the object.
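As a hypothetical illustration of the reviewer's suggestion (the names and simplified types below are invented, not part of the PR): a `def` performs the cheap cast on each use instead of storing an extra field, and can carry a less misleading name than `sc`:

```scala
// Simplified stand-ins for SQLContext/HiveContext and the operator class.
class SQLCtx
class HiveCtx extends SQLCtx

class Op(val sqlContext: SQLCtx) {
  // Existing style: a stored field whose name suggests a SparkContext.
  // @transient val sc: HiveCtx = sqlContext.asInstanceOf[HiveCtx]

  // Suggested style: a def adds no field to the (serialized) object.
  private def hiveSqlContext: HiveCtx = sqlContext.asInstanceOf[HiveCtx]
}
```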
@@ -199,38 +199,45 @@ case class InsertIntoHiveTable(
         orderedPartitionSpec.put(entry.getName,partitionSpec.get(entry.getName).getOrElse(""))
       }
       val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
-      db.validatePartitionNameCharacters(partVals)
+      catalog.synchronized {
+        catalog.client.validatePartitionNameCharacters(partVals)
+      }
       // inheritTableSpecs is set to true. It should be set to false for a IMPORT query
       // which is currently considered as a Hive native command.
       val inheritTableSpecs = true
       // TODO: Correctly set isSkewedStoreAsSubdir.
       val isSkewedStoreAsSubdir = false
       if (numDynamicPartitions > 0) {
-        db.loadDynamicPartitions(
-          outputPath,
-          qualifiedTableName,
-          orderedPartitionSpec,
-          overwrite,
-          numDynamicPartitions,
-          holdDDLTime,
-          isSkewedStoreAsSubdir
-        )
+        catalog.synchronized {
+          catalog.client.loadDynamicPartitions(
+            outputPath,
+            qualifiedTableName,
+            orderedPartitionSpec,
+            overwrite,
+            numDynamicPartitions,
+            holdDDLTime,
+            isSkewedStoreAsSubdir)
+        }
       } else {
-        db.loadPartition(
-          outputPath,
-          qualifiedTableName,
-          orderedPartitionSpec,
-          overwrite,
-          holdDDLTime,
-          inheritTableSpecs,
-          isSkewedStoreAsSubdir)
+        catalog.synchronized {
+          catalog.client.loadPartition(
+            outputPath,
+            qualifiedTableName,
+            orderedPartitionSpec,
+            overwrite,
+            holdDDLTime,
+            inheritTableSpecs,
+            isSkewedStoreAsSubdir)
+        }
       }
     } else {
-      db.loadTable(
-        outputPath,
-        qualifiedTableName,
-        overwrite,
-        holdDDLTime)
+      catalog.synchronized {
+        catalog.client.loadTable(
+          outputPath,
+          qualifiedTableName,
+          overwrite,
+          holdDDLTime)
+      }
     }

     // Invalidate the cache.
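Replacing the separately obtained `db = Hive.get(sc.hiveconf)` handle with `catalog.client` is the heart of this hunk: mutual exclusion only works when every path locks the same object, and code synchronizing on one lock is not excluded by code holding a different one. A self-contained sketch of that failure mode and the fix, with invented names:

```scala
object OneLockSketch {
  private val sharedClient = new StringBuilder // stands in for the metastore client
  private val lockA = new Object
  private val lockB = new Object

  // Broken: different locks, so these two calls can interleave on sharedClient.
  def writeViaA(): Unit = lockA.synchronized { sharedClient.append('a') }
  def writeViaB(): Unit = lockB.synchronized { sharedClient.append('b') }

  // Fixed (the PR's approach): all paths take the one shared lock.
  def safeWriteA(): Unit = lockA.synchronized { sharedClient.append('a') }
  def safeWriteB(): Unit = lockA.synchronized { sharedClient.append('b') }
}
```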
@@ -457,4 +457,15 @@ class SQLQuerySuite extends QueryTest {
     dropTempTable("data")
     setConf("spark.sql.hive.convertCTAS", originalConf)
   }
+
+  test("sanity test for SPARK-6618") {
+    (1 to 100).par.map { i =>
+      val tableName = s"SPARK_6618_table_$i"
+      sql(s"CREATE TABLE $tableName (col1 string)")
+      catalog.lookupRelation(Seq(tableName))
+      table(tableName)
+      tables()
+      sql(s"DROP TABLE $tableName")
+    }
+  }
 }
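The new test drives the metastore from many threads at once: `.par` converts the range into a parallel collection, so each iteration's create/lookup/drop sequence runs on a fork-join pool rather than serially, which is exactly the access pattern that could fail before the locking fix. A minimal, Spark-free illustration of what `.par` contributes (built in on the Scala versions Spark used at the time; a separate module in Scala 2.13+):

```scala
// Map over a parallel range and record which pool threads ran the body.
val threadsUsed = (1 to 100).par.map(_ => Thread.currentThread().getName).toSet.seq
println(s"body ran on ${threadsUsed.size} distinct threads")
```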