Commit d25a4aa

Add SerDe support for CTAS
1 parent 7d0f172 commit d25a4aa

File tree: 6 files changed (+395, -83 lines)

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 33 additions & 37 deletions
@@ -407,64 +407,60 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
    * For example, because of a CREATE TABLE X AS statement.
    */
   object CreateTables extends Rule[LogicalPlan] {
-    import org.apache.hadoop.hive.ql.Context
-    import org.apache.hadoop.hive.ql.parse.{ASTNode, QB, SemanticAnalyzer}
-
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
-
-      case CreateTableAsSelect(desc, child, allowExisting) =>
-        if (hive.convertCTAS && !desc.serde.isDefined) {
-          // Do the conversion when spark.sql.hive.convertCTAS is true and the query
-          // does not specify any storage format (file format and storage handler).
-          if (desc.specifiedDatabase.isDefined) {
-            throw new AnalysisException(
-              "Cannot specify database name in a CTAS statement " +
-                "when spark.sql.hive.convertCTAS is set to true.")
-          }
-
-          val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
-          CreateTableUsingAsSelect(
-            desc.name,
-            conf.defaultDataSourceName,
-            temporary = false,
-            mode,
-            options = Map.empty[String, String],
-            child
-          )
+      case p: LogicalPlan if p.resolved => p
+      case p @ CreateTableAsSelect(table, child, allowExisting) =>
+        val schema = if (table.schema.size > 0) {
+          table.schema
         } else {
-          execution.CreateTableAsSelect(
-            desc.copy(
-              specifiedDatabase = Option(desc.specifiedDatabase.getOrElse(client.currentDatabase))),
-            child,
-            allowExisting)
+          child.output.map {
+            attr => new HiveColumn(
+              attr.name,
+              HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
+          }
         }
 
-      case p: LogicalPlan if p.resolved => p
+        val desc = table.copy(schema = schema)
 
-      case p @ CreateTableAsSelect(desc, child, allowExisting) =>
-        val (dbName, tblName) = processDatabaseAndTableName(desc.database, desc.name)
+        // This is a hack: only RCFile, ORC, and Parquet count as specific storage
+        // formats; otherwise the table is converted to the default data source
+        // when hive.convertCTAS is set.
+        val specificStorage = (table.inputFormat.map(format => {
+          // org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat => Parquet
+          // org.apache.hadoop.hive.ql.io.orc.OrcInputFormat => Orc
+          // org.apache.hadoop.hive.ql.io.RCFileInputFormat => RCFile
+          // parquet.hive.DeprecatedParquetInputFormat => Parquet
+          // TODO: make this configurable?
+          format.contains("Orc") || format.contains("Parquet") || format.contains("RCFile")
+        }).getOrElse(false))
 
-        if (hive.convertCTAS) {
-          if (desc.specifiedDatabase.isDefined) {
+        if (hive.convertCTAS && !specificStorage) {
+          // Do the conversion when spark.sql.hive.convertCTAS is true and the query
+          // does not specify any storage format (file format and storage handler).
+          if (table.specifiedDatabase.isDefined) {
             throw new AnalysisException(
               "Cannot specify database name in a CTAS statement " +
-                "when spark.sql.hive.convertCTAS is set to true.")
+              "when spark.sql.hive.convertCTAS is set to true.")
           }
 
           val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
           CreateTableUsingAsSelect(
-            tblName,
-            conf.defaultDataSourceName,
+            desc.name,
+            hive.conf.defaultDataSourceName,
             temporary = false,
             mode,
             options = Map.empty[String, String],
             child
           )
         } else {
+          val (dbName, tblName) =
+            processDatabaseAndTableName(
+              table.specifiedDatabase.getOrElse(client.currentDatabase), table.name)
           execution.CreateTableAsSelect(
-            desc,
+            desc.copy(
+              specifiedDatabase = Some(dbName),
+              name = tblName),
             child,
             allowExisting)
         }
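
What the rewritten rule changes in practice: a CTAS is converted to the default data source only when convertCTAS is on and no columnar storage format was requested; otherwise the Hive path runs and the table's SerDe survives. Below is a minimal, hedged sketch of both routes, assuming Spark 1.3-era APIs, a local master, and an existing Hive table src(key INT, value STRING); the object and table names are invented for illustration.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object CtasRoutingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ctas-sketch").setMaster("local"))
    val hiveContext = new HiveContext(sc)

    hiveContext.sql("SET spark.sql.hive.convertCTAS=true")

    // No storage clause: specificStorage is false, so the rule rewrites this into
    // CreateTableUsingAsSelect on the default data source (parquet by default).
    hiveContext.sql("CREATE TABLE ctas_default AS SELECT key, value FROM src")

    // Explicit columnar format: the input format name contains "RCFile", so
    // specificStorage is true and the Hive path (execution.CreateTableAsSelect)
    // keeps the RCFile input/output format and its SerDe.
    hiveContext.sql("CREATE TABLE ctas_rcfile STORED AS RCFILE AS SELECT key, value FROM src")
  }
}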

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 176 additions & 35 deletions
@@ -22,14 +22,14 @@ import java.sql.Date
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.Context
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.hadoop.hive.ql.{ErrorMsg, Context}
 import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, FunctionInfo}
 import org.apache.hadoop.hive.ql.lib.Node
-import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.parse._
-import org.apache.hadoop.hive.ql.plan.PlanUtils
-import org.apache.spark.sql.AnalysisException
+import org.apache.hadoop.hive.ql.session.SessionState
 
+import org.apache.spark.sql.{AnalysisException, SparkSQLParser}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans._
@@ -62,7 +62,8 @@ case class CreateTableAsSelect(
     allowExisting: Boolean) extends UnaryNode with Command {
 
   override def output: Seq[Attribute] = Seq.empty[Attribute]
-  override lazy val resolved: Boolean = tableDesc.specifiedDatabase.isDefined && childrenResolved
+  override lazy val resolved: Boolean =
+    tableDesc.specifiedDatabase.isDefined && tableDesc.schema.size > 0 && childrenResolved
 }
 
 /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
@@ -240,12 +241,24 @@ private[hive] object HiveQl {
      * Otherwise, there will be a NullPointerException
      * when retrieving properties from HiveConf.
      */
-    val hContext = new Context(new HiveConf())
+    val hContext = new Context(hiveConf)
     val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext))
     hContext.clear()
     node
   }
 
+  /**
+   * Returns the HiveConf.
+   * TODO: get it from HiveContext?
+   */
+  private[this] def hiveConf(): HiveConf = {
+    val ss = SessionState.get() // SessionState is initialized lazily, so it can be null here
+    if (ss == null) {
+      new HiveConf()
+    } else {
+      ss.getConf
+    }
+  }
 
   /** Returns a LogicalPlan for a given HiveQL string. */
   def parseSql(sql: String): LogicalPlan = hqlParser.parse(sql)
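
The hiveConf() helper exists because SessionState is initialized lazily and can still be null when parsing begins. The same null-guarding fallback can be written more compactly with Option; a small standalone sketch, assuming only hive-exec on the classpath (currentHiveConf is a hypothetical name, not part of this commit):

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState

// Reuse the session's HiveConf when a SessionState has been started;
// otherwise fall back to a freshly constructed HiveConf.
def currentHiveConf(): HiveConf =
  Option(SessionState.get()).map(_.getConf).getOrElse(new HiveConf())
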
@@ -476,8 +489,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       DropTable(tableName, ifExists.nonEmpty)
     // Support "ANALYZE TABLE tableName COMPUTE STATISTICS noscan"
     case Token("TOK_ANALYZE",
-           Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
-           isNoscan) =>
+            Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
+            isNoscan) =>
       // Reference:
       // https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables
       if (partitionSpec.nonEmpty) {
@@ -547,13 +560,15 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       val (
           Some(tableNameParts) ::
           _ /* likeTable */ ::
+          externalTable ::
          Some(query) ::
          allowExisting +:
          ignores) =
        getClauses(
          Seq(
            "TOK_TABNAME",
            "TOK_LIKETABLE",
+           "EXTERNAL",
            "TOK_QUERY",
            "TOK_IFNOTEXISTS",
            "TOK_TABLECOMMENT",
@@ -576,43 +591,156 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
           children)
       val (db, tableName) = extractDbNameTableName(tableNameParts)
 
-      var tableDesc =
-        HiveTable(
-          specifiedDatabase = db,
-          name = tableName,
-          schema = Seq.empty,
-          partitionColumns = Seq.empty,
-          properties = Map.empty,
-          serdeProperties = Map.empty,
-          tableType = ManagedTable,
-          location = None,
-          inputFormat = None,
-          outputFormat = None,
-          serde = None)
-
-      // TODO: Handle all the cases here...
-      children.foreach {
-        case Token("TOK_TBLRCFILE", Nil) =>
-          import org.apache.hadoop.hive.ql.io.{RCFileInputFormat, RCFileOutputFormat}
+      // TODO: add bucket support
+      var tableDesc: HiveTable = HiveTable(
+        specifiedDatabase = db,
+        name = tableName,
+        schema = Seq.empty[HiveColumn],
+        partitionColumns = Seq.empty[HiveColumn],
+        properties = Map[String, String](),
+        serdeProperties = Map[String, String](),
+        tableType = if (externalTable.isDefined) ExternalTable else ManagedTable,
+        location = None,
+        inputFormat = None,
+        outputFormat = None,
+        serde = None,
+        viewText = None)
+
+      // Default serde & input/output format.
+      tableDesc = if ("SequenceFile".equalsIgnoreCase(
+          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+        tableDesc.copy(
+          inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
+          outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"),
+          serde = Option("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+      } else if ("RCFile".equalsIgnoreCase(
+          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+        tableDesc.copy(
+          inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+          outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
+          serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE)))
+      } else if ("ORC".equalsIgnoreCase(
+          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+        tableDesc.copy(
+          inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
+          outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
+          serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+      } else if ("PARQUET".equalsIgnoreCase(
+          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+        tableDesc.copy(
+          inputFormat =
+            Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+          outputFormat =
+            Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
+          serde =
+            Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+      } else {
+        tableDesc.copy(
+          inputFormat =
+            Option("org.apache.hadoop.mapred.TextInputFormat"),
+          outputFormat =
+            Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
+      }
+
+      children.collect {
+        case list @ Token("TOK_TABCOLLIST", _) =>
+          val cols = BaseSemanticAnalyzer.getColumns(list, true)
+          if (cols != null) {
+            tableDesc = tableDesc.copy(
+              schema = cols.map { field =>
+                HiveColumn(field.getName, field.getType, field.getComment)
+              })
+          }
+        case Token("TOK_TABLECOMMENT", child :: Nil) =>
+          val comment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+          // TODO: support the sql text
+          tableDesc = tableDesc.copy(viewText = Option(comment))
+        case Token("TOK_TABLEPARTCOLS", list @ Token("TOK_TABCOLLIST", _) :: Nil) =>
+          val cols = BaseSemanticAnalyzer.getColumns(list(0), false)
+          if (cols != null) {
+            tableDesc = tableDesc.copy(
+              partitionColumns = cols.map { field =>
+                HiveColumn(field.getName, field.getType, field.getComment)
+              })
+          }
+        case Token("TOK_TABLEROWFORMAT", Token("TOK_SERDEPROPS", child :: Nil) :: Nil) =>
+          val serdeParams = new java.util.HashMap[String, String]()
+          child match {
+            case Token("TOK_TABLEROWFORMATFIELD", rowChild1 :: rowChild2) =>
+              val fieldDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild1.getText())
+              serdeParams.put(serdeConstants.FIELD_DELIM, fieldDelim)
+              serdeParams.put(serdeConstants.SERIALIZATION_FORMAT, fieldDelim)
+              if (rowChild2.length > 1) {
+                val fieldEscape = BaseSemanticAnalyzer.unescapeSQLString(rowChild2(0).getText)
+                serdeParams.put(serdeConstants.ESCAPE_CHAR, fieldEscape)
+              }
+            case Token("TOK_TABLEROWFORMATCOLLITEMS", rowChild :: Nil) =>
+              val collItemDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              serdeParams.put(serdeConstants.COLLECTION_DELIM, collItemDelim)
+            case Token("TOK_TABLEROWFORMATMAPKEYS", rowChild :: Nil) =>
+              val mapKeyDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              serdeParams.put(serdeConstants.MAPKEY_DELIM, mapKeyDelim)
+            case Token("TOK_TABLEROWFORMATLINES", rowChild :: Nil) =>
+              val lineDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              if (!(lineDelim == "\n") && !(lineDelim == "10")) {
+                throw new AnalysisException(
+                  SemanticAnalyzer.generateErrorMessage(
+                    rowChild,
+                    ErrorMsg.LINES_TERMINATED_BY_NON_NEWLINE.getMsg))
+              }
+              serdeParams.put(serdeConstants.LINE_DELIM, lineDelim)
+            case Token("TOK_TABLEROWFORMATNULL", rowChild :: Nil) =>
+              val nullFormat = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              // TODO: support the nullFormat
+            case _ => assert(false)
+          }
           tableDesc = tableDesc.copy(
-            outputFormat = Option(classOf[RCFileOutputFormat].getName),
-            inputFormat = Option(classOf[RCFileInputFormat[_, _]].getName))
+            serdeProperties = tableDesc.serdeProperties ++ serdeParams)
+        case Token("TOK_TABLELOCATION", child :: Nil) =>
+          var location = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+          location = EximUtil.relativeToAbsolutePath(hiveConf, location)
+          tableDesc = tableDesc.copy(location = Option(location))
+        case Token("TOK_TABLESERIALIZER", child :: Nil) =>
+          tableDesc = tableDesc.copy(
+            serde = Option(BaseSemanticAnalyzer.unescapeSQLString(child.getChild(0).getText)))
+          if (child.getChildCount == 2) {
+            val serdeParams = new java.util.HashMap[String, String]()
+            BaseSemanticAnalyzer.readProps(
+              (child.getChild(1).getChild(0)).asInstanceOf[ASTNode], serdeParams)
+            tableDesc = tableDesc.copy(serdeProperties = tableDesc.serdeProperties ++ serdeParams)
+          }
+        case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) =>
+          throw new SemanticException(
+            s"Unrecognized file format in STORED AS clause: ${child.getText}")
 
+        case Token("TOK_TBLRCFILE", Nil) =>
+          tableDesc = tableDesc.copy(
+            inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+            outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
           if (tableDesc.serde.isEmpty) {
             tableDesc = tableDesc.copy(
               serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
           }
+
         case Token("TOK_TBLORCFILE", Nil) =>
           tableDesc = tableDesc.copy(
             inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
-            outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
-            serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+            outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
+          if (tableDesc.serde.isEmpty) {
+            tableDesc = tableDesc.copy(
+              serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+          }
 
         case Token("TOK_TBLPARQUETFILE", Nil) =>
           tableDesc = tableDesc.copy(
-            inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
-            outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
-            serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+            inputFormat =
+              Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+            outputFormat =
+              Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+          if (tableDesc.serde.isEmpty) {
+            tableDesc = tableDesc.copy(
+              serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+          }
 
         case Token("TOK_TABLESERIALIZER",
                Token("TOK_SERDENAME", Token(serdeName, Nil) :: otherProps) :: Nil) =>
@@ -627,13 +755,26 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
 
         case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
           tableDesc = tableDesc.copy(properties = tableDesc.properties ++ getProperties(list))
+        case list @ Token("TOK_TABLEFILEFORMAT", _) =>
+          tableDesc = tableDesc.copy(
+            inputFormat =
+              Option(BaseSemanticAnalyzer.unescapeSQLString(list.getChild(0).getText)),
+            outputFormat =
+              Option(BaseSemanticAnalyzer.unescapeSQLString(list.getChild(1).getText)))
+        case Token("TOK_STORAGEHANDLER", _) =>
+          throw new AnalysisException(ErrorMsg.CREATE_NON_NATIVE_AS.getMsg())
+        case _ => // Unsupported features
+      }
 
-        case _ =>
+      if (tableDesc.serde.isEmpty) {
+        // Add the default serde.
+        tableDesc = tableDesc.copy(
+          serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
       }
 
       CreateTableAsSelect(tableDesc, nodeToPlan(query), allowExisting != None)
 
-    // If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command.
+    // If it's not a CTAS like the above, treat it as a native command.
     case Token("TOK_CREATETABLE", _) => NativePlaceholder
 
     // Support "TRUNCATE TABLE table_name [PARTITION partition_spec]"

0 commit comments