Commit 2f53588

haiyang authored and marmbrus committed
[SPARK-6199] [SQL] Support CTE in HiveContext and SQLContext
Author: haiyang <[email protected]>

Closes #4929 from haiyangsea/cte and squashes the following commits:

220b67d [haiyang] add golden files for cte test
d3c7681 [haiyang] Merge branch 'master' into cte-repair
0ba2070 [haiyang] modify code style
9ce6b58 [haiyang] fix conflict
ff74741 [haiyang] add comment for With plan
0d56af4 [haiyang] code indention
776a440 [haiyang] add comments for resolve relation strategy
2fccd7e [haiyang] add comments for resolve relation strategy
241bbe2 [haiyang] fix cte problem of view
e9e1237 [haiyang] fix test case problem
614182f [haiyang] add test cases for CTE feature
32e415b [haiyang] add comment
1cc8c15 [haiyang] support with
03f1097 [haiyang] support with
e960099 [haiyang] support with
9aaa874 [haiyang] support with
0566978 [haiyang] support with
a99ecd2 [haiyang] support with
c3fa4c2 [haiyang] support with
3b6077f [haiyang] support with
5f8abe3 [haiyang] support with
4572b05 [haiyang] support with
f801f54 [haiyang] support with
1 parent 7dbd371 commit 2f53588
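
After this change, WITH clauses are accepted by both the Catalyst SQL parser and HiveQl. A minimal usage sketch, assuming a SQLContext named `sqlContext` and a registered `testData` table (mirroring the new SQLQuerySuite test below):

    // Define a named subquery (CTE) and reference it from the main query.
    val firstTen = sqlContext.sql(
      "with q1 as (select * from testData limit 10) select * from q1")
    firstTen.collect() // ten rows drawn from testData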

File tree

9 files changed: +100 -14 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

Lines changed: 7 additions & 0 deletions
@@ -111,6 +111,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
   protected val UPPER = Keyword("UPPER")
   protected val WHEN = Keyword("WHEN")
   protected val WHERE = Keyword("WHERE")
+  protected val WITH = Keyword("WITH")
 
   protected def assignAliases(exprs: Seq[Expression]): Seq[NamedExpression] = {
     exprs.zipWithIndex.map {
@@ -127,6 +128,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
       | UNION ~ DISTINCT.? ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
       )
     | insert
+    | cte
     )
 
   protected lazy val select: Parser[LogicalPlan] =
@@ -156,6 +158,11 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
       case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o)
     }
 
+  protected lazy val cte: Parser[LogicalPlan] =
+    WITH ~> rep1sep(ident ~ ( AS ~ "(" ~> start <~ ")"), ",") ~ start ^^ {
+      case r ~ s => With(s, r.map({case n ~ s => (n, Subquery(n, s))}).toMap)
+    }
+
   protected lazy val projection: Parser[Expression] =
     expression ~ (AS.? ~> ident.?) ^^ {
       case e ~ a => a.fold(e)(Alias(e, _)())
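
The new `cte` production reads one or more comma-separated `name AS ( query )` definitions followed by the main query, and builds a `With` node mapping each name to a `Subquery` of its definition. Purely to illustrate that grammar shape — this is not Spark's parser — here is a self-contained sketch using the standard Scala parser combinators, with a crude regex standing in for the real `start` production:

    import scala.util.parsing.combinator.RegexParsers

    object CteShapeParser extends RegexParsers {
      private val name: Parser[String] = "[A-Za-z_][A-Za-z0-9_]*".r
      private val body: Parser[String] = "[^()]+".r // crude stand-in for a full query parser
      private val definition: Parser[(String, String)] =
        name ~ ("(?i)as".r ~ "(" ~> body <~ ")") ^^ { case n ~ q => n -> q.trim }

      // One or more comma-separated definitions, then the main query, as in the rule above.
      val cte: Parser[(Map[String, String], String)] =
        "(?i)with".r ~> rep1sep(definition, ",") ~ ".+".r ^^ { case defs ~ main => (defs.toMap, main) }
    }

Running `CteShapeParser.parseAll(CteShapeParser.cte, "with q1 as (select * from t), q2 as (select * from u) select * from q1")` yields the map of names to definition text plus the trailing main query, which is the same association the real rule stores in `With.cteRelations`.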

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 23 additions & 8 deletions
@@ -169,21 +169,36 @@ class Analyzer(
    * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
    */
   object ResolveRelations extends Rule[LogicalPlan] {
-    def getTable(u: UnresolvedRelation): LogicalPlan = {
+    def getTable(u: UnresolvedRelation, cteRelations: Map[String, LogicalPlan]) = {
       try {
-        catalog.lookupRelation(u.tableIdentifier, u.alias)
+        // In Hive, if the same table name exists in both the database and a CTE definition,
+        // Hive will use the table in the database, not the CTE one.
+        // Weighing reasonableness against implementation complexity, we use the CTE
+        // definition first here, checking the table name only and ignoring the database name.
+        cteRelations.get(u.tableIdentifier.last)
+          .map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation))
+          .getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias))
       } catch {
         case _: NoSuchTableException =>
          u.failAnalysis(s"no such table ${u.tableName}")
       }
     }
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-      case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
-        i.copy(
-          table = EliminateSubQueries(getTable(u)))
-      case u: UnresolvedRelation =>
-        getTable(u)
+    def apply(plan: LogicalPlan): LogicalPlan = {
+      val (realPlan, cteRelations) = plan match {
+        // TODO allow subquery to define CTE
+        // Add the CTE tables to a temporary relation map, drop the `With` plan and keep its child
+        case With(child, relations) => (child, relations)
+        case other => (other, Map.empty[String, LogicalPlan])
+      }
+
+      realPlan transform {
+        case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
+          i.copy(
+            table = EliminateSubQueries(getTable(u, cteRelations)))
+        case u: UnresolvedRelation =>
+          getTable(u, cteRelations)
+      }
     }
   }
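
One consequence of the resolution order above: within a query, a CTE definition shadows a catalog table of the same (unqualified) name, which differs from Hive's behavior as noted in the comment. A small illustration — `sqlContext` and the `people` DataFrame are assumptions for the example, not part of this patch:

    // Register a temp table whose name collides with a CTE alias.
    people.registerTempTable("q1")

    // ResolveRelations consults cteRelations before the catalog, so this query reads
    // the CTE definition of q1, not the temp table registered above.
    val adults = sqlContext.sql(
      "with q1 as (select * from people where age > 30) select * from q1")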

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 12 additions & 0 deletions
@@ -147,6 +147,18 @@ case class CreateTableAsSelect[T](
   override lazy val resolved: Boolean = databaseName != None && childrenResolved
 }
 
+/**
+ * A container for holding named common table expressions (CTEs) and a query plan.
+ * This operator will be removed during analysis and the relations will be substituted into the child.
+ * @param child The final query of this CTE.
+ * @param cteRelations The queries that this CTE defines;
+ *                     each key is the alias of a CTE definition,
+ *                     and each value is the CTE definition itself.
+ */
+case class With(child: LogicalPlan, cteRelations: Map[String, Subquery]) extends UnaryNode {
+  override def output = child.output
+}
+
 case class WriteToFile(
     path: String,
     child: LogicalPlan) extends UnaryNode {
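
For orientation, a rough sketch of the plan shape this node describes for `with q1 as (select * from testData) select * from q1`. The `UnresolvedRelation(Seq(...), ...)` constructor is assumed from its use in the Analyzer change above, and the `Project` nodes a real parse would add are omitted:

    import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    import org.apache.spark.sql.catalyst.plans.logical.{Subquery, With}

    // Each CTE name is bound to a Subquery of its definition; the analyzer later drops
    // the With node and substitutes the bound relation wherever the name is referenced.
    val definition = Subquery("q1", UnresolvedRelation(Seq("testData"), None))
    val plan = With(UnresolvedRelation(Seq("q1"), None), Map("q1" -> definition))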

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 14 additions & 0 deletions
@@ -407,6 +407,20 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       mapData.collect().take(1).map(Row.fromTuple).toSeq)
   }
 
+  test("CTE feature") {
+    checkAnswer(
+      sql("with q1 as (select * from testData limit 10) select * from q1"),
+      testData.take(10).toSeq)
+
+    checkAnswer(
+      sql("""
+        |with q1 as (select * from testData where key= '5'),
+        |q2 as (select * from testData where key = '4')
+        |select * from q1 union all select * from q2""".stripMargin),
+      Row(5, "5") :: Row(4, "4") :: Nil)
+
+  }
+
   test("date row") {
     checkAnswer(sql(
       """select cast("2015-01-28" as date) from testData limit 1"""),

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 21 additions & 6 deletions
@@ -576,11 +576,23 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
     case Token("TOK_QUERY", queryArgs)
       if Seq("TOK_FROM", "TOK_INSERT").contains(queryArgs.head.getText) =>
 
-      val (fromClause: Option[ASTNode], insertClauses) = queryArgs match {
-        case Token("TOK_FROM", args: Seq[ASTNode]) :: insertClauses =>
-          (Some(args.head), insertClauses)
-        case Token("TOK_INSERT", _) :: Nil => (None, queryArgs)
-      }
+      val (fromClause: Option[ASTNode], insertClauses, cteRelations) =
+        queryArgs match {
+          case Token("TOK_FROM", args: Seq[ASTNode]) :: insertClauses =>
+            // check whether the last clause is a CTE
+            insertClauses.last match {
+              case Token("TOK_CTE", cteClauses) =>
+                val cteRelations = cteClauses.map(node => {
+                  val relation = nodeToRelation(node).asInstanceOf[Subquery]
+                  (relation.alias, relation)
+                }).toMap
+                (Some(args.head), insertClauses.init, Some(cteRelations))
+
+              case _ => (Some(args.head), insertClauses, None)
+            }
+
+          case Token("TOK_INSERT", _) :: Nil => (None, queryArgs, None)
+        }
 
       // Return one query for each insert clause.
       val queries = insertClauses.map { case Token("TOK_INSERT", singleInsert) =>
@@ -794,7 +806,10 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       }
 
       // If there are multiple INSERTS just UNION them together into on query.
-      queries.reduceLeft(Union)
+      val query = queries.reduceLeft(Union)
+
+      // Return a With plan if there is a CTE.
+      cteRelations.map(With(query, _)).getOrElse(query)
 
     case Token("TOK_UNION", left :: right :: Nil) => Union(nodeToPlan(left), nodeToPlan(right))
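
The HiveContext path goes through the `TOK_CTE` handling above and produces the same `With` plan. A usage sketch — `hiveContext` and Hive's `src` test table are assumed, mirroring the new golden tests below:

    // Plain CTE query through HiveQl.
    val simple = hiveContext.sql(
      "with q1 as (select key from src) select * from q1 where key = 5")

    // Hive's FROM-first form also resolves the CTE (see "CTE feature #3" below).
    val fromFirst = hiveContext.sql(
      """with q1 as (select key from src)
        |from q1
        |select * where key = 4""".stripMargin)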

(New golden result files for the CTE tests)

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+5
+5
+5

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+val_4
+val_5
+val_5
+val_5

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+4

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala

Lines changed: 15 additions & 0 deletions
@@ -542,6 +542,21 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
   createQueryTest("select null from table",
     "SELECT null FROM src LIMIT 1")
 
+  createQueryTest("CTE feature #1",
+    "with q1 as (select key from src) select * from q1 where key = 5")
+
+  createQueryTest("CTE feature #2",
+    """with q1 as (select * from src where key= 5),
+      |q2 as (select * from src s2 where key = 4)
+      |select value from q1 union all select value from q2
+    """.stripMargin)
+
+  createQueryTest("CTE feature #3",
+    """with q1 as (select key from src)
+      |from q1
+      |select * where key = 4
+    """.stripMargin)
+
   test("predicates contains an empty AttributeSet() references") {
     sql(
       """
