
Commit 5315dfc

rxin's comments.
1 parent 7fc4b56 commit 5315dfc

3 files changed: 38 additions, 38 deletions


sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 35 additions & 34 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
 import java.io.IOException
 import java.util.{List => JList}
 
-import com.google.common.cache.{CacheLoader, CacheBuilder}
+import com.google.common.cache.{LoadingCache, CacheLoader, CacheBuilder}
 
 import org.apache.hadoop.util.ReflectionUtils
 import org.apache.hadoop.hive.metastore.TableType
@@ -54,46 +54,47 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
   // TODO: Use this everywhere instead of tuples or databaseName, tableName,.
   /** A fully qualified identifier for a table (i.e., database.tableName) */
-  case class TableIdent(database: String, name: String) {
-    def toLowerCase = TableIdent(database.toLowerCase, name.toLowerCase)
+  case class QualifiedTableName(database: String, name: String) {
+    def toLowerCase = QualifiedTableName(database.toLowerCase, name.toLowerCase)
   }
 
   /** A cache of Spark SQL data source tables that have been accessed. */
-  protected[hive] val cachedDataSourceTables = CacheBuilder.newBuilder()
-    .maximumSize(1000)
-    .build(
-      new CacheLoader[TableIdent, LogicalPlan]() {
-        override def load(in: TableIdent): LogicalPlan = {
-          logDebug(s"Creating new cached data source for $in")
-          val table = client.getTable(in.database, in.name)
-          val schemaString = table.getProperty("spark.sql.sources.schema")
-          val userSpecifiedSchema =
-            if (schemaString == null) {
-              None
-            } else {
-              Some(DataType.fromJson(schemaString).asInstanceOf[StructType])
-            }
-          // It does not appear that the ql client for the metastore has a way to enumerate all the
-          // SerDe properties directly...
-          val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap
-
-          val resolvedRelation =
-            ResolvedDataSource(
-              hive,
-              userSpecifiedSchema,
-              table.getProperty("spark.sql.sources.provider"),
-              options)
-
-          LogicalRelation(resolvedRelation.relation)
-        }
-      })
+  protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = {
+    val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
+      override def load(in: QualifiedTableName): LogicalPlan = {
+        logDebug(s"Creating new cached data source for $in")
+        val table = client.getTable(in.database, in.name)
+        val schemaString = table.getProperty("spark.sql.sources.schema")
+        val userSpecifiedSchema =
+          if (schemaString == null) {
+            None
+          } else {
+            Some(DataType.fromJson(schemaString).asInstanceOf[StructType])
+          }
+        // It does not appear that the ql client for the metastore has a way to enumerate all the
+        // SerDe properties directly...
+        val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap
+
+        val resolvedRelation =
+          ResolvedDataSource(
+            hive,
+            userSpecifiedSchema,
+            table.getProperty("spark.sql.sources.provider"),
+            options)
+
+        LogicalRelation(resolvedRelation.relation)
+      }
+    }
+
+    CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
+  }
 
   def refreshTable(databaseName: String, tableName: String): Unit = {
-    cachedDataSourceTables.refresh(TableIdent(databaseName, tableName).toLowerCase)
+    cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
   }
 
   def invalidateTable(databaseName: String, tableName: String): Unit = {
-    cachedDataSourceTables.invalidate(TableIdent(databaseName, tableName).toLowerCase)
+    cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, tableName).toLowerCase)
   }
 
   val caseSensitive: Boolean = false
@@ -143,7 +144,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       val table = client.getTable(databaseName, tblName)
 
       if (table.getProperty("spark.sql.sources.provider") != null) {
-        cachedDataSourceTables(TableIdent(databaseName, tblName).toLowerCase)
+        cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
       } else if (table.isView) {
         // if the unresolved relation is from hive view
         // parse the text into logic node.
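For context, the rewritten cachedDataSourceTables uses Guava's LoadingCache: load() runs on a cache miss, refresh() re-runs it for a key, and invalidate() drops the entry. Below is a minimal, self-contained sketch of that pattern; the TableKey type and string values are hypothetical stand-ins for illustration, not part of the patch.

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object LoadingCacheSketch {
  // Stand-in key type, mirroring the patch's QualifiedTableName.
  case class TableKey(database: String, name: String)

  val cache: LoadingCache[TableKey, String] = {
    val loader = new CacheLoader[TableKey, String]() {
      // Called on a cache miss; the result is memoized until evicted.
      override def load(key: TableKey): String =
        s"plan for ${key.database}.${key.name}"
    }
    // Bounded cache, as in the patch's maximumSize(1000).
    CacheBuilder.newBuilder().maximumSize(1000).build(loader)
  }

  def main(args: Array[String]): Unit = {
    val key = TableKey("default", "events")
    println(cache.get(key)) // miss: runs load() and caches the result
    cache.refresh(key)      // re-runs load() for this key (cf. refreshTable)
    cache.invalidate(key)   // drops the entry; next get() reloads (cf. invalidateTable)
  }
}

Note that refresh() is declared on LoadingCache, not on the plain Cache interface, which is presumably why the patch imports LoadingCache and annotates the field's type explicitly.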

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala

Lines changed: 3 additions & 1 deletion
@@ -53,7 +53,9 @@ case class DropTable(
   override def run(sqlContext: SQLContext) = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
     val ifExistsClause = if (ifExists) "IF EXISTS " else ""
-    try hiveContext.tryUncacheQuery(hiveContext.table(tableName)) catch {
+    try {
+      hiveContext.tryUncacheQuery(hiveContext.table(tableName))
+    } catch {
       case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
     }
     hiveContext.invalidateTable(tableName)
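The DropTable change above is purely syntactic: Scala allows try on a bare expression, and the patch rewrites it into the braced block form for readability; behavior is identical. A small stand-alone illustration of the two equivalent forms (the lookup helper here is hypothetical):

object TryCatchForms {
  def lookup(name: String): String =
    if (name.isEmpty) throw new IllegalArgumentException("no table name") else name

  def main(args: Array[String]): Unit = {
    // Expression form, as in the old code: legal, but easy to misread.
    try lookup("") catch { case _: IllegalArgumentException => }

    // Braced form, as in the new code: the guarded statement is visually scoped.
    try {
      lookup("")
    } catch {
      case _: IllegalArgumentException => // swallowed: the table may simply not exist
    }
  }
}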

sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala

Lines changed: 0 additions & 3 deletions
@@ -49,7 +49,6 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     checkAnswer(
       sql("SELECT * FROM jsonTable"),
       jsonFile("src/test/resources/data/files/sample.json").collect().toSeq)
-
   }
 
   test ("persistent JSON table with a user specified schema") {
@@ -71,7 +70,6 @@
     checkAnswer(
       sql("SELECT a, b, `c_!@(3)`, `<d>`.`d!`, `<d>`.`=` FROM jsonTable"),
       sql("SELECT a, b, `c_!@(3)`, `<d>`.`d!`, `<d>`.`=` FROM expectedJsonTable").collect().toSeq)
-
   }
 
   test ("persistent JSON table with a user specified schema with a subset of fields") {
@@ -99,7 +97,6 @@
     checkAnswer(
       sql("SELECT b, `<d>`.`=` FROM jsonTable"),
       sql("SELECT b, `<d>`.`=` FROM expectedJsonTable").collect().toSeq)
-
   }
 
   test("resolve shortened provider names") {
