Commit 7cb9c1c

cloud-fan authored and MaxGekk committed
[SPARK-36380][SQL] Simplify the logical plan names for ALTER TABLE ... COLUMN
### What changes were proposed in this pull request?

This is a followup of recent work such as #33200.

For `ALTER TABLE` commands, the logical plans do not have the common `AlterTable` prefix in their names and just use names like `SetTableLocation`. This PR proposes to follow the same naming rule for `ALTER TABLE ... COLUMN` commands. It also moves these ALTER TABLE commands to an individual file and gives them a base trait.

### Why are the changes needed?

Name simplification.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #33609 from cloud-fan/dsv2.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
1 parent 8ca11fe commit 7cb9c1c
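
For context, a minimal sketch of the rename described above (the rule below is illustrative only, not part of the diff): code that previously matched `AlterTableAddColumns` / `AlterTableAlterColumn` now matches the shorter `AddColumns` / `AlterColumn` plan names.

```scala
import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative no-op rule showing the shorter plan names introduced by this commit.
object ExampleAlterTableRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    case a: AddColumns  => a  // was: AlterTableAddColumns
    case a: AlterColumn => a  // was: AlterTableAlterColumn
  }
}
```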

File tree

10 files changed: +303 −284 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 6 additions & 6 deletions
@@ -269,7 +269,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       ResolveRelations ::
       ResolveTables ::
       ResolvePartitionSpec ::
-      ResolveAlterTableColumnCommands ::
+      ResolveAlterTableCommands ::
       AddMetadataColumns ::
       DeduplicateRelations ::
       ResolveReferences ::
@@ -3607,15 +3607,15 @@ class Analyzer(override val catalogManager: CatalogManager)
    * Rule to mostly resolve, normalize and rewrite column names based on case sensitivity
    * for alter table column commands.
    */
-  object ResolveAlterTableColumnCommands extends Rule[LogicalPlan] {
+  object ResolveAlterTableCommands extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-      case a: AlterTableColumnCommand if a.table.resolved && hasUnresolvedFieldName(a) =>
+      case a: AlterTableCommand if a.table.resolved && hasUnresolvedFieldName(a) =>
         val table = a.table.asInstanceOf[ResolvedTable]
         a.transformExpressions {
           case u: UnresolvedFieldName => resolveFieldNames(table, u.name, u)
         }

-      case a @ AlterTableAddColumns(r: ResolvedTable, cols) if !a.resolved =>
+      case a @ AddColumns(r: ResolvedTable, cols) if !a.resolved =>
         // 'colsToAdd' keeps track of new columns being added. It stores a mapping from a
         // normalized parent name of fields to field names that belong to the parent.
         // For example, if we add columns "a.b.c", "a.b.d", and "a.c", 'colsToAdd' will become
@@ -3668,7 +3668,7 @@ class Analyzer(override val catalogManager: CatalogManager)
         resolved.copyTagsFrom(a)
         resolved

-      case a @ AlterTableAlterColumn(
+      case a @ AlterColumn(
           table: ResolvedTable, ResolvedFieldName(path, field), dataType, _, _, position) =>
         val newDataType = dataType.flatMap { dt =>
           // Hive style syntax provides the column type, even if it may not have changed.
@@ -3705,7 +3705,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       }.getOrElse(throw QueryCompilationErrors.missingFieldError(fieldName, table, context.origin))
     }

-    private def hasUnresolvedFieldName(a: AlterTableColumnCommand): Boolean = {
+    private def hasUnresolvedFieldName(a: AlterTableCommand): Boolean = {
       a.expressions.exists(_.find(_.isInstanceOf[UnresolvedFieldName]).isDefined)
     }
   }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 6 additions & 6 deletions
@@ -442,8 +442,8 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       case write: V2WriteCommand if write.resolved =>
         write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))

-      case alter: AlterTableColumnCommand if alter.table.resolved =>
-        checkAlterTableColumnCommand(alter)
+      case alter: AlterTableCommand =>
+        checkAlterTableCommand(alter)

       case _ => // Falls back to the following checks
     }
@@ -939,7 +939,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
   /**
    * Validates the options used for alter table commands after table and columns are resolved.
    */
-  private def checkAlterTableColumnCommand(alter: AlterTableColumnCommand): Unit = {
+  private def checkAlterTableCommand(alter: AlterTableCommand): Unit = {
     def checkColumnNotExists(op: String, fieldNames: Seq[String], struct: StructType): Unit = {
       if (struct.findNestedField(fieldNames, includeCollections = true).isDefined) {
         alter.failAnalysis(s"Cannot $op column, because ${fieldNames.quoted} " +
@@ -948,7 +948,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
     }

     alter match {
-      case AlterTableAddColumns(table: ResolvedTable, colsToAdd) =>
+      case AddColumns(table: ResolvedTable, colsToAdd) =>
         colsToAdd.foreach { colToAdd =>
           checkColumnNotExists("add", colToAdd.name, table.schema)
         }
@@ -957,10 +957,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
           "in the user specified columns",
           alter.conf.resolver)

-      case AlterTableRenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
+      case RenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
         checkColumnNotExists("rename", col.path :+ newName, table.schema)

-      case a @ AlterTableAlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _) =>
+      case a @ AlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _) =>
         val fieldName = col.name.quoted
         if (a.dataType.isDefined) {
           val field = CharVarcharUtils.getRawType(col.field.metadata)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 6 additions & 6 deletions
@@ -3611,7 +3611,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
    */
   override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) {
     val colToken = if (ctx.COLUMN() != null) "COLUMN" else "COLUMNS"
-    AlterTableAddColumns(
+    AddColumns(
       createUnresolvedTable(ctx.multipartIdentifier, s"ALTER TABLE ... ADD $colToken"),
       ctx.columns.qualifiedColTypeWithPosition.asScala.map(typedVisit[QualifiedColType]).toSeq
     )
@@ -3627,7 +3627,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
    */
   override def visitRenameTableColumn(
       ctx: RenameTableColumnContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableRenameColumn(
+    RenameColumn(
       createUnresolvedTable(ctx.table, "ALTER TABLE ... RENAME COLUMN"),
       UnresolvedFieldName(typedVisit[Seq[String]](ctx.from)),
       ctx.to.getText)
@@ -3681,7 +3681,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg

     assert(Seq(dataType, nullable, comment, position).count(_.nonEmpty) == 1)

-    AlterTableAlterColumn(
+    AlterColumn(
       createUnresolvedTable(ctx.table, s"ALTER TABLE ... $verb COLUMN"),
       UnresolvedFieldName(typedVisit[Seq[String]](ctx.column)),
       dataType = dataType,
@@ -3715,7 +3715,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
         Some("please run ALTER COLUMN ... SET/DROP NOT NULL instead"))
     }

-    AlterTableAlterColumn(
+    AlterColumn(
       createUnresolvedTable(ctx.table, s"ALTER TABLE ... CHANGE COLUMN"),
       UnresolvedFieldName(columnNameParts),
       dataType = Option(ctx.colType().dataType()).map(typedVisit[DataType]),
@@ -3730,7 +3730,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
     if (ctx.partitionSpec != null) {
       operationNotAllowed("ALTER TABLE table PARTITION partition_spec REPLACE COLUMNS", ctx)
     }
-    AlterTableReplaceColumns(
+    ReplaceColumns(
       createUnresolvedTable(ctx.multipartIdentifier, "ALTER TABLE ... REPLACE COLUMNS"),
       ctx.columns.qualifiedColTypeWithPosition.asScala.map { colType =>
         if (colType.NULL != null) {
@@ -3763,7 +3763,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   override def visitDropTableColumns(
       ctx: DropTableColumnsContext): LogicalPlan = withOrigin(ctx) {
     val columnsToDrop = ctx.columns.multipartIdentifier.asScala.map(typedVisit[Seq[String]])
-    AlterTableDropColumns(
+    DropColumns(
       createUnresolvedTable(
         ctx.multipartIdentifier,
         "ALTER TABLE ... DROP COLUMNS"),
Lines changed: 230 additions & 0 deletions
@@ -0,0 +1,230 @@ (new file, shown without diff markers)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.DataType

/**
 * The base trait for commands that need to alter a v2 table with [[TableChange]]s.
 */
trait AlterTableCommand extends UnaryCommand {
  def changes: Seq[TableChange]
  def table: LogicalPlan
  final override def child: LogicalPlan = table
}

/**
 * The logical plan that defines or changes the comment of an TABLE for v2 catalogs.
 *
 * {{{
 *   COMMENT ON TABLE tableIdentifier IS ('text' | NULL)
 * }}}
 *
 * where the `text` is the new comment written as a string literal; or `NULL` to drop the comment.
 */
case class CommentOnTable(table: LogicalPlan, comment: String) extends AlterTableCommand {
  override def changes: Seq[TableChange] = {
    Seq(TableChange.setProperty(TableCatalog.PROP_COMMENT, comment))
  }
  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
    copy(table = newChild)
}

/**
 * The logical plan of the ALTER TABLE ... SET LOCATION command.
 */
case class SetTableLocation(
    table: LogicalPlan,
    partitionSpec: Option[TablePartitionSpec],
    location: String) extends AlterTableCommand {
  override def changes: Seq[TableChange] = {
    if (partitionSpec.nonEmpty) {
      throw QueryCompilationErrors.alterV2TableSetLocationWithPartitionNotSupportedError()
    }
    Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, location))
  }
  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
    copy(table = newChild)
}

/**
 * The logical plan of the ALTER TABLE ... SET TBLPROPERTIES command.
 */
case class SetTableProperties(
    table: LogicalPlan,
    properties: Map[String, String]) extends AlterTableCommand {
  override def changes: Seq[TableChange] = {
    properties.map { case (key, value) =>
      TableChange.setProperty(key, value)
    }.toSeq
  }
  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
    copy(table = newChild)
}

/**
 * The logical plan of the ALTER TABLE ... UNSET TBLPROPERTIES command.
 */
case class UnsetTableProperties(
    table: LogicalPlan,
    propertyKeys: Seq[String],
    ifExists: Boolean) extends AlterTableCommand {
  override def changes: Seq[TableChange] = {
    propertyKeys.map(key => TableChange.removeProperty(key))
  }
  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
    copy(table = newChild)
}

/**
 * The logical plan of the ALTER TABLE ... ADD COLUMNS command.
 */
case class AddColumns(
    table: LogicalPlan,
    columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand {
  columnsToAdd.foreach { c =>
    TypeUtils.failWithIntervalType(c.dataType)
  }

  override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)

  override def changes: Seq[TableChange] = {
    columnsToAdd.map { col =>
      require(col.path.forall(_.resolved),
        "FieldName should be resolved before it's converted to TableChange.")
      require(col.position.forall(_.resolved),
        "FieldPosition should be resolved before it's converted to TableChange.")
      TableChange.addColumn(
        col.name.toArray,
        col.dataType,
        col.nullable,
        col.comment.orNull,
        col.position.map(_.position).orNull)
    }
  }

  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
    copy(table = newChild)
}

/**
 * The logical plan of the ALTER TABLE ... REPLACE COLUMNS command.
 */
case class ReplaceColumns(
    table: LogicalPlan,
    columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand {
  columnsToAdd.foreach { c =>
    TypeUtils.failWithIntervalType(c.dataType)
  }

  override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)

  override def changes: Seq[TableChange] = {
    // REPLACE COLUMNS deletes all the existing columns and adds new columns specified.
    require(table.resolved)
    val deleteChanges = table.schema.fieldNames.map { name =>
      TableChange.deleteColumn(Array(name))
    }
    val addChanges = columnsToAdd.map { col =>
      assert(col.path.isEmpty)
      assert(col.position.isEmpty)
      TableChange.addColumn(
        col.name.toArray,
        col.dataType,
        col.nullable,
        col.comment.orNull,
        null)
    }
    deleteChanges ++ addChanges
  }

  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
    copy(table = newChild)
}

/**
 * The logical plan of the ALTER TABLE ... DROP COLUMNS command.
 */
case class DropColumns(
    table: LogicalPlan,
    columnsToDrop: Seq[FieldName]) extends AlterTableCommand {
  override def changes: Seq[TableChange] = {
    columnsToDrop.map { col =>
      require(col.resolved, "FieldName should be resolved before it's converted to TableChange.")
      TableChange.deleteColumn(col.name.toArray)
    }
  }

  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
    copy(table = newChild)
}

/**
 * The logical plan of the ALTER TABLE ... RENAME COLUMN command.
 */
case class RenameColumn(
    table: LogicalPlan,
    column: FieldName,
    newName: String) extends AlterTableCommand {
  override def changes: Seq[TableChange] = {
    require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
    Seq(TableChange.renameColumn(column.name.toArray, newName))
  }

  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
    copy(table = newChild)
}

/**
 * The logical plan of the ALTER TABLE ... ALTER COLUMN command.
 */
case class AlterColumn(
    table: LogicalPlan,
    column: FieldName,
    dataType: Option[DataType],
    nullable: Option[Boolean],
    comment: Option[String],
    position: Option[FieldPosition]) extends AlterTableCommand {
  override def changes: Seq[TableChange] = {
    require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
    val colName = column.name.toArray
    val typeChange = dataType.map { newDataType =>
      TableChange.updateColumnType(colName, newDataType)
    }
    val nullabilityChange = nullable.map { nullable =>
      TableChange.updateColumnNullability(colName, nullable)
    }
    val commentChange = comment.map { newComment =>
      TableChange.updateColumnComment(colName, newComment)
    }
    val positionChange = position.map { newPosition =>
      require(newPosition.resolved,
        "FieldPosition should be resolved before it's converted to TableChange.")
      TableChange.updateColumnPosition(colName, newPosition.position)
    }
    typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange
  }

  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
    copy(table = newChild)
}
