[SPARK-5723][SQL] Change the default file format to Parquet for CTAS statements. #4639

Closed · wants to merge 4 commits

@@ -61,6 +61,21 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
protected[sql] def convertMetastoreParquet: Boolean =
getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true"

/**
* When true, a table created by a Hive CTAS statement (no USING clause) will be
* converted to a data source table, using the data source set by spark.sql.sources.default.
* The table in a CTAS statement will be converted when it meets any of the following conditions:
* - The CTAS does not specify any of a SerDe (ROW FORMAT SERDE), a File Format (STORED AS), or
* a Storage Handler (STORED BY), and the value of hive.default.fileformat in hive-site.xml
* is either TextFile or SequenceFile.
* - The CTAS statement specifies TextFile (STORED AS TEXTFILE) as the file format and no SerDe
* is specified (no ROW FORMAT SERDE clause).
* - The CTAS statement specifies SequenceFile (STORED AS SEQUENCEFILE) as the file format
* and no SerDe is specified (no ROW FORMAT SERDE clause).
*/
protected[sql] def convertCTAS: Boolean =
getConf("spark.sql.hive.convertCTAS", "false").toBoolean

override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution(plan)

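For illustration, a minimal sketch of the behavior the doc comment above describes, mirroring the test cases added later in this PR (hiveContext is an assumed HiveContext instance and src an existing Hive table):

hiveContext.setConf("spark.sql.hive.convertCTAS", "true")

// No SerDe, file format, or storage handler specified: the table is converted
// and written with spark.sql.sources.default (Parquet by default).
hiveContext.sql("CREATE TABLE t1 AS SELECT key, value FROM src")

// STORED AS TEXTFILE or SEQUENCEFILE with no SerDe: also converted.
hiveContext.sql("CREATE TABLE t2 STORED AS TEXTFILE AS SELECT key, value FROM src")

// Any other file format (RCFILE, ORC, ...), a SerDe, or STORED BY: not
// converted; handled by Hive's native CTAS path instead.
hiveContext.sql("CREATE TABLE t3 STORED AS RCFILE AS SELECT key, value FROM src")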
@@ -31,15 +31,15 @@ import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
import org.apache.hadoop.util.ReflectionUtils

import org.apache.spark.Logging
import org.apache.spark.sql.{AnalysisException, SQLContext}
import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext}
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Catalog, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => ParquetPartition, PartitionSpec}
import org.apache.spark.sql.sources.{DDLParser, LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, DDLParser, LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

@@ -502,24 +502,69 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
Some(sa.getQB().getTableDesc)
}

execution.CreateTableAsSelect(
databaseName,
tableName,
child,
allowExisting,
desc)
// Check if the query specifies file format or storage handler.
val hasStorageSpec = desc match {
case Some(crtTbl) =>
crtTbl != null && (crtTbl.getSerName != null || crtTbl.getStorageHandler != null)
case None => false
}

if (hive.convertCTAS && !hasStorageSpec) {
// Do the conversion when spark.sql.hive.convertCTAS is true and the query
// does not specify any storage format (file format and storage handler).
if (dbName.isDefined) {
throw new AnalysisException(
"Cannot specify database name in a CTAS statement " +
"when spark.sql.hive.convertCTAS is set to true.")
}

val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableUsingAsSelect(
tblName,
hive.conf.defaultDataSourceName,
temporary = false,
mode,
options = Map.empty[String, String],
child
)
} else {
execution.CreateTableAsSelect(
databaseName,
tableName,
child,
allowExisting,
desc)
}

case p: LogicalPlan if p.resolved => p

case p @ CreateTableAsSelect(db, tableName, child, allowExisting, None) =>
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
execution.CreateTableAsSelect(
databaseName,
tableName,
child,
allowExisting,
None)
if (hive.convertCTAS) {
if (dbName.isDefined) {
throw new AnalysisException(
"Cannot specify database name in a CTAS statement " +
"when spark.sql.hive.convertCTAS is set to true.")
}

val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableUsingAsSelect(
tblName,
hive.conf.defaultDataSourceName,
temporary = false,
mode,
options = Map.empty[String, String],
child
)
} else {
val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
execution.CreateTableAsSelect(
databaseName,
tableName,
child,
allowExisting,
None)
}
}
}

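One detail worth calling out in the planner change above: Hive's existing-table CTAS semantics are mapped onto data source SaveModes. A restatement of that mapping, extracted here purely for illustration (ctasSaveMode is a hypothetical helper name; in the patch the logic is inlined):

import org.apache.spark.sql.SaveMode

// CREATE TABLE t AS SELECT ...               -> SaveMode.ErrorIfExists
//   (an AnalysisException if t already exists)
// CREATE TABLE IF NOT EXISTS t AS SELECT ... -> SaveMode.Ignore
//   (returns quietly, leaving the existing table untouched)
def ctasSaveMode(allowExisting: Boolean): SaveMode =
  if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists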
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.sources._
@@ -121,7 +122,7 @@ case class CreateMetastoreDataSource(
if (allowExisting) {
return Seq.empty[Row]
} else {
sys.error(s"Table $tableName already exists.")
throw new AnalysisException(s"Table $tableName already exists.")
}
}

@@ -172,9 +173,11 @@ case class CreateMetastoreDataSourceAsSelect(
// Check if we need to throw an exception or just return.
mode match {
case SaveMode.ErrorIfExists =>
sys.error(s"Table $tableName already exists. " +
s"If you want to append into it, please set mode to SaveMode.Append. " +
s"Or, if you want to overwrite it, please set mode to SaveMode.Overwrite.")
throw new AnalysisException(s"Table $tableName already exists. " +
s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " +
s"insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite" +
s"the existing data. " +
s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.")
case SaveMode.Ignore =>
// Since the table already exists and the save mode is Ignore, we will just return.
return Seq.empty[Row]
@@ -199,7 +202,7 @@ case class CreateMetastoreDataSourceAsSelect(
s"== Actual Schema ==" +:
createdRelation.schema.treeString.split("\\\n")).mkString("\n")}
""".stripMargin
sys.error(errorMessage)
throw new AnalysisException(errorMessage)
} else if (i != createdRelation.relation) {
val errorDescription =
s"Cannot append to table $tableName because the resolved relation does not " +
@@ -216,10 +219,10 @@ case class CreateMetastoreDataSourceAsSelect(
s"== Actual Relation ==" ::
createdRelation.toString :: Nil).mkString("\n")}
""".stripMargin
sys.error(errorMessage)
throw new AnalysisException(errorMessage)
}
case o =>
sys.error(s"Saving data in ${o.toString} is not supported.")
throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
}
case SaveMode.Overwrite =>
hiveContext.sql(s"DROP TABLE IF EXISTS $tableName")
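The sys.error replacements above are a behavior change for callers: sys.error throws a bare RuntimeException, while AnalysisException flags the failure as a user-facing analysis error that can be caught deliberately. A minimal sketch, assuming a SQLContext named sqlContext and the jsonTable/ctasJsonTable tables from the test suite that follows:

import org.apache.spark.sql.AnalysisException

try {
  sqlContext.sql("CREATE TABLE ctasJsonTable AS SELECT * FROM jsonTable")
} catch {
  case e: AnalysisException =>
    // e.g. "Table ctasJsonTable already exists. ..."
    println(s"User error: ${e.getMessage}")
}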
@@ -306,8 +306,8 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
|SELECT * FROM jsonTable
""".stripMargin)

// Create the table again should trigger a AlreadyExistsException.
val message = intercept[RuntimeException] {
// Creating the table again should trigger an AnalysisException.
val message = intercept[AnalysisException] {
sql(
s"""
|CREATE TABLE ctasJsonTable
@@ -516,7 +516,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("SELECT * FROM createdJsonTable"),
df.collect())

var message = intercept[RuntimeException] {
var message = intercept[AnalysisException] {
createExternalTable("createdJsonTable", filePath.toString)
}.getMessage
assert(message.contains("Table createdJsonTable already exists."),
@@ -17,10 +17,13 @@

package org.apache.spark.sql.hive.execution

import org.apache.spark.sql.hive.HiveShim
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.hive.{MetastoreRelation, HiveShim}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}

@@ -42,6 +45,73 @@ class SQLQuerySuite extends QueryTest {
)
}

test("CTAS without serde") {
def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
relation match {
case LogicalRelation(r: ParquetRelation2) =>
if (!isDataSourceParquet) {
fail(
s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
s"${ParquetRelation2.getClass.getCanonicalName}.")
}

case r: MetastoreRelation =>
if (isDataSourceParquet) {
fail(
s"${ParquetRelation2.getClass.getCanonicalName} is expected, but found " +
s"${classOf[MetastoreRelation].getCanonicalName}.")
}
}
}

val originalConf = getConf("spark.sql.hive.convertCTAS", "false")

setConf("spark.sql.hive.convertCTAS", "true")

sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
sql("CREATE TABLE IF NOT EXISTS ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
var message = intercept[AnalysisException] {
sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
}.getMessage
assert(message.contains("Table ctas1 already exists"))
checkRelation("ctas1", true)
sql("DROP TABLE ctas1")

// Specifying a database name in a CTAS statement that is converted to the
// data source write path is not allowed right now.
message = intercept[AnalysisException] {
sql("CREATE TABLE default.ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
}.getMessage
assert(
message.contains("Cannot specify database name in a CTAS statement"),
"When spark.sql.hive.convertCTAS is true, we should not allow " +
"database name specified.")

sql("CREATE TABLE ctas1 stored as textfile AS SELECT key k, value FROM src ORDER BY k, value")
checkRelation("ctas1", true)
sql("DROP TABLE ctas1")

sql(
"CREATE TABLE ctas1 stored as sequencefile AS SELECT key k, value FROM src ORDER BY k, value")
checkRelation("ctas1", true)
sql("DROP TABLE ctas1")

sql("CREATE TABLE ctas1 stored as rcfile AS SELECT key k, value FROM src ORDER BY k, value")
checkRelation("ctas1", false)
sql("DROP TABLE ctas1")

sql("CREATE TABLE ctas1 stored as orc AS SELECT key k, value FROM src ORDER BY k, value")
checkRelation("ctas1", false)
sql("DROP TABLE ctas1")

sql("CREATE TABLE ctas1 stored as parquet AS SELECT key k, value FROM src ORDER BY k, value")
checkRelation("ctas1", false)
sql("DROP TABLE ctas1")

setConf("spark.sql.hive.convertCTAS", originalConf)
}

test("CTAS with serde") {
sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect()
sql(