
Commit 1a664a0

chenghao-intel authored and marmbrus committed
[SPARK-7411] [SQL] Support SerDe for HiveQl in CTAS
This is a follow-up of #5876 and should be merged after #5876. Let's wait for the unit test results from Jenkins.

Author: Cheng Hao <[email protected]>

Closes #5963 from chenghao-intel/useIsolatedClient and squashes the following commits:

f87ace6 [Cheng Hao] remove the TODO and add `resolved condition` for HiveTable
a8260e8 [Cheng Hao] Update code as feedback
f4e243f [Cheng Hao] remove the serde setting for SequenceFile
d166afa [Cheng Hao] style issue
d25a4aa [Cheng Hao] Add SerDe support for CTAS

(cherry picked from commit e35d878)
Signed-off-by: Michael Armbrust <[email protected]>
1 parent 8a9d234 commit 1a664a0
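For context, a minimal sketch of a CTAS statement that exercises the SerDe and file-format clauses this patch wires through the parser (the table name, query, and SparkContext `sc` are hypothetical; the class names are the Hive built-ins referenced in the diffs below):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc) // assumes an existing SparkContext `sc`

// The ROW FORMAT SERDE / STORED AS clauses below are now parsed into the
// HiveTable descriptor instead of being handled only for a few fixed formats.
hiveContext.sql(
  """CREATE TABLE ctas_parquet
    |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    |STORED AS
    |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
    |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    |AS SELECT key, value FROM src""".stripMargin)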

File tree

6 files changed: +390 -82 lines changed


sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 30 additions & 36 deletions
@@ -407,64 +407,58 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
    * For example, because of a CREATE TABLE X AS statement.
    */
   object CreateTables extends Rule[LogicalPlan] {
-    import org.apache.hadoop.hive.ql.Context
-    import org.apache.hadoop.hive.ql.parse.{ASTNode, QB, SemanticAnalyzer}
-
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
+      case p: LogicalPlan if p.resolved => p
+      case p @ CreateTableAsSelect(table, child, allowExisting) =>
+        val schema = if (table.schema.size > 0) {
+          table.schema
+        } else {
+          child.output.map {
+            attr => new HiveColumn(
+              attr.name,
+              HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
+          }
+        }
+
+        val desc = table.copy(schema = schema)
 
-      case CreateTableAsSelect(desc, child, allowExisting) =>
-        if (hive.convertCTAS && !desc.serde.isDefined) {
+        if (hive.convertCTAS && table.serde.isEmpty) {
           // Do the conversion when spark.sql.hive.convertCTAS is true and the query
           // does not specify any storage format (file format and storage handler).
-          if (desc.specifiedDatabase.isDefined) {
+          if (table.specifiedDatabase.isDefined) {
             throw new AnalysisException(
               "Cannot specify database name in a CTAS statement " +
-              "when spark.sql.hive.convertCTAS is set to true.")
+                "when spark.sql.hive.convertCTAS is set to true.")
           }
 
           val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
           CreateTableUsingAsSelect(
             desc.name,
-            conf.defaultDataSourceName,
+            hive.conf.defaultDataSourceName,
             temporary = false,
             mode,
             options = Map.empty[String, String],
             child
           )
         } else {
-          execution.CreateTableAsSelect(
-            desc.copy(
-              specifiedDatabase = Option(desc.specifiedDatabase.getOrElse(client.currentDatabase))),
-            child,
-            allowExisting)
-        }
-
-      case p: LogicalPlan if p.resolved => p
-
-      case p @ CreateTableAsSelect(desc, child, allowExisting) =>
-        val (dbName, tblName) = processDatabaseAndTableName(desc.database, desc.name)
-
-        if (hive.convertCTAS) {
-          if (desc.specifiedDatabase.isDefined) {
-            throw new AnalysisException(
-              "Cannot specify database name in a CTAS statement " +
-              "when spark.sql.hive.convertCTAS is set to true.")
+          val desc = if (table.serde.isEmpty) {
+            // add default serde
+            table.copy(
+              serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+          } else {
+            table
           }
 
-          val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
-          CreateTableUsingAsSelect(
-            tblName,
-            conf.defaultDataSourceName,
-            temporary = false,
-            mode,
-            options = Map.empty[String, String],
-            child
-          )
-        } else {
+          val (dbName, tblName) =
+            processDatabaseAndTableName(
+              desc.specifiedDatabase.getOrElse(client.currentDatabase), desc.name)
+
           execution.CreateTableAsSelect(
-            desc,
+            desc.copy(
+              specifiedDatabase = Some(dbName),
+              name = tblName),
             child,
             allowExisting)
         }
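In words: the rewritten CreateTables rule first fills in the schema from the child plan's output when none was given, then either converts the CTAS into a data-source write (when spark.sql.hive.convertCTAS is true and no storage format was specified) or falls back to a Hive CTAS, defaulting the SerDe to LazySimpleSerDe. A condensed, hypothetical sketch of that branching (TableDesc and the result strings are illustrative stand-ins, not the real plan nodes):

// Stand-in for the relevant fields of HiveTable; for reading aid only.
case class TableDesc(specifiedDatabase: Option[String], serde: Option[String])

def ctasPath(table: TableDesc, convertCTAS: Boolean): String =
  if (convertCTAS && table.serde.isEmpty) {
    // No explicit storage format: plan a data-source CTAS (CreateTableUsingAsSelect).
    if (table.specifiedDatabase.isDefined) {
      throw new IllegalArgumentException(
        "Cannot specify database name in a CTAS statement " +
          "when spark.sql.hive.convertCTAS is set to true.")
    }
    "data-source CTAS"
  } else {
    // Hive path: default the SerDe when the statement did not name one.
    val serde = table.serde.getOrElse("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
    s"Hive CTAS with $serde"
  }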

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 172 additions & 35 deletions
@@ -22,14 +22,15 @@ import java.sql.Date
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.Context
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.hadoop.hive.ql.{ErrorMsg, Context}
 import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, FunctionInfo}
 import org.apache.hadoop.hive.ql.lib.Node
-import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.parse._
 import org.apache.hadoop.hive.ql.plan.PlanUtils
-import org.apache.spark.sql.AnalysisException
+import org.apache.hadoop.hive.ql.session.SessionState
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
@@ -62,7 +63,13 @@ case class CreateTableAsSelect(
     allowExisting: Boolean) extends UnaryNode with Command {
 
   override def output: Seq[Attribute] = Seq.empty[Attribute]
-  override lazy val resolved: Boolean = tableDesc.specifiedDatabase.isDefined && childrenResolved
+  override lazy val resolved: Boolean =
+    tableDesc.specifiedDatabase.isDefined &&
+    tableDesc.schema.size > 0 &&
+    tableDesc.serde.isDefined &&
+    tableDesc.inputFormat.isDefined &&
+    tableDesc.outputFormat.isDefined &&
+    childrenResolved
 }
 
 /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
@@ -240,12 +247,23 @@ private[hive] object HiveQl {
    * Otherwise, there will be a NullPointerException
    * when retrieving properties from HiveConf.
    */
-  val hContext = new Context(new HiveConf())
+  val hContext = new Context(hiveConf)
   val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext))
   hContext.clear()
   node
 }
 
+/**
+ * Returns the HiveConf
+ */
+private[this] def hiveConf(): HiveConf = {
+  val ss = SessionState.get() // SessionState is lazily initialized, so it can be null here
+  if (ss == null) {
+    new HiveConf()
+  } else {
+    ss.getConf
+  }
+}
 
 /** Returns a LogicalPlan for a given HiveQL string. */
 def parseSql(sql: String): LogicalPlan = hqlParser.parse(sql)
@@ -476,8 +494,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       DropTable(tableName, ifExists.nonEmpty)
     // Support "ANALYZE TABLE tableName COMPUTE STATISTICS noscan"
     case Token("TOK_ANALYZE",
-           Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
-           isNoscan) =>
+        Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
+        isNoscan) =>
       // Reference:
       // https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables
       if (partitionSpec.nonEmpty) {
@@ -547,13 +565,15 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       val (
           Some(tableNameParts) ::
          _ /* likeTable */ ::
+          externalTable ::
          Some(query) ::
          allowExisting +:
          ignores) =
        getClauses(
          Seq(
            "TOK_TABNAME",
            "TOK_LIKETABLE",
+            "EXTERNAL",
            "TOK_QUERY",
            "TOK_IFNOTEXISTS",
            "TOK_TABLECOMMENT",
@@ -576,43 +596,153 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
           children)
       val (db, tableName) = extractDbNameTableName(tableNameParts)
 
-      var tableDesc =
-        HiveTable(
-          specifiedDatabase = db,
-          name = tableName,
-          schema = Seq.empty,
-          partitionColumns = Seq.empty,
-          properties = Map.empty,
-          serdeProperties = Map.empty,
-          tableType = ManagedTable,
-          location = None,
-          inputFormat = None,
-          outputFormat = None,
-          serde = None)
-
-      // TODO: Handle all the cases here...
-      children.foreach {
-        case Token("TOK_TBLRCFILE", Nil) =>
-          import org.apache.hadoop.hive.ql.io.{RCFileInputFormat, RCFileOutputFormat}
+      // TODO add bucket support
+      var tableDesc: HiveTable = HiveTable(
+        specifiedDatabase = db,
+        name = tableName,
+        schema = Seq.empty[HiveColumn],
+        partitionColumns = Seq.empty[HiveColumn],
+        properties = Map[String, String](),
+        serdeProperties = Map[String, String](),
+        tableType = if (externalTable.isDefined) ExternalTable else ManagedTable,
+        location = None,
+        inputFormat = None,
+        outputFormat = None,
+        serde = None,
+        viewText = None)
+
+      // default storage type abbreviation (e.g. RCFile, ORC, PARQUET, etc.)
+      val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
+      // handle the default format for the storage type abbreviation
+      tableDesc = if ("SequenceFile".equalsIgnoreCase(defaultStorageType)) {
+        tableDesc.copy(
+          inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
+          outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"))
+      } else if ("RCFile".equalsIgnoreCase(defaultStorageType)) {
+        tableDesc.copy(
+          inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+          outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
+          serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE)))
+      } else if ("ORC".equalsIgnoreCase(defaultStorageType)) {
+        tableDesc.copy(
+          inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
+          outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
+          serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+      } else if ("PARQUET".equalsIgnoreCase(defaultStorageType)) {
+        tableDesc.copy(
+          inputFormat =
+            Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+          outputFormat =
+            Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
+          serde =
+            Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+      } else {
+        tableDesc.copy(
+          inputFormat =
+            Option("org.apache.hadoop.mapred.TextInputFormat"),
+          outputFormat =
+            Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
+      }
+
+      children.collect {
+        case list @ Token("TOK_TABCOLLIST", _) =>
+          val cols = BaseSemanticAnalyzer.getColumns(list, true)
+          if (cols != null) {
+            tableDesc = tableDesc.copy(
+              schema = cols.map { field =>
+                HiveColumn(field.getName, field.getType, field.getComment)
+              })
+          }
+        case Token("TOK_TABLECOMMENT", child :: Nil) =>
+          val comment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+          // TODO support the sql text
+          tableDesc = tableDesc.copy(viewText = Option(comment))
+        case Token("TOK_TABLEPARTCOLS", list @ Token("TOK_TABCOLLIST", _) :: Nil) =>
+          val cols = BaseSemanticAnalyzer.getColumns(list(0), false)
+          if (cols != null) {
+            tableDesc = tableDesc.copy(
+              partitionColumns = cols.map { field =>
+                HiveColumn(field.getName, field.getType, field.getComment)
+              })
+          }
+        case Token("TOK_TABLEROWFORMAT", Token("TOK_SERDEPROPS", child :: Nil) :: Nil) =>
+          val serdeParams = new java.util.HashMap[String, String]()
+          child match {
+            case Token("TOK_TABLEROWFORMATFIELD", rowChild1 :: rowChild2) =>
+              val fieldDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild1.getText())
+              serdeParams.put(serdeConstants.FIELD_DELIM, fieldDelim)
+              serdeParams.put(serdeConstants.SERIALIZATION_FORMAT, fieldDelim)
+              if (rowChild2.length > 1) {
+                val fieldEscape = BaseSemanticAnalyzer.unescapeSQLString(rowChild2(0).getText)
+                serdeParams.put(serdeConstants.ESCAPE_CHAR, fieldEscape)
+              }
+            case Token("TOK_TABLEROWFORMATCOLLITEMS", rowChild :: Nil) =>
+              val collItemDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              serdeParams.put(serdeConstants.COLLECTION_DELIM, collItemDelim)
+            case Token("TOK_TABLEROWFORMATMAPKEYS", rowChild :: Nil) =>
+              val mapKeyDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              serdeParams.put(serdeConstants.MAPKEY_DELIM, mapKeyDelim)
+            case Token("TOK_TABLEROWFORMATLINES", rowChild :: Nil) =>
+              val lineDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              if (!(lineDelim == "\n") && !(lineDelim == "10")) {
+                throw new AnalysisException(
+                  SemanticAnalyzer.generateErrorMessage(
+                    rowChild,
+                    ErrorMsg.LINES_TERMINATED_BY_NON_NEWLINE.getMsg))
+              }
+              serdeParams.put(serdeConstants.LINE_DELIM, lineDelim)
+            case Token("TOK_TABLEROWFORMATNULL", rowChild :: Nil) =>
+              val nullFormat = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              // TODO support the nullFormat
+            case _ => assert(false)
+          }
+          tableDesc = tableDesc.copy(
+            serdeProperties = tableDesc.serdeProperties ++ serdeParams)
+        case Token("TOK_TABLELOCATION", child :: Nil) =>
+          var location = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+          location = EximUtil.relativeToAbsolutePath(hiveConf, location)
+          tableDesc = tableDesc.copy(location = Option(location))
+        case Token("TOK_TABLESERIALIZER", child :: Nil) =>
           tableDesc = tableDesc.copy(
-            outputFormat = Option(classOf[RCFileOutputFormat].getName),
-            inputFormat = Option(classOf[RCFileInputFormat[_, _]].getName))
+            serde = Option(BaseSemanticAnalyzer.unescapeSQLString(child.getChild(0).getText)))
+          if (child.getChildCount == 2) {
+            val serdeParams = new java.util.HashMap[String, String]()
+            BaseSemanticAnalyzer.readProps(
+              (child.getChild(1).getChild(0)).asInstanceOf[ASTNode], serdeParams)
+            tableDesc = tableDesc.copy(serdeProperties = tableDesc.serdeProperties ++ serdeParams)
+          }
+        case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) =>
+          throw new SemanticException(
+            s"Unrecognized file format in STORED AS clause: ${child.getText}")
 
+        case Token("TOK_TBLRCFILE", Nil) =>
+          tableDesc = tableDesc.copy(
+            inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+            outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
           if (tableDesc.serde.isEmpty) {
            tableDesc = tableDesc.copy(
              serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
          }
+
         case Token("TOK_TBLORCFILE", Nil) =>
           tableDesc = tableDesc.copy(
             inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
-            outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
-            serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+            outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
+          if (tableDesc.serde.isEmpty) {
+            tableDesc = tableDesc.copy(
+              serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+          }
 
         case Token("TOK_TBLPARQUETFILE", Nil) =>
           tableDesc = tableDesc.copy(
-            inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
-            outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
-            serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+            inputFormat =
+              Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+            outputFormat =
+              Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+          if (tableDesc.serde.isEmpty) {
+            tableDesc = tableDesc.copy(
+              serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+          }
 
         case Token("TOK_TABLESERIALIZER",
                Token("TOK_SERDENAME", Token(serdeName, Nil) :: otherProps) :: Nil) =>
@@ -627,13 +757,20 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
 
         case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
           tableDesc = tableDesc.copy(properties = tableDesc.properties ++ getProperties(list))
-
-        case _ =>
+        case list @ Token("TOK_TABLEFILEFORMAT", _) =>
+          tableDesc = tableDesc.copy(
+            inputFormat =
+              Option(BaseSemanticAnalyzer.unescapeSQLString(list.getChild(0).getText)),
+            outputFormat =
+              Option(BaseSemanticAnalyzer.unescapeSQLString(list.getChild(1).getText)))
+        case Token("TOK_STORAGEHANDLER", _) =>
+          throw new AnalysisException(ErrorMsg.CREATE_NON_NATIVE_AS.getMsg())
+        case _ => // Unsupported features
       }
 
       CreateTableAsSelect(tableDesc, nodeToPlan(query), allowExisting != None)
 
-    // If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command.
+    // If it's not a "CTAS" like above, then treat it as a native command.
     case Token("TOK_CREATETABLE", _) => NativePlaceholder
 
     // Support "TRUNCATE TABLE table_name [PARTITION partition_spec]"
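The new CTAS parsing above seeds tableDesc from hive.default.fileformat (HIVEDEFAULTFILEFORMAT) before any explicit STORED AS clause overrides it. A standalone sketch of that default mapping (the function is hypothetical; the class names are copied from the diff, and TextFile is the fallback):

// Hypothetical lookup equivalent of the if/else-if chain over defaultStorageType:
// returns (inputFormat, outputFormat, serde).
def defaultFormats(storageType: String): (String, String, Option[String]) =
  storageType.toUpperCase match {
    case "SEQUENCEFILE" =>
      ("org.apache.hadoop.mapred.SequenceFileInputFormat",
        "org.apache.hadoop.mapred.SequenceFileOutputFormat",
        None)
    case "RCFILE" =>
      ("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
        "org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
        None) // the real code reads the SerDe from HIVEDEFAULTRCFILESERDE
    case "ORC" =>
      ("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
        "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
        Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
    case "PARQUET" =>
      ("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
        "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
        Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
    case _ => // TextFile
      ("org.apache.hadoop.mapred.TextInputFormat",
        "org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat",
        None)
  }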

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala

Lines changed: 6 additions & 0 deletions
@@ -225,6 +225,12 @@ private[hive] class ClientWrapper(
       table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)))
     table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) }
     table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) }
+
+    // set owner
+    qlTable.setOwner(conf.getUser)
+    // set create time
+    qlTable.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
+
     version match {
       case hive.v12 =>
         table.location.map(new URI(_)).foreach(u => qlTable.call[URI, Unit]("setDataLocation", u))
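Hive's Table metadata stores the creation time as whole seconds in an Int, which is why the added lines divide epoch milliseconds by 1000 before the narrowing cast. A minimal illustration of that conversion:

// Epoch milliseconds -> whole-second Int, as in qlTable.setCreateTime above.
// An Int can hold seconds-since-epoch until January 2038, so the narrowing is safe here.
val createTime: Int = (System.currentTimeMillis() / 1000).asInstanceOf[Int]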
