Skip to content

Commit c32e228

Browse files
fuwhucloud-fan
authored andcommitted
[SPARK-29859][SQL] ALTER DATABASE (SET LOCATION) should look up catalog like v2 commands
### What changes were proposed in this pull request? Add AlterNamespaceSetLocationStatement, AlterNamespaceSetLocation, AlterNamespaceSetLocationExec to make ALTER DATABASE (SET LOCATION) look up catalog like v2 commands. And also refine the code of AlterNamespaceSetProperties, AlterNamespaceSetPropertiesExec, DescribeNamespace, DescribeNamespaceExec to use SupportsNamespaces instead of CatalogPlugin for catalog parameter. ### Why are the changes needed? It's important to make all the commands have the same catalog/namespace resolution behavior, to avoid confusing end-users. ### Does this PR introduce any user-facing change? Yes, add "ALTER NAMESPACE ... SET LOCATION" whose function is same as "ALTER DATABASE ... SET LOCATION" and "ALTER SCHEMA ... SET LOCATION". ### How was this patch tested? New unit tests Closes #26562 from fuwhu/SPARK-29859. Authored-by: fuwhu <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 50f6d93 commit c32e228

File tree

12 files changed

+75
-45
lines changed

12 files changed

+75
-45
lines changed

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,8 @@ statement
8989
(WITH (DBPROPERTIES | PROPERTIES) tablePropertyList))* #createNamespace
9090
| ALTER (database | NAMESPACE) multipartIdentifier
9191
SET (DBPROPERTIES | PROPERTIES) tablePropertyList #setNamespaceProperties
92-
| ALTER database db=errorCapturingIdentifier
93-
SET locationSpec #setDatabaseLocation
92+
| ALTER (database | NAMESPACE) multipartIdentifier
93+
SET locationSpec #setNamespaceLocation
9494
| DROP (database | NAMESPACE) (IF EXISTS)? multipartIdentifier
9595
(RESTRICT | CASCADE)? #dropNamespace
9696
| SHOW (DATABASES | NAMESPACES) ((FROM | IN) multipartIdentifier)?

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,11 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
9494
s"because view support in catalog has not been implemented yet")
9595

9696
case AlterNamespaceSetPropertiesStatement(NonSessionCatalog(catalog, nameParts), properties) =>
97-
AlterNamespaceSetProperties(catalog, nameParts, properties)
97+
AlterNamespaceSetProperties(catalog.asNamespaceCatalog, nameParts, properties)
98+
99+
case AlterNamespaceSetLocationStatement(NonSessionCatalog(catalog, nameParts), location) =>
100+
AlterNamespaceSetProperties(
101+
catalog.asNamespaceCatalog, nameParts, Map("location" -> location))
98102

99103
case DescribeTableStatement(
100104
nameParts @ NonSessionCatalog(catalog, tableName), partitionSpec, isExtended) =>
@@ -176,7 +180,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
176180
DropNamespace(catalog, nameParts, ifExists, cascade)
177181

178182
case DescribeNamespaceStatement(NonSessionCatalog(catalog, nameParts), extended) =>
179-
DescribeNamespace(catalog, nameParts, extended)
183+
DescribeNamespace(catalog.asNamespaceCatalog, nameParts, extended)
180184

181185
case ShowNamespacesStatement(Some(CatalogAndNamespace(catalog, namespace)), pattern) =>
182186
ShowNamespaces(catalog.asNamespaceCatalog, namespace, pattern)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2539,6 +2539,22 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
25392539
}
25402540
}
25412541

2542+
/**
2543+
* Create an [[AlterNamespaceSetLocationStatement]] logical plan.
2544+
*
2545+
* For example:
2546+
* {{{
2547+
* ALTER (DATABASE|SCHEMA|NAMESPACE) namespace SET LOCATION path;
2548+
* }}}
2549+
*/
2550+
override def visitSetNamespaceLocation(ctx: SetNamespaceLocationContext): LogicalPlan = {
2551+
withOrigin(ctx) {
2552+
AlterNamespaceSetLocationStatement(
2553+
visitMultipartIdentifier(ctx.multipartIdentifier),
2554+
visitLocationSpec(ctx.locationSpec))
2555+
}
2556+
}
2557+
25422558
/**
25432559
* Create a [[ShowNamespacesStatement]] command.
25442560
*/

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,13 @@ case class AlterNamespaceSetPropertiesStatement(
357357
namespace: Seq[String],
358358
properties: Map[String, String]) extends ParsedStatement
359359

