Skip to content

Commit 1c9a092

Browse files
committed
added Substitution batch
1 parent f49284b commit 1c9a092

File tree

1 file changed

+48
-23
lines changed
  • sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis

1 file changed

+48
-23
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,21 @@ class Analyzer(
4747

4848
val fixedPoint = FixedPoint(maxIterations)
4949

50+
/**
51+
* Override to provide additional rules for the "Substitution" batch.
52+
*/
53+
val extendedSubstitutionRules: Seq[Rule[LogicalPlan]] = Nil
54+
5055
/**
5156
* Override to provide additional rules for the "Resolution" batch.
5257
*/
5358
val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil
5459

5560
lazy val batches: Seq[Batch] = Seq(
61+
Batch("Substitution", fixedPoint,
62+
CTESubstitution ::
63+
Nil ++
64+
extendedSubstitutionRules : _*),
5665
Batch("Resolution", fixedPoint,
5766
ResolveRelations ::
5867
ResolveReferences ::
@@ -68,6 +77,38 @@ class Analyzer(
6877
extendedResolutionRules : _*)
6978
)
7079

80+
/**
81+
* Substitutes the CTE relations declared by a `With` node into the plan that references them.
82+
*/
83+
object CTESubstitution extends Rule[LogicalPlan] {
  /**
   * Drops a top-level `With` node, if there is one, and substitutes its CTE relations
   * into the child plan. Any other plan is processed with an empty CTE map.
   */
  def apply(plan: LogicalPlan): LogicalPlan = plan match {
    case With(child, relations) => substituteCTE(child, relations)
    case other => substituteCTE(other, Map.empty[String, LogicalPlan])
  }

  /**
   * Replaces each [[UnresolvedRelation]] in `plan` whose table name is a key of
   * `cteRelations` with the mapped plan, wrapped in a `Subquery` when the relation
   * carries an alias. Relations without a matching CTE are left untouched.
   */
  def substituteCTE(plan: LogicalPlan, cteRelations: Map[String, LogicalPlan]): LogicalPlan = {
    // In hive, if there is same table name in database and CTE definition,
    // hive will use the table in database, not the CTE one.
    // Taking into account the reasonableness and the implementation complexity,
    // here use the CTE definition first, check table name only and ignore database name
    def lookup(u: UnresolvedRelation): LogicalPlan = {
      val substituted: Option[LogicalPlan] = cteRelations.get(u.tableIdentifier.last).map { cte =>
        u.alias match {
          case Some(name) => Subquery(name, cte)
          case None => cte
        }
      }
      substituted.getOrElse(u)
    }

    plan transform {
      // The insertion target is matched explicitly so that it is substituted as well.
      case insert @ InsertIntoTable(target: UnresolvedRelation, _, _, _, _) =>
        insert.copy(table = lookup(target))
      case relation: UnresolvedRelation =>
        lookup(relation)
    }
  }
}
111+
71112
/**
72113
* Removes no-op Alias expressions from the plan.
73114
*/
@@ -169,36 +210,20 @@ class Analyzer(
169210
* Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
170211
*/
171212
object ResolveRelations extends Rule[LogicalPlan] {
  /**
   * Looks up the relation referenced by `u` in the catalog.
   *
   * A [[NoSuchTableException]] thrown by the lookup is reported through
   * `u.failAnalysis` with the message "no such table <tableName>".
   */
  def getTable(u: UnresolvedRelation): LogicalPlan = {
    try {
      catalog.lookupRelation(u.tableIdentifier, u.alias)
    } catch {
      case _: NoSuchTableException =>
        u.failAnalysis(s"no such table ${u.tableName}")
    }
  }

  /**
   * Replaces every [[UnresolvedRelation]] in `plan` with the relation returned by
   * [[getTable]].
   */
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // The insertion target is matched explicitly; the looked-up relation is passed
    // through EliminateSubQueries before being stored as the table.
    case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
      i.copy(table = EliminateSubQueries(getTable(u)))
    case u: UnresolvedRelation =>
      // getTable takes only the relation: the CTE map is no longer available in this
      // rule, so the former two-argument call cannot compile.
      getTable(u)
  }
}
204229

0 commit comments

Comments
 (0)