Skip to content

Commit 51b1fe1

Browse files
chenghao-intel authored and marmbrus committed
[SPARK-4769] [SQL] CTAS does not work when reading from temporary tables
This is the code refactoring and follow-up for #2570.

Author: Cheng Hao <[email protected]>

Closes #3336 from chenghao-intel/createtbl and squashes the following commits:

3563142 [Cheng Hao] remove the unused variable
e215187 [Cheng Hao] eliminate the compiling warning
4f97f14 [Cheng Hao] fix bug in unittest
5d58812 [Cheng Hao] revert the API changes
b85b620 [Cheng Hao] fix the regression of temp table not found in CTAS
1 parent 9443843 commit 51b1fe1

File tree

4 files changed

+49
-16
lines changed

4 files changed

+49
-16
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 24 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -254,15 +254,37 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
254254
* For example, because of a CREATE TABLE X AS statement.
255255
*/
256256
object CreateTables extends Rule[LogicalPlan] {
257+
import org.apache.hadoop.hive.ql.Context
258+
import org.apache.hadoop.hive.ql.parse.{QB, ASTNode, SemanticAnalyzer}
259+
257260
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
258261
// Wait until children are resolved.
259262
case p: LogicalPlan if !p.childrenResolved => p
260263

261-
case CreateTableAsSelect(db, tableName, child, allowExisting, extra) =>
264+
case CreateTableAsSelect(db, tableName, child, allowExisting, Some(extra: ASTNode)) =>
262265
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
263266
val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
264267

265-
CreateTableAsSelect(Some(databaseName), tableName, child, allowExisting, extra)
268+
// Get the CreateTableDesc from Hive SemanticAnalyzer
269+
val desc: Option[CreateTableDesc] = if (tableExists(Some(databaseName), tblName)) {
270+
None
271+
} else {
272+
val sa = new SemanticAnalyzer(hive.hiveconf) {
273+
override def analyzeInternal(ast: ASTNode) {
274+
// A hack to intercept the SemanticAnalyzer.analyzeInternal,
275+
// to ignore the SELECT clause of the CTAS
276+
val method = classOf[SemanticAnalyzer].getDeclaredMethod(
277+
"analyzeCreateTable", classOf[ASTNode], classOf[QB])
278+
method.setAccessible(true)
279+
method.invoke(this, ast, this.getQB)
280+
}
281+
}
282+
283+
sa.analyze(extra, new Context(hive.hiveconf))
284+
Some(sa.getQB().getTableDesc)
285+
}
286+
287+
CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, desc)
266288
}
267289
}
268290

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.sql.hive
1919

2020
import org.apache.hadoop.hive.ql.parse.ASTNode
21+
import org.apache.hadoop.hive.ql.plan.CreateTableDesc
2122

2223
import org.apache.spark.annotation.Experimental
2324
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -181,13 +182,20 @@ private[hive] trait HiveStrategies {
181182
execution.InsertIntoHiveTable(
182183
table, partition, planLater(child), overwrite)(hiveContext) :: Nil
183184
case logical.CreateTableAsSelect(
184-
Some(database), tableName, child, allowExisting, Some(extra: ASTNode)) =>
185-
CreateTableAsSelect(
185+
Some(database), tableName, child, allowExisting, Some(desc: CreateTableDesc)) =>
186+
execution.CreateTableAsSelect(
186187
database,
187188
tableName,
188189
child,
189190
allowExisting,
190-
extra) :: Nil
191+
Some(desc)) :: Nil
192+
case logical.CreateTableAsSelect(Some(database), tableName, child, allowExisting, None) =>
193+
execution.CreateTableAsSelect(
194+
database,
195+
tableName,
196+
child,
197+
allowExisting,
198+
None) :: Nil
191199
case _ => Nil
192200
}
193201
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala

Lines changed: 5 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -17,8 +17,8 @@
1717

1818
package org.apache.spark.sql.hive.execution
1919

20-
import org.apache.hadoop.hive.ql.Context
21-
import org.apache.hadoop.hive.ql.parse.{SemanticAnalyzer, ASTNode}
20+
import org.apache.hadoop.hive.ql.plan.CreateTableDesc
21+
2222
import org.apache.spark.annotation.Experimental
2323
import org.apache.spark.rdd.RDD
2424
import org.apache.spark.sql.catalyst.expressions.Row
@@ -35,8 +35,7 @@ import org.apache.spark.sql.hive.MetastoreRelation
3535
* @param query the query whose result will be insert into the new relation
3636
* @param allowExisting continue working if the table already exists, otherwise
3737
* raise exception
38-
* @param extra the extra information for this Operator, it should be the
39-
* ASTNode object for extracting the CreateTableDesc.
38+
* @param desc the CreateTableDesc, which may contain the serde, storage handler, etc.
4039
4140
*/
4241
@Experimental
@@ -45,21 +44,16 @@ case class CreateTableAsSelect(
4544
tableName: String,
4645
query: LogicalPlan,
4746
allowExisting: Boolean,
48-
extra: ASTNode) extends LeafNode with Command {
47+
desc: Option[CreateTableDesc]) extends LeafNode with Command {
4948

5049
def output = Seq.empty
5150

5251
private[this] def sc = sqlContext.asInstanceOf[HiveContext]
5352

5453
// A lazy computing of the metastoreRelation
5554
private[this] lazy val metastoreRelation: MetastoreRelation = {
56-
// Get the CreateTableDesc from Hive SemanticAnalyzer
57-
val sa = new SemanticAnalyzer(sc.hiveconf)
58-
59-
sa.analyze(extra, new Context(sc.hiveconf))
60-
val desc = sa.getQB().getTableDesc
6155
// Create Hive Table
62-
sc.catalog.createTable(database, tableName, query.output, allowExisting, Some(desc))
56+
sc.catalog.createTable(database, tableName, query.output, allowExisting, desc)
6357

6458
// Get the Metastore Relation
6559
sc.catalog.lookupRelation(Some(database), tableName, None) match {

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,15 @@ class SQLQuerySuite extends QueryTest {
119119
checkAnswer(
120120
sql("SELECT f1.f2.f3 FROM nested"),
121121
1)
122+
checkAnswer(sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested"),
123+
Seq.empty[Row])
124+
checkAnswer(
125+
sql("SELECT * FROM test_ctas_1234"),
126+
sql("SELECT * FROM nested").collect().toSeq)
127+
128+
intercept[org.apache.hadoop.hive.ql.metadata.InvalidTableException] {
129+
sql("CREATE TABLE test_ctas_12345 AS SELECT * from notexists").collect()
130+
}
122131
}
123132

124133
test("test CTAS") {

0 commit comments

Comments
 (0)