Commit c18aa38

Merge remote-tracking branch 'remotes/ravipesala/Add-Cache-table-as' into SPARK-2594
2 parents: f493f79 + 394d5ca

6 files changed: +71 -13 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

Lines changed: 11 additions & 3 deletions
@@ -151,7 +151,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
        EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} |
        UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
      )
-    | insert | cache
+    | insert | cache | unCache
  )

  protected lazy val select: Parser[LogicalPlan] =

@@ -183,9 +183,17 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
    }

  protected lazy val cache: Parser[LogicalPlan] =
-    (CACHE ^^^ true | UNCACHE ^^^ false) ~ TABLE ~ ident ^^ {
-      case doCache ~ _ ~ tableName => CacheCommand(tableName, doCache)
+    CACHE ~ TABLE ~> ident ~ opt(AS ~ select) <~ opt(";") ^^ {
+      case tableName ~ None =>
+        CacheCommand(tableName, true)
+      case tableName ~ Some(as ~ plan) =>
+        CacheTableAsSelectCommand(tableName, plan)
    }
+
+  protected lazy val unCache: Parser[LogicalPlan] =
+    UNCACHE ~ TABLE ~> ident <~ opt(";") ^^ {
+      case tableName => CacheCommand(tableName, false)
+    }

  protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",")
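
For reference, a minimal sketch of how the extended grammar is exercised end to end, assuming a Spark 1.1-era local SQLContext with the parser changes from this patch on the classpath; the object name, table names, and Record case class are illustrative only and not part of the patch:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class Record(key: Int, value: String)

object CacheSqlExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cache-sql-example").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD

    // Register a small table to run the statements against.
    sc.parallelize(1 to 100).map(i => Record(i, s"val_$i")).registerTempTable("testData")

    sqlContext.sql("CACHE TABLE testCacheTable AS SELECT * FROM testData") // new form handled by `cache`
    sqlContext.sql("CACHE TABLE testData")                                 // existing form, still `cache`
    sqlContext.sql("UNCACHE TABLE testData")                               // now handled by the new `unCache` rule

    sc.stop()
  }
}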

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala

Lines changed: 5 additions & 0 deletions
@@ -75,3 +75,8 @@ case class DescribeCommand(
    AttributeReference("data_type", StringType, nullable = false)(),
    AttributeReference("comment", StringType, nullable = false)())
 }
+
+/**
+ * Returned for the "CACHE TABLE tableName AS SELECT .." command.
+ */
+case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan) extends Command
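
A tiny sketch of how this logical node shows up from the parser side (a spark-shell style fragment, output shape only; the table names are illustrative and assume the SqlParser changes above):

import org.apache.spark.sql.catalyst.SqlParser

val parser = new SqlParser
val plan = parser("CACHE TABLE words AS SELECT * FROM src")
// Expected to be CacheTableAsSelectCommand("words", <logical plan of the SELECT>),
// whereas "CACHE TABLE src" and "UNCACHE TABLE src" still produce CacheCommand.
println(plan)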

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 2 additions & 0 deletions
@@ -305,6 +305,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
        Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context))
      case logical.CacheCommand(tableName, cache) =>
        Seq(execution.CacheCommand(tableName, cache)(context))
+      case logical.CacheTableAsSelectCommand(tableName, plan) =>
+        Seq(execution.CacheTableAsSelectCommand(tableName, plan))
      case _ => Nil
    }
  }

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 19 additions & 0 deletions
@@ -166,3 +166,22 @@ case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
    child.output.map(field => Row(field.name, field.dataType.toString, null))
  }
 }
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan)
+  extends LeafNode with Command {
+
+  override protected[sql] lazy val sideEffectResult = {
+    sqlContext.catalog.registerTable(None, tableName, sqlContext.executePlan(plan).analyzed)
+    sqlContext.cacheTable(tableName)
+    // It does the caching eagerly.
+    sqlContext.table(tableName).count
+    Seq.empty[Row]
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+
+}
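
As a rough user-level analogue of what sideEffectResult does above (register the query's result under the new name, mark it for in-memory caching, then run an action so the cache is built eagerly rather than on first use), one could write something like the following spark-shell style fragment; the helper name and table names are hypothetical:

import org.apache.spark.sql.SQLContext

def cacheAsSelect(sqlContext: SQLContext, cacheName: String, query: String): Unit = {
  val result = sqlContext.sql(query)   // e.g. "SELECT * FROM testData"
  result.registerTempTable(cacheName)  // the command registers the analyzed plan in the catalog
  sqlContext.cacheTable(cacheName)     // mark the table as cached
  sqlContext.table(cacheName).count()  // force materialization so the caching is eager
}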

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 13 additions & 0 deletions
@@ -119,4 +119,17 @@ class CachedTableSuite extends QueryTest {
    }
    assert(!TestSQLContext.isCached("testData"), "Table 'testData' should not be cached")
  }
+
+  test("CACHE TABLE tableName AS SELECT Star Table") {
+    TestSQLContext.sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
+    TestSQLContext.sql("SELECT * FROM testCacheTable WHERE key = 1").collect()
+    assert(TestSQLContext.isCached("testCacheTable"), "Table 'testCacheTable' should be cached")
+    TestSQLContext.uncacheTable("testCacheTable")
+  }
+
+  test("'CACHE TABLE tableName AS SELECT ..'") {
+    TestSQLContext.sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
+    assert(TestSQLContext.isCached("testCacheTable"), "Table 'testCacheTable' should be cached")
+    TestSQLContext.uncacheTable("testCacheTable")
+  }
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 21 additions & 10 deletions
@@ -229,7 +229,13 @@ private[hive] object HiveQl {
          SetCommand(Some(key), Some(value))
      }
    } else if (sql.trim.toLowerCase.startsWith("cache table")) {
-      CacheCommand(sql.trim.drop(12).trim, true)
+      sql.trim.drop(12).trim.split(" ").toSeq match {
+        case Seq(tableName) =>
+          CacheCommand(tableName, true)
+        case Seq(tableName,as, select@_*) =>
+          CacheTableAsSelectCommand(tableName,
+            createPlan(sql.trim.drop(12 + tableName.length() + as.length() + 2)))
+      }
    } else if (sql.trim.toLowerCase.startsWith("uncache table")) {
      CacheCommand(sql.trim.drop(14).trim, false)
    } else if (sql.trim.toLowerCase.startsWith("add jar")) {

@@ -243,15 +249,7 @@ private[hive] object HiveQl {
    } else if (sql.trim.startsWith("!")) {
      ShellCommand(sql.drop(1))
    } else {
-      val tree = getAst(sql)
-      if (nativeCommands contains tree.getText) {
-        NativeCommand(sql)
-      } else {
-        nodeToPlan(tree) match {
-          case NativePlaceholder => NativeCommand(sql)
-          case other => other
-        }
-      }
+      createPlan(sql)
    }
  } catch {
    case e: Exception => throw new ParseException(sql, e)

@@ -262,6 +260,19 @@ private[hive] object HiveQl {
        """.stripMargin)
    }
  }
+
+  /** Creates LogicalPlan for a given HiveQL string. */
+  def createPlan(sql: String) = {
+    val tree = getAst(sql)
+    if (nativeCommands contains tree.getText) {
+      NativeCommand(sql)
+    } else {
+      nodeToPlan(tree) match {
+        case NativePlaceholder => NativeCommand(sql)
+        case other => other
+      }
+    }
+  }

  def parseDdl(ddl: String): Seq[Attribute] = {
    val tree =
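
The offset arithmetic in the "cache table" branch above can be traced with a small standalone example; the SQL string below is illustrative only. The leading 12 skips "cache table " (11 characters plus the trailing space), and the extra 2 accounts for the spaces on either side of the AS keyword, so what remains is exactly the SELECT text handed to createPlan:

object CacheTableSliceExample extends App {
  val sql = "CACHE TABLE words AS SELECT key, value FROM src"
  val rest = sql.trim.drop(12).trim                  // "words AS SELECT key, value FROM src"
  val Seq(tableName, as, _*) = rest.split(" ").toSeq // tableName = "words", as = "AS"
  val select = sql.trim.drop(12 + tableName.length() + as.length() + 2)
  println(select)                                    // prints "SELECT key, value FROM src"
}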

0 commit comments