360+
/**
361+
* ALTER (DATABASE|SCHEMA|NAMESPACE) ... SET LOCATION command, as parsed from SQL.
362+
*/
363+
case class AlterNamespaceSetLocationStatement(
364+
namespace: Seq[String],
365+
location: String) extends ParsedStatement
366+
360367
/**
361368
* A SHOW NAMESPACES statement, as parsed from SQL.
362369
*/

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ case class DropNamespace(
259259
* The logical plan of the DESCRIBE NAMESPACE command that works for v2 catalogs.
260260
*/
261261
case class DescribeNamespace(
262-
catalog: CatalogPlugin,
262+
catalog: SupportsNamespaces,
263263
namespace: Seq[String],
264264
extended: Boolean) extends Command {
265265

@@ -275,7 +275,7 @@ case class DescribeNamespace(
275275
* command that works for v2 catalogs.
276276
*/
277277
case class AlterNamespaceSetProperties(
278-
catalog: CatalogPlugin,
278+
catalog: SupportsNamespaces,
279279
namespace: Seq[String],
280280
properties: Map[String, String]) extends Command
281281

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1192,6 +1192,20 @@ class DDLParserSuite extends AnalysisTest {
11921192
Seq("a", "b", "c"), Map("b" -> "b")))
11931193
}
11941194

1195+
test("set namespace location") {
1196+
comparePlans(
1197+
parsePlan("ALTER DATABASE a.b.c SET LOCATION '/home/user/db'"),
1198+
AlterNamespaceSetLocationStatement(Seq("a", "b", "c"), "/home/user/db"))
1199+
1200+
comparePlans(
1201+
parsePlan("ALTER SCHEMA a.b.c SET LOCATION '/home/user/db'"),
1202+
AlterNamespaceSetLocationStatement(Seq("a", "b", "c"), "/home/user/db"))
1203+
1204+
comparePlans(
1205+
parsePlan("ALTER NAMESPACE a.b.c SET LOCATION '/home/user/db'"),
1206+
AlterNamespaceSetLocationStatement(Seq("a", "b", "c"), "/home/user/db"))
1207+
}
1208+
11951209
test("show databases: basic") {
11961210
comparePlans(
11971211
parsePlan("SHOW DATABASES"),

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,13 @@ class ResolveSessionCatalog(
172172
}
173173
AlterDatabasePropertiesCommand(nameParts.head, properties)
174174

175+
case AlterNamespaceSetLocationStatement(SessionCatalog(_, nameParts), location) =>
176+
if (nameParts.length != 1) {
177+
throw new AnalysisException(
178+
s"The database name is not valid: ${nameParts.quoted}")
179+
}
180+
AlterDatabaseSetLocationCommand(nameParts.head, location)
181+
175182
case DescribeTableStatement(
176183
nameParts @ SessionCatalog(catalog, tableName), partitionSpec, isExtended) =>
177184
loadTable(catalog, tableName.asIdentifier).collect {

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -227,22 +227,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
227227
options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
228228
}
229229

230-
/**
231-
* Create an [[AlterDatabaseSetLocationCommand]] command.
232-
*
233-
* For example:
234-
* {{{
235-
* ALTER (DATABASE|SCHEMA) database SET LOCATION path;
236-
* }}}
237-
*/
238-
override def visitSetDatabaseLocation(
239-
ctx: SetDatabaseLocationContext): LogicalPlan = withOrigin(ctx) {
240-
AlterDatabaseSetLocationCommand(
241-
ctx.db.getText,
242-
visitLocationSpec(ctx.locationSpec)
243-
)
244-
}
245-
246230
/**
247231
* Create a plan for a DESCRIBE FUNCTION command.
248232
*/

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterNamespaceSetPropertiesExec.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,20 @@ package org.apache.spark.sql.execution.datasources.v2
1919

2020
import org.apache.spark.sql.catalyst.InternalRow
2121
import org.apache.spark.sql.catalyst.expressions.Attribute
22-
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, NamespaceChange}
22+
import org.apache.spark.sql.connector.catalog.{NamespaceChange, SupportsNamespaces}
2323

2424
/**
2525
* Physical plan node for setting properties of namespace.
2626
*/
2727
case class AlterNamespaceSetPropertiesExec(
28-
catalog: CatalogPlugin,
28+
catalog: SupportsNamespaces,
2929
namespace: Seq[String],
30-
props: Map[String, String])
31-
extends V2CommandExec {
30+
props: Map[String, String]) extends V2CommandExec {
3231
override protected def run(): Seq[InternalRow] = {
33-
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
34-
3532
val changes = props.map{ case (k, v) =>
3633
NamespaceChange.setProperty(k, v)
3734
}.toSeq
38-
catalog.asNamespaceCatalog.alterNamespace(namespace.toArray, changes: _*)
35+
catalog.alterNamespace(namespace.toArray, changes: _*)
3936
Seq.empty
4037
}
4138

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
2323
import org.apache.spark.sql.catalyst.InternalRow
2424
import org.apache.spark.sql.catalyst.encoders.RowEncoder
2525
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema}
26-
import org.apache.spark.sql.connector.catalog.CatalogPlugin
26+
import org.apache.spark.sql.connector.catalog.SupportsNamespaces
2727
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.COMMENT_TABLE_PROP
2828
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.LOCATION_TABLE_PROP
2929
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.RESERVED_PROPERTIES
@@ -34,19 +34,15 @@ import org.apache.spark.sql.types.StructType
3434
*/
3535
case class DescribeNamespaceExec(
3636
output: Seq[Attribute],
37-
catalog: CatalogPlugin,
37+
catalog: SupportsNamespaces,
3838
namespace: Seq[String],
3939
isExtended: Boolean) extends V2CommandExec {
40-
4140
private val encoder = RowEncoder(StructType.fromAttributes(output)).resolveAndBind()
4241

4342
override protected def run(): Seq[InternalRow] = {
44-
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
45-
4643
val rows = new ArrayBuffer[InternalRow]()
47-
val nsCatalog = catalog.asNamespaceCatalog
4844
val ns = namespace.toArray
49-
val metadata = nsCatalog.loadNamespaceMetadata(ns)
45+
val metadata = catalog.loadNamespaceMetadata(ns)
5046

5147
rows += toCatalystRow("Namespace Name", ns.last)
5248
rows += toCatalystRow("Description", metadata.get(COMMENT_TABLE_PROP))

0 commit comments

Comments
 (0)