Skip to content

Commit 4b39fd1

Browse files
alexoss68 authored and marmbrus committed
[SPARK-4943][SQL] Allow table name having dot for db/catalog
The pull only fixes the parsing error and changes API to use tableIdentifier. Joining different catalog datasource related change is not done in this pull. Author: Alex Liu <[email protected]> Closes #3941 from alexliu68/SPARK-SQL-4943-3 and squashes the following commits: 343ae27 [Alex Liu] [SPARK-4943][SQL] refactoring according to review 29e5e55 [Alex Liu] [SPARK-4943][SQL] fix failed Hive CTAS tests 6ae77ce [Alex Liu] [SPARK-4943][SQL] fix TestHive matching error 3652997 [Alex Liu] [SPARK-4943][SQL] Allow table name having dot to support db/catalog ...
1 parent 1e56eba commit 4b39fd1

File tree

17 files changed

+143
-113
lines changed

17 files changed

+143
-113
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -178,10 +178,10 @@ class SqlParser extends AbstractSparkSQLParser {
178178
joinedRelation | relationFactor
179179

180180
protected lazy val relationFactor: Parser[LogicalPlan] =
181-
( ident ~ (opt(AS) ~> opt(ident)) ^^ {
182-
case tableName ~ alias => UnresolvedRelation(None, tableName, alias)
181+
( rep1sep(ident, ".") ~ (opt(AS) ~> opt(ident)) ^^ {
182+
case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias)
183183
}
184-
| ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) }
184+
| ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) }
185185
)
186186

187187
protected lazy val joinedRelation: Parser[LogicalPlan] =

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -228,11 +228,11 @@ class Analyzer(catalog: Catalog,
228228
*/
229229
object ResolveRelations extends Rule[LogicalPlan] {
230230
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
231-
case i @ InsertIntoTable(UnresolvedRelation(databaseName, name, alias), _, _, _) =>
231+
case i @ InsertIntoTable(UnresolvedRelation(tableIdentifier, alias), _, _, _) =>
232232
i.copy(
233-
table = EliminateAnalysisOperators(catalog.lookupRelation(databaseName, name, alias)))
234-
case UnresolvedRelation(databaseName, name, alias) =>
235-
catalog.lookupRelation(databaseName, name, alias)
233+
table = EliminateAnalysisOperators(catalog.lookupRelation(tableIdentifier, alias)))
234+
case UnresolvedRelation(tableIdentifier, alias) =>
235+
catalog.lookupRelation(tableIdentifier, alias)
236236
}
237237
}
238238

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala

Lines changed: 50 additions & 56 deletions
Original file line number | Diff line number | Diff line change
@@ -28,77 +28,74 @@ trait Catalog {
2828

2929
def caseSensitive: Boolean
3030

31-
def tableExists(db: Option[String], tableName: String): Boolean
31+
def tableExists(tableIdentifier: Seq[String]): Boolean
3232

3333
def lookupRelation(
34-
databaseName: Option[String],
35-
tableName: String,
36-
alias: Option[String] = None): LogicalPlan
34+
tableIdentifier: Seq[String],
35+
alias: Option[String] = None): LogicalPlan
3736

38-
def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit
37+
def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit
3938

40-
def unregisterTable(databaseName: Option[String], tableName: String): Unit
39+
def unregisterTable(tableIdentifier: Seq[String]): Unit
4140

4241
def unregisterAllTables(): Unit
4342

44-
protected def processDatabaseAndTableName(
45-
databaseName: Option[String],
46-
tableName: String): (Option[String], String) = {
43+
protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = {
4744
if (!caseSensitive) {
48-
(databaseName.map(_.toLowerCase), tableName.toLowerCase)
45+
tableIdentifier.map(_.toLowerCase)
4946
} else {
50-
(databaseName, tableName)
47+
tableIdentifier
5148
}
5249
}
5350

54-
protected def processDatabaseAndTableName(
55-
databaseName: String,
56-
tableName: String): (String, String) = {
57-
if (!caseSensitive) {
58-
(databaseName.toLowerCase, tableName.toLowerCase)
51+
protected def getDbTableName(tableIdent: Seq[String]): String = {
52+
val size = tableIdent.size
53+
if (size <= 2) {
54+
tableIdent.mkString(".")
5955
} else {
60-
(databaseName, tableName)
56+
tableIdent.slice(size - 2, size).mkString(".")
6157
}
6258
}
59+
60+
protected def getDBTable(tableIdent: Seq[String]) : (Option[String], String) = {
61+
(tableIdent.lift(tableIdent.size - 2), tableIdent.last)
62+
}
6363
}
6464

6565
class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
6666
val tables = new mutable.HashMap[String, LogicalPlan]()
6767

6868
override def registerTable(
69-
databaseName: Option[String],
70-
tableName: String,
69+
tableIdentifier: Seq[String],
7170
plan: LogicalPlan): Unit = {
72-
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
73-
tables += ((tblName, plan))
71+
val tableIdent = processTableIdentifier(tableIdentifier)
72+
tables += ((getDbTableName(tableIdent), plan))
7473
}
7574

76-
override def unregisterTable(
77-
databaseName: Option[String],
78-
tableName: String) = {
79-
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
80-
tables -= tblName
75+
override def unregisterTable(tableIdentifier: Seq[String]) = {
76+
val tableIdent = processTableIdentifier(tableIdentifier)
77+
tables -= getDbTableName(tableIdent)
8178
}
8279

8380
override def unregisterAllTables() = {
8481
tables.clear()
8582
}
8683

87-
override def tableExists(db: Option[String], tableName: String): Boolean = {
88-
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
89-
tables.get(tblName) match {
84+
override def tableExists(tableIdentifier: Seq[String]): Boolean = {
85+
val tableIdent = processTableIdentifier(tableIdentifier)
86+
tables.get(getDbTableName(tableIdent)) match {
9087
case Some(_) => true
9188
case None => false
9289
}
9390
}
9491

9592
override def lookupRelation(
96-
databaseName: Option[String],
97-
tableName: String,
93+
tableIdentifier: Seq[String],
9894
alias: Option[String] = None): LogicalPlan = {
99-
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
100-
val table = tables.getOrElse(tblName, sys.error(s"Table Not Found: $tableName"))
101-
val tableWithQualifiers = Subquery(tblName, table)
95+
val tableIdent = processTableIdentifier(tableIdentifier)
96+
val tableFullName = getDbTableName(tableIdent)
97+
val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName"))
98+
val tableWithQualifiers = Subquery(tableIdent.last, table)
10299

103100
// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
104101
// properly qualified with this alias.
@@ -117,41 +114,39 @@ trait OverrideCatalog extends Catalog {
117114
// TODO: This doesn't work when the database changes...
118115
val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]()
119116

120-
abstract override def tableExists(db: Option[String], tableName: String): Boolean = {
121-
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
122-
overrides.get((dbName, tblName)) match {
117+
abstract override def tableExists(tableIdentifier: Seq[String]): Boolean = {
118+
val tableIdent = processTableIdentifier(tableIdentifier)
119+
overrides.get(getDBTable(tableIdent)) match {
123120
case Some(_) => true
124-
case None => super.tableExists(db, tableName)
121+
case None => super.tableExists(tableIdentifier)
125122
}
126123
}
127124

128125
abstract override def lookupRelation(
129-
databaseName: Option[String],
130-
tableName: String,
126+
tableIdentifier: Seq[String],
131127
alias: Option[String] = None): LogicalPlan = {
132-
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
133-
val overriddenTable = overrides.get((dbName, tblName))
134-
val tableWithQualifers = overriddenTable.map(r => Subquery(tblName, r))
128+
val tableIdent = processTableIdentifier(tableIdentifier)
129+
val overriddenTable = overrides.get(getDBTable(tableIdent))
130+
val tableWithQualifers = overriddenTable.map(r => Subquery(tableIdent.last, r))
135131

136132
// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
137133
// properly qualified with this alias.
138134
val withAlias =
139135
tableWithQualifers.map(r => alias.map(a => Subquery(a, r)).getOrElse(r))
140136

141-
withAlias.getOrElse(super.lookupRelation(dbName, tblName, alias))
137+
withAlias.getOrElse(super.lookupRelation(tableIdentifier, alias))
142138
}
143139

144140
override def registerTable(
145-
databaseName: Option[String],
146-
tableName: String,
141+
tableIdentifier: Seq[String],
147142
plan: LogicalPlan): Unit = {
148-
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
149-
overrides.put((dbName, tblName), plan)
143+
val tableIdent = processTableIdentifier(tableIdentifier)
144+
overrides.put(getDBTable(tableIdent), plan)
150145
}
151146

152-
override def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
153-
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
154-
overrides.remove((dbName, tblName))
147+
override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
148+
val tableIdent = processTableIdentifier(tableIdentifier)
149+
overrides.remove(getDBTable(tableIdent))
155150
}
156151

157152
override def unregisterAllTables(): Unit = {
@@ -167,22 +162,21 @@ object EmptyCatalog extends Catalog {
167162

168163
val caseSensitive: Boolean = true
169164

170-
def tableExists(db: Option[String], tableName: String): Boolean = {
165+
def tableExists(tableIdentifier: Seq[String]): Boolean = {
171166
throw new UnsupportedOperationException
172167
}
173168

174169
def lookupRelation(
175-
databaseName: Option[String],
176-
tableName: String,
170+
tableIdentifier: Seq[String],
177171
alias: Option[String] = None) = {
178172
throw new UnsupportedOperationException
179173
}
180174

181-
def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = {
175+
def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = {
182176
throw new UnsupportedOperationException
183177
}
184178

185-
def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
179+
def unregisterTable(tableIdentifier: Seq[String]): Unit = {
186180
throw new UnsupportedOperationException
187181
}
188182

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -34,8 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
3434
* Holds the name of a relation that has yet to be looked up in a [[Catalog]].
3535
*/
3636
case class UnresolvedRelation(
37-
databaseName: Option[String],
38-
tableName: String,
37+
tableIdentifier: Seq[String],
3938
alias: Option[String] = None) extends LeafNode {
4039
override def output = Nil
4140
override lazy val resolved = false

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -290,7 +290,7 @@ package object dsl {
290290

291291
def insertInto(tableName: String, overwrite: Boolean = false) =
292292
InsertIntoTable(
293-
analysis.UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)
293+
analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite)
294294

295295
def analyze = analysis.SimpleAnalyzer(logicalPlan)
296296
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala

Lines changed: 10 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -44,8 +44,8 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
4444
AttributeReference("e", ShortType)())
4545

4646
before {
47-
caseSensitiveCatalog.registerTable(None, "TaBlE", testRelation)
48-
caseInsensitiveCatalog.registerTable(None, "TaBlE", testRelation)
47+
caseSensitiveCatalog.registerTable(Seq("TaBlE"), testRelation)
48+
caseInsensitiveCatalog.registerTable(Seq("TaBlE"), testRelation)
4949
}
5050

5151
test("union project *") {
@@ -64,45 +64,45 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
6464
assert(
6565
caseSensitiveAnalyze(
6666
Project(Seq(UnresolvedAttribute("TbL.a")),
67-
UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
67+
UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) ===
6868
Project(testRelation.output, testRelation))
6969

7070
val e = intercept[TreeNodeException[_]] {
7171
caseSensitiveAnalyze(
7272
Project(Seq(UnresolvedAttribute("tBl.a")),
73-
UnresolvedRelation(None, "TaBlE", Some("TbL"))))
73+
UnresolvedRelation(Seq("TaBlE"), Some("TbL"))))
7474
}
7575
assert(e.getMessage().toLowerCase.contains("unresolved"))
7676

7777
assert(
7878
caseInsensitiveAnalyze(
7979
Project(Seq(UnresolvedAttribute("TbL.a")),
80-
UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
80+
UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) ===
8181
Project(testRelation.output, testRelation))
8282

8383
assert(
8484
caseInsensitiveAnalyze(
8585
Project(Seq(UnresolvedAttribute("tBl.a")),
86-
UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
86+
UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) ===
8787
Project(testRelation.output, testRelation))
8888
}
8989

9090
test("resolve relations") {
9191
val e = intercept[RuntimeException] {
92-
caseSensitiveAnalyze(UnresolvedRelation(None, "tAbLe", None))
92+
caseSensitiveAnalyze(UnresolvedRelation(Seq("tAbLe"), None))
9393
}
9494
assert(e.getMessage == "Table Not Found: tAbLe")
9595

9696
assert(
97-
caseSensitiveAnalyze(UnresolvedRelation(None, "TaBlE", None)) ===
97+
caseSensitiveAnalyze(UnresolvedRelation(Seq("TaBlE"), None)) ===
9898
testRelation)
9999

100100
assert(
101-
caseInsensitiveAnalyze(UnresolvedRelation(None, "tAbLe", None)) ===
101+
caseInsensitiveAnalyze(UnresolvedRelation(Seq("tAbLe"), None)) ===
102102
testRelation)
103103

104104
assert(
105-
caseInsensitiveAnalyze(UnresolvedRelation(None, "TaBlE", None)) ===
105+
caseInsensitiveAnalyze(UnresolvedRelation(Seq("TaBlE"), None)) ===
106106
testRelation)
107107
}
108108

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,7 @@ class DecimalPrecisionSuite extends FunSuite with BeforeAndAfter {
4141
val f: Expression = UnresolvedAttribute("f")
4242

4343
before {
44-
catalog.registerTable(None, "table", relation)
44+
catalog.registerTable(Seq("table"), relation)
4545
}
4646

4747
private def checkType(expression: Expression, expectedType: DataType): Unit = {

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -276,7 +276,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
276276
* @group userf
277277
*/
278278
def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
279-
catalog.registerTable(None, tableName, rdd.queryExecution.logical)
279+
catalog.registerTable(Seq(tableName), rdd.queryExecution.logical)
280280
}
281281

282282
/**
@@ -289,7 +289,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
289289
*/
290290
def dropTempTable(tableName: String): Unit = {
291291
tryUncacheQuery(table(tableName))
292-
catalog.unregisterTable(None, tableName)
292+
catalog.unregisterTable(Seq(tableName))
293293
}
294294

295295
/**
@@ -308,7 +308,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
308308

309309
/** Returns the specified table as a SchemaRDD */
310310
def table(tableName: String): SchemaRDD =
311-
new SchemaRDD(this, catalog.lookupRelation(None, tableName))
311+
new SchemaRDD(this, catalog.lookupRelation(Seq(tableName)))
312312

313313
/**
314314
* :: DeveloperApi ::

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -97,8 +97,8 @@ private[sql] trait SchemaRDDLike {
9797
*/
9898
@Experimental
9999
def insertInto(tableName: String, overwrite: Boolean): Unit =
100-
sqlContext.executePlan(
101-
InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)).toRdd
100+
sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(Seq(tableName)),
101+
Map.empty, logicalPlan, overwrite)).toRdd
102102

103103
/**
104104
* :: Experimental ::

sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -302,8 +302,8 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
302302
upperCaseData.where('N <= 4).registerTempTable("left")
303303
upperCaseData.where('N >= 3).registerTempTable("right")
304304

305-
val left = UnresolvedRelation(None, "left", None)
306-
val right = UnresolvedRelation(None, "right", None)
305+
val left = UnresolvedRelation(Seq("left"), None)
306+
val right = UnresolvedRelation(Seq("right"), None)
307307

308308
checkAnswer(
309309
left.join(right, FullOuter, Some("left.N".attr === "right.N".attr)),

0 commit comments

Comments (0)