Skip to content

address review comments #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2446,7 +2446,18 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging

/**
* Type to keep track of table clauses:
* (partTransforms, partCols, bucketSpec, properties, options, location, comment, serde).
* - partition transforms
* - partition columns
* - bucketSpec
* - properties
* - options
* - location
* - comment
* - serde
*
* Note: Partition transforms are based on the existing table schema definition. They can be
* simple column names, or functions like `year(date_col)`. Partition columns are column names
* with data types like `i INT`, which should be appended to the existing table schema.
*/
type TableClauses = (
Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String, String],
Expand Down Expand Up @@ -2773,7 +2784,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
(ctx.fileFormat, ctx.storageHandler) match {
// Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format
case (c: TableFileFormatContext, null) =>
SerdeInfo(formatClasses = Some((string(c.inFmt), string(c.outFmt))))
SerdeInfo(formatClasses = Some(FormatClasses(string(c.inFmt), string(c.outFmt))))
// Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO
case (c: GenericFileFormatContext, null) =>
SerdeInfo(storedAs = Some(c.identifier.getText))
Expand Down Expand Up @@ -2802,8 +2813,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* [NULL DEFINED AS char]
* }}}
*/
def visitRowFormat(
ctx: RowFormatContext): SerdeInfo = withOrigin(ctx) {
def visitRowFormat(ctx: RowFormatContext): SerdeInfo = withOrigin(ctx) {
ctx match {
case serde: RowFormatSerdeContext => visitRowFormatSerde(serde)
case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited)
Expand Down Expand Up @@ -2923,16 +2933,19 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
val location = visitLocationSpecList(ctx.locationSpec())
val (cleanedOptions, newLocation) = cleanTableOptions(ctx, options, location)
val comment = visitCommentSpecList(ctx.commentSpec())

validateRowFormatFileFormat(
ctx.rowFormat.asScala.toSeq, ctx.createFileFormat.asScala.toSeq, ctx)
val fileFormatSerdeInfo = ctx.createFileFormat.asScala.map(visitCreateFileFormat)
val rowFormatSerdeInfo = ctx.rowFormat.asScala.map(visitRowFormat)
val serdeInfo =
(fileFormatSerdeInfo ++ rowFormatSerdeInfo).reduceLeftOption((x, y) => x.merge(y))

val serdeInfo = getSerdeInfo(ctx.rowFormat.asScala, ctx.createFileFormat.asScala, ctx)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extracted this into a method so that we can reuse it in SparkSqlAstBuilder.

(partTransforms, partCols, bucketSpec, cleanedProperties, cleanedOptions, newLocation, comment,
serdeInfo)
serdeInfo)
}

/**
 * Builds the optional [[SerdeInfo]] described by the row format and file format contexts.
 *
 * The contexts are first checked with `validateRowFormatFileFormat`. Each context is then
 * visited, and the resulting infos are merged left to right, with the file format entries
 * placed ahead of the row format entries.
 *
 * @param rowFormatCtx the row format contexts to visit
 * @param createFileFormatCtx the file format contexts to visit
 * @param ctx the enclosing rule context, handed to the validation step
 * @return the merged info, or `None` when both context lists are empty
 */
protected def getSerdeInfo(
    rowFormatCtx: Seq[RowFormatContext],
    createFileFormatCtx: Seq[CreateFileFormatContext],
    ctx: ParserRuleContext): Option[SerdeInfo] = {
  validateRowFormatFileFormat(rowFormatCtx, createFileFormatCtx, ctx)
  // Row formats are visited before file formats, but are merged after them.
  val fromRowFormats = rowFormatCtx.map(visitRowFormat)
  val fromFileFormats = createFileFormatCtx.map(visitCreateFileFormat)
  val collected = fromFileFormats ++ fromRowFormats
  collected.reduceLeftOption(_.merge(_))
}

private def partitionExpressions(
Expand All @@ -2943,8 +2956,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
if (partCols.nonEmpty) {
val references = partTransforms.map(_.describe()).mkString(", ")
val columns = partCols
.map(field => s"${field.name} ${field.dataType.simpleString}")
.mkString(", ")
.map(field => s"${field.name} ${field.dataType.simpleString}")
.mkString(", ")
operationNotAllowed(
s"""PARTITION BY: Cannot mix partition expressions and partition columns:
|Expressions: $references
Expand All @@ -2966,12 +2979,12 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* Expected format:
* {{{
* CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
* USING table_provider
* [USING table_provider]
* create_table_clauses
* [[AS] select_statement];
*
* create_table_clauses (order insensitive):
* partition_clauses
* [PARTITIONED BY (partition_fields)]
* [OPTIONS table_property_list]
* [ROW FORMAT row_format]
* [STORED AS file_format]
Expand All @@ -2982,15 +2995,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* [LOCATION path]
* [COMMENT table_comment]
* [TBLPROPERTIES (property_name=property_value, ...)]
* partition_clauses:
* [PARTITIONED BY (col_name, transform(col_name), transform(constant, col_name), ...)] |
* [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
*
* partition_fields:
* col_name, transform(col_name), transform(constant, col_name), ... |
* col_name data_type [NOT NULL] [COMMENT col_comment], ...
* }}}
*/
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)

val columns = Option(ctx.colTypeList()).map(visitColTypeList)
val columns = Option(ctx.colTypeList()).map(visitColTypeList).getOrElse(Nil)
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
visitCreateTableClauses(ctx.createTableClauses())
Expand All @@ -2999,18 +3013,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
operationNotAllowed(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx)
}

val schema = columns
.map(dataCols => StructType(dataCols ++ partCols))
.getOrElse(StructType(partCols))
if (temp) {
val asSelect = if (ctx.query == null) "" else " AS ..."
operationNotAllowed(
s"CREATE TEMPORARY TABLE ...$asSelect, use CREATE TEMPORARY VIEW instead", ctx)
}

val partitioning = partitionExpressions(partTransforms, partCols, ctx)

Option(ctx.query).map(plan) match {
case Some(_) if temp =>
operationNotAllowed(
"CREATE TEMPORARY TABLE ... AS ..., use CREATE TEMPORARY VIEW instead",
ctx)

case Some(_) if columns.isDefined =>
case Some(_) if columns.nonEmpty =>
operationNotAllowed(
"Schema may not be specified in a Create Table As Select (CTAS) statement",
ctx)
Expand All @@ -3026,10 +3038,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
table, query, partitioning, bucketSpec, properties, provider, options, location, comment,
writeOptions = Map.empty, serdeInfo, external = external, ifNotExists = ifNotExists)

case None if temp =>
operationNotAllowed("CREATE TEMPORARY TABLE", ctx)

case _ =>
// Note: table schema includes both the table columns list and the partition columns
// with data type.
val schema = StructType(columns ++ partCols)
CreateTableStatement(table, schema, partitioning, bucketSpec, properties, provider,
options, location, comment, serdeInfo, external = external, ifNotExists = ifNotExists)
}
Expand All @@ -3041,28 +3053,33 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* Expected format:
* {{{
* [CREATE OR] REPLACE TABLE [db_name.]table_name
* USING table_provider
* [USING table_provider]
* replace_table_clauses
* [[AS] select_statement];
*
* replace_table_clauses (order insensitive):
* [OPTIONS table_property_list]
* [PARTITIONED BY (col_name, transform(col_name), transform(constant, col_name), ...)]
* [PARTITIONED BY (partition_fields)]
* [CLUSTERED BY (col_name, col_name, ...)
* [SORTED BY (col_name [ASC|DESC], ...)]
* INTO num_buckets BUCKETS
* ]
* [LOCATION path]
* [COMMENT table_comment]
* [TBLPROPERTIES (property_name=property_value, ...)]
*
* partition_fields:
* col_name, transform(col_name), transform(constant, col_name), ... |
* col_name data_type [NOT NULL] [COMMENT col_comment], ...
* }}}
*/
override def visitReplaceTable(ctx: ReplaceTableContext): LogicalPlan = withOrigin(ctx) {
val (table, temp, ifNotExists, external) = visitReplaceTableHeader(ctx.replaceTableHeader)
val orCreate = ctx.replaceTableHeader().CREATE() != null

if (temp) {
operationNotAllowed(
"CREATE OR REPLACE TEMPORARY TABLE ..., use CREATE TEMPORARY VIEW instead",
ctx)
val action = if (orCreate) "CREATE OR REPLACE" else "REPLACE"
operationNotAllowed(s"$action TEMPORARY TABLE ..., use $action TEMPORARY VIEW instead.", ctx)
}

if (external) {
Expand All @@ -3075,24 +3092,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging

val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
visitCreateTableClauses(ctx.createTableClauses())
val columns = Option(ctx.colTypeList()).map(visitColTypeList)
val columns = Option(ctx.colTypeList()).map(visitColTypeList).getOrElse(Nil)
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)

if (provider.isDefined && serdeInfo.isDefined) {
operationNotAllowed(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx)
}

val schema = columns.map(dataCols => StructType(dataCols ++ partCols))
val partitioning = partitionExpressions(partTransforms, partCols, ctx)
val orCreate = ctx.replaceTableHeader().CREATE() != null

Option(ctx.query).map(plan) match {
case Some(_) if schema.isDefined =>
case Some(_) if columns.nonEmpty =>
operationNotAllowed(
"Schema may not be specified in a Replace Table As Select (RTAS) statement",
ctx)

case Some(_) if partCols.nonEmpty =>
// non-reference partition columns are not allowed because schema can't be specified
operationNotAllowed(
"Partition column types may not be specified in Replace Table As Select (RTAS)",
ctx)
Expand All @@ -3103,9 +3119,11 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
orCreate = orCreate)

case _ =>
ReplaceTableStatement(table, schema.getOrElse(new StructType), partitioning,
Copy link
Author

@cloud-fan cloud-fan Nov 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, when the table column list was not specified, we ignored the partition columns with data types. That was fine before the syntax merge, as there were no partition columns with data types in REPLACE TABLE. But now it's better to make it consistent with CREATE TABLE. I also added a test to check it: https://github.com/rdblue/spark/pull/8/files#diff-b9e91f767e5562861565b0ce78759af3bcb7fff405a81e928894641147db2ae4R293

bucketSpec, properties, provider, options, location, comment, serdeInfo,
orCreate = orCreate)
// Note: table schema includes both the table columns list and the partition columns
// with data type.
val schema = StructType(columns ++ partCols)
ReplaceTableStatement(table, schema, partitioning, bucketSpec, properties, provider,
options, location, comment, serdeInfo, orCreate = orCreate)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.ViewType
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, FunctionResource}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
Expand Down Expand Up @@ -58,25 +59,25 @@ abstract class ParsedStatement extends LogicalPlan {
*/
case class SerdeInfo(
storedAs: Option[String] = None,
formatClasses: Option[(String, String)] = None,
formatClasses: Option[FormatClasses] = None,
serde: Option[String] = None,
serdeProperties: Map[String, String] = Map.empty) {
// this uses assertions because validation is done in validateRowFormatFileFormat etc.
assert(storedAs.isEmpty || formatClasses.isEmpty,
s"Conflicting STORED AS $storedAs and INPUTFORMAT/OUTPUTFORMAT $formatClasses values")
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit weird to print a Scala Option directly.

"Cannot specify both STORED AS and INPUTFORMAT/OUTPUTFORMAT")

def describe: String = {
val serdeString = if (serde.isDefined || serdeProperties.nonEmpty) {
"ROW FORMAT" + serde.map(sd => s" SERDE $sd").getOrElse(" DELIMITED")
"ROW FORMAT " + serde.map(sd => s"SERDE $sd").getOrElse("DELIMITED")
} else {
""
}

this match {
case SerdeInfo(Some(format), _, _, _) =>
s"STORED AS $format $serdeString"
case SerdeInfo(_, Some((inFormat, outFormat)), _, _) =>
s"INPUTFORMAT $inFormat OUTPUTFORMAT $outFormat $serdeString"
case SerdeInfo(Some(storedAs), _, _, _) =>
s"STORED AS $storedAs $serdeString"
case SerdeInfo(_, Some(formatClasses), _, _) =>
s"STORED AS $formatClasses $serdeString"
case _ =>
serdeString
}
Expand All @@ -85,7 +86,7 @@ case class SerdeInfo(
def merge(other: SerdeInfo): SerdeInfo = {
def getOnly[T](desc: String, left: Option[T], right: Option[T]): Option[T] = {
(left, right) match {
case (Some(l), Some(r)) if l != r =>
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

otherwise the assert below is useless.

case (Some(l), Some(r)) =>
assert(l == r, s"Conflicting $desc values: $l != $r")
left
case (Some(_), _) =>
Expand All @@ -97,6 +98,7 @@ case class SerdeInfo(
}
}

SerdeInfo.checkSerdePropMerging(serdeProperties, other.serdeProperties)
SerdeInfo(
getOnly("STORED AS", storedAs, other.storedAs),
getOnly("INPUTFORMAT/OUTPUTFORMAT", formatClasses, other.formatClasses),
Expand All @@ -105,9 +107,25 @@ case class SerdeInfo(
}
}

/**
 * The pair of format class names given by an INPUTFORMAT/OUTPUTFORMAT specification.
 *
 * @param input the input format class name
 * @param output the output format class name
 */
case class FormatClasses(input: String, output: String) {
  /** Renders the pair as `INPUTFORMAT <input> OUTPUTFORMAT <output>`. */
  override def toString: String =
    Seq("INPUTFORMAT", input, "OUTPUTFORMAT", output).mkString(" ")
}

object SerdeInfo {
val empty: SerdeInfo = {
SerdeInfo(None, None, None, Map.empty)
val empty: SerdeInfo = SerdeInfo(None, None, None, Map.empty)

/**
 * Rejects a merge of two SERDEPROPERTIES maps that have any key in common.
 *
 * Only the keys are compared: a key present in both maps is reported as a conflict even when
 * both maps hold the same value for it.
 *
 * @param props1 the first property map
 * @param props2 the second property map
 * @throws UnsupportedOperationException if at least one key appears in both maps; the message
 *                                       lists both maps and the shared keys
 */
def checkSerdePropMerging(
    props1: Map[String, String], props2: Map[String, String]): Unit = {
  // Formats a property map as "{k1=v1,k2=v2}" for the error message.
  def render(props: Map[String, String]): String =
    props.map { case (key, value) => s"$key=$value" }.mkString("{", ",", "}")

  val sharedKeys = props1.keySet & props2.keySet
  if (sharedKeys.nonEmpty) {
    val message =
      s"""
         |Cannot safely merge SERDEPROPERTIES:
         |${render(props1)}
         |${render(props2)}
         |The conflict keys: ${sharedKeys.mkString(", ")}
         |""".stripMargin
    throw new UnsupportedOperationException(message)
  }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,26 +325,30 @@ private[sql] object CatalogV2Util {
options ++ // to make the transition to the "option." prefix easier, add both
options.map { case (key, value) => TableCatalog.OPTION_PREFIX + key -> value } ++
convertToProperties(serdeInfo) ++
(if (external) Map(TableCatalog.PROP_EXTERNAL -> "true") else Map.empty) ++
(if (external) Some(TableCatalog.PROP_EXTERNAL -> "true") else None) ++
provider.map(TableCatalog.PROP_PROVIDER -> _) ++
comment.map(TableCatalog.PROP_COMMENT -> _) ++
location.map(TableCatalog.PROP_LOCATION -> _)
}

/**
* Converts Hive Serde info to table properties. The mapped property keys are:
* - INPUTFORMAT/OUTPUTFORMAT: hive.input/output-format
* - STORED AS: hive.stored-as
* - ROW FORMAT SERDE: hive.serde
* - SERDEPROPERTIES: add "option." prefix
*/
private def convertToProperties(serdeInfo: Option[SerdeInfo]): Map[String, String] = {
serdeInfo match {
case Some(s) =>
((s.formatClasses match {
case Some((inputFormat, outputFormat)) =>
Map("hive.input-format" -> inputFormat, "hive.output-format" -> outputFormat)
case _ =>
Map.empty
}) ++
s.formatClasses.map { f =>
Map("hive.input-format" -> f.input, "hive.output-format" -> f.output)
}.getOrElse(Map.empty) ++
s.storedAs.map("hive.stored-as" -> _) ++
s.serde.map("hive.serde" -> _) ++
s.serdeProperties.map {
case (key, value) => TableCatalog.OPTION_PREFIX + key -> value
}).toMap
}
case None =>
Map.empty
}
Expand Down
Loading