[SPARK-14070] [SQL] Use ORC data source for SQL queries on ORC tables #11891

Status: Closed · wants to merge 8 commits
HiveCompatibilitySuite.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.hive.execution
import java.io.File
import java.util.{Locale, TimeZone}

import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.internal.SQLConf

@@ -38,6 +39,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
private val originalLocale = Locale.getDefault
private val originalColumnBatchSize = TestHive.conf.columnBatchSize
private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning
private val originalConvertMetastoreOrc = TestHive.convertMetastoreOrc

def testCases: Seq[(String, File)] = {
hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
@@ -56,6 +58,9 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
// Use Hive hash expression instead of the native one
TestHive.sessionState.functionRegistry.unregisterFunction("hash")
// Ensures that plan generation uses MetastoreRelation and not OrcRelation.
// This is needed because SQLBuilder does not work with plans containing LogicalRelation.
TestHive.setConf(HiveContext.CONVERT_METASTORE_ORC, false)
RuleExecutor.resetTime()
}

@@ -66,6 +71,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
Locale.setDefault(originalLocale)
TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
TestHive.setConf(HiveContext.CONVERT_METASTORE_ORC, originalConvertMetastoreOrc)
TestHive.sessionState.functionRegistry.restore()

// For debugging dump some statistics about how much time was spent in various optimizer rules.
HiveContext.scala
@@ -154,6 +154,13 @@ class HiveContext private[hive](
protected[sql] def convertMetastoreParquetWithSchemaMerging: Boolean =
getConf(CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)

/**
* When true, enables an experimental feature where metastore tables that use the Orc SerDe
* are automatically converted to use the Spark SQL ORC table scan, instead of the Hive
* SerDe.
*/
protected[sql] def convertMetastoreOrc: Boolean = getConf(CONVERT_METASTORE_ORC)

/**
* When true, a table created by a Hive CTAS statement (no USING clause) will be
* converted to a data source table, using the data source set by spark.sql.sources.default.
@@ -442,6 +449,11 @@ private[hive] object HiveContext extends Logging {
"different Parquet data files. This configuration is only effective " +
"when \"spark.sql.hive.convertMetastoreParquet\" is true.")

val CONVERT_METASTORE_ORC = booleanConf("spark.sql.hive.convertMetastoreOrc",
defaultValue = Some(true),
doc = "When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " +
"the built in support.")

val CONVERT_CTAS = booleanConf("spark.sql.hive.convertCTAS",
defaultValue = Some(false),
doc = "When true, a table created by a Hive CTAS statement (no USING clause) will be " +
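As a usage note for the new spark.sql.hive.convertMetastoreOrc flag defined above, here is a minimal sketch of toggling it at runtime. The hiveContext handle and the orc_table table are assumptions for illustration only; they are not part of this patch.

// Assumes an existing HiveContext `hiveContext` and an ORC-backed metastore table `orc_table`.
hiveContext.setConf("spark.sql.hive.convertMetastoreOrc", "false") // fall back to the Hive ORC SerDe
hiveContext.sql("SELECT COUNT(*) FROM orc_table").show()

hiveContext.setConf("spark.sql.hive.convertMetastoreOrc", "true")  // default: use the ORC data source scan
hiveContext.sql("SELECT COUNT(*) FROM orc_table").show()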
HiveMetastoreCatalog.scala
@@ -40,12 +40,13 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.{datasources, FileRelation}
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource, ParquetRelation}
import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.HiveNativeCommand
import org.apache.spark.sql.sources.{HadoopFsRelation, HDFSFileCatalog}
import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
import org.apache.spark.sql.sources.{FileFormat, HadoopFsRelation, HDFSFileCatalog}
import org.apache.spark.sql.types._

private[hive] case class HiveSerDe(
@@ -440,58 +441,72 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
}
}

private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging

val parquetOptions = Map(
ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
metastoreRelation.tableName,
Some(metastoreRelation.databaseName)
).unquotedString
)
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)

def getCached(
tableIdentifier: QualifiedTableName,
pathsInMetastore: Seq[String],
schemaInMetastore: StructType,
partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
case logical @ LogicalRelation(parquetRelation: HadoopFsRelation, _, _) =>
// If we have the same paths, same schema, and same partition spec,
// we will use the cached Parquet Relation.
val useCached =
parquetRelation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet &&
logical.schema.sameType(metastoreSchema) &&
parquetRelation.partitionSpec == partitionSpecInMetastore.getOrElse {
PartitionSpec(StructType(Nil), Array.empty[datasources.PartitionDirectory])
private def getCached(
tableIdentifier: QualifiedTableName,
metastoreRelation: MetastoreRelation,
schemaInMetastore: StructType,
expectedFileFormat: Class[_ <: FileFormat],
expectedBucketSpec: Option[BucketSpec],
partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {

cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
val pathsInMetastore = metastoreRelation.table.storage.locationUri.toSeq
val cachedRelationFileFormatClass = relation.fileFormat.getClass

expectedFileFormat match {
case `cachedRelationFileFormatClass` =>
// If we have the same paths, same schema, and same partition spec,
// we will use the cached relation.
Review comment (Contributor): And bucketing spec?
Review comment (Contributor Author): Done
val useCached =
relation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet &&
logical.schema.sameType(schemaInMetastore) &&
relation.bucketSpec == expectedBucketSpec &&
relation.partitionSpec == partitionSpecInMetastore.getOrElse {
PartitionSpec(StructType(Nil), Array.empty[PartitionDirectory])
}

if (useCached) {
Some(logical)
} else {
// If the cached relation is not updated, we invalidate it right away.
cachedDataSourceTables.invalidate(tableIdentifier)
None
}

if (useCached) {
Some(logical)
} else {
// If the cached relation is not updated, we invalidate it right away.
case _ =>
logWarning(
s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} " +
s"should be stored as $expectedFileFormat. However, we are getting " +
s"a ${relation.fileFormat} from the metastore cache. This cached " +
s"entry will be invalidated.")
cachedDataSourceTables.invalidate(tableIdentifier)
None
}
case other =>
logWarning(
s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
s"as Parquet. However, we are getting a $other from the metastore cache. " +
s"This cached entry will be invalidated.")
cachedDataSourceTables.invalidate(tableIdentifier)
None
}
}
case other =>
logWarning(
s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
s"as $expectedFileFormat. However, we are getting a $other from the metastore cache. " +
s"This cached entry will be invalidated.")
cachedDataSourceTables.invalidate(tableIdentifier)
None
}
}

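/**
 * Converts a MetastoreRelation into a LogicalRelation backed by the given data source
 * FileFormat (Parquet or ORC), reusing a cached relation as long as the paths, schema,
 * bucketing spec and partition spec still match the metastore.
 */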
private def convertToLogicalRelation(metastoreRelation: MetastoreRelation,
options: Map[String, String],
defaultSource: FileFormat,
fileFormatClass: Class[_ <: FileFormat],
fileType: String): LogicalRelation = {
val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
val bucketSpec = None // We don't support hive bucketed tables, only ones we write out.

val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
val partitionColumnDataTypes = partitionSchema.map(_.dataType)
// We're converting the entire table into ParquetRelation, so predicates to Hive metastore
// We're converting the entire table into HadoopFsRelation, so predicates to Hive metastore
// are empty.
val partitions = metastoreRelation.getHiveQlPartitions().map { p =>
val location = p.getLocation
@@ -504,54 +519,65 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte

val cached = getCached(
tableIdentifier,
metastoreRelation.table.storage.locationUri.toSeq,
metastoreRelation,
metastoreSchema,
fileFormatClass,
bucketSpec,
Some(partitionSpec))

val parquetRelation = cached.getOrElse {
val hadoopFsRelation = cached.getOrElse {
val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil
val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec)
val format = new DefaultSource()
val inferredSchema = format.inferSchema(hive, parquetOptions, fileCatalog.allFiles())

val mergedSchema = inferredSchema.map { inferred =>
ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, inferred)
}.getOrElse(metastoreSchema)
val inferredSchema = if (fileType.equals("parquet")) {
val inferredSchema = defaultSource.inferSchema(hive, options, fileCatalog.allFiles())
inferredSchema.map { inferred =>
ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, inferred)
}.getOrElse(metastoreSchema)
} else {
defaultSource.inferSchema(hive, options, fileCatalog.allFiles()).get
}

val relation = HadoopFsRelation(
sqlContext = hive,
location = fileCatalog,
partitionSchema = partitionSchema,
dataSchema = mergedSchema,
bucketSpec = None, // We don't support hive bucketed tables, only ones we write out.
fileFormat = new DefaultSource(),
options = parquetOptions)
dataSchema = inferredSchema,
bucketSpec = bucketSpec,
fileFormat = defaultSource,
options = options)

val created = LogicalRelation(relation)
cachedDataSourceTables.put(tableIdentifier, created)
created
}

parquetRelation
hadoopFsRelation
} else {
val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)

val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
val parquetRelation = cached.getOrElse {
val cached = getCached(tableIdentifier,
metastoreRelation,
metastoreSchema,
fileFormatClass,
bucketSpec,
None)
val logicalRelation = cached.getOrElse {
val created =
LogicalRelation(
DataSource(
sqlContext = hive,
paths = paths,
userSpecifiedSchema = Some(metastoreRelation.schema),
options = parquetOptions,
className = "parquet").resolveRelation())
bucketSpec = bucketSpec,
options = options,
className = fileType).resolveRelation())

cachedDataSourceTables.put(tableIdentifier, created)
created
}

parquetRelation
logicalRelation
}
result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
}
@@ -561,6 +587,27 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
* data source relations for better performance.
*/
object ParquetConversions extends Rule[LogicalPlan] {
private def shouldConvertMetastoreParquet(relation: MetastoreRelation): Boolean = {
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") &&
hive.convertMetastoreParquet
}

private def convertToParquetRelation(relation: MetastoreRelation): LogicalRelation = {
val defaultSource = new ParquetDefaultSource()
val fileFormatClass = classOf[ParquetDefaultSource]

val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
val options = Map(
ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
relation.tableName,
Some(relation.databaseName)
).unquotedString
)

convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "parquet")
}

override def apply(plan: LogicalPlan): LogicalPlan = {
if (!plan.resolved || plan.analyzed) {
return plan
@@ -570,28 +617,67 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
// Write path
case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
// Inserting into partitioned table is not supported in Parquet data source (yet).
if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(r)
InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists)

// Write path
case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
// Inserting into partitioned table is not supported in Parquet data source (yet).
if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(r)
InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists)

// Read path
case relation: MetastoreRelation if hive.convertMetastoreParquet &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) =>
val parquetRelation = convertToParquetRelation(relation)
SubqueryAlias(relation.alias.getOrElse(relation.tableName), parquetRelation)
}
}
}

/**
* When scanning Metastore ORC tables, convert them to ORC data source relations
* for better performance.
*/
object OrcConversions extends Rule[LogicalPlan] {
private def shouldConvertMetastoreOrc(relation: MetastoreRelation): Boolean = {
relation.tableDesc.getSerdeClassName.toLowerCase.contains("orc") &&
hive.convertMetastoreOrc
}

private def convertToOrcRelation(relation: MetastoreRelation): LogicalRelation = {
val defaultSource = new OrcDefaultSource()
val fileFormatClass = classOf[OrcDefaultSource]
val options = Map[String, String]()

convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc")
}

override def apply(plan: LogicalPlan): LogicalPlan = {
if (!plan.resolved || plan.analyzed) {
return plan
}

plan transformUp {
// Write path
case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
// Inserting into partitioned table is not supported in Orc data source (yet).
if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists)

// Write path
case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
// Inserting into partitioned table is not supported in Orc data source (yet).
if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists)

// Read path
case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) =>
val orcRelation = convertToOrcRelation(relation)
SubqueryAlias(relation.alias.getOrElse(relation.tableName), orcRelation)
}
}
}
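
To illustrate the read path rewritten by the OrcConversions rule above, here is a hedged sketch; hiveContext and the tables people_orc and people_src are assumptions for illustration, not part of this patch. With spark.sql.hive.convertMetastoreOrc at its default of true, the MetastoreRelation for the ORC table is replaced by a LogicalRelation over a HadoopFsRelation whose fileFormat is the ORC DefaultSource, which can be verified in the explained plan.

// Assumes hiveContext: HiveContext and an existing source table `people_src(name STRING, age INT)`.
hiveContext.sql("CREATE TABLE IF NOT EXISTS people_orc (name STRING, age INT) STORED AS ORC")
hiveContext.sql("INSERT OVERWRITE TABLE people_orc SELECT name, age FROM people_src")

// The analyzer applies OrcConversions, so this scan goes through the ORC data source
// (HadoopFsRelation with the ORC DefaultSource) rather than the Hive ORC SerDe.
hiveContext.sql("SELECT name FROM people_orc WHERE age > 21").explain(true)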

/**
* Creates any tables required for query execution.
* For example, because of a CREATE TABLE X AS statement.
HiveSessionCatalog.scala
@@ -64,6 +64,7 @@ class HiveSessionCatalog(
private val metastoreCatalog = new HiveMetastoreCatalog(client, context)

val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables
val PreInsertionCasts: Rule[LogicalPlan] = metastoreCatalog.PreInsertionCasts
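
For orientation, a hedged sketch of how these exposed rules are typically wired into the analyzer's extendedResolutionRules; the surrounding declaration and the exact rule list are assumptions and not part of this diff.

// Sketch only: member names other than ParquetConversions, OrcConversions, CreateTables and
// PreInsertionCasts are assumptions; the real wiring lives in the Hive session state.
lazy val analyzer: Analyzer =
  new Analyzer(catalog, conf) {
    override val extendedResolutionRules =
      catalog.ParquetConversions ::
      catalog.OrcConversions ::   // newly exposed by this patch
      catalog.CreateTables ::
      catalog.PreInsertionCasts ::
      Nil
  }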
