
[SPARK-4912][SQL] Persistent tables for the Spark SQL data sources api #3960

Closed
wants to merge 17 commits
@@ -330,6 +330,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def strategies: Seq[Strategy] =
extraStrategies ++ (
DataSourceStrategy ::
DDLStrategy ::
TakeOrdered ::
HashAggregation ::
LeftSemiJoin ::
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution

import org.apache.spark.sql.sources.{CreateTempTableUsing, CreateTableUsing}
import org.apache.spark.sql.{SQLContext, Strategy, execution}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
@@ -310,4 +311,17 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case _ => Nil
}
}

object DDLStrategy extends Strategy {
Contributor

Can we avoid making this a separate strategy?

Contributor Author

It was originally in CommandStrategy. I was trying to find a good place for these, but I did not find a suitable Strategy. Any suggestions?

Contributor Author

@scwf Actually, I think it is better to put all rules for the data source API in the same place.

Contributor

@yhuai, I mean that since CreateTableUsing and CreateTempTableUsing are commands, we'd better make them follow this strategy:

  object BasicOperators extends Strategy {
    def numPartitions = self.numPartitions

    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      case r: RunnableCommand => ExecutedCommand(r) :: Nil

I will try this.

Contributor Author

Actually, I am not sure we should put them in BasicOperators. We cannot just create a RunnableCommand in ddl.scala, since SQLContext does not allow persistent tables and we need to throw that error in SparkStrategies. Also, I feel the code is clearer when we keep everything related to the data source API together.

Contributor

@yhuai, I wrote a draft version for this; can you have a look? (https://github.com/scwf/spark/compare/apache:master...scwf:createDataSourceTable?expand=1)

The reason we put case r: RunnableCommand => ExecutedCommand(r) in BasicOperators is that there is no need to create a new strategy for only one rule.

And after we refactor the command implementation in Spark SQL, we should make newly added commands follow RunnableCommand where possible, so we can avoid adding a new strategy for each new command.

/cc @marmbrus
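For illustration only, here is a rough sketch of what that suggestion might look like (hypothetical; not the approach taken in this PR, and the actual draft lives in the branch linked above). If CreateTableUsing itself extended RunnableCommand, the existing case r: RunnableCommand => ExecutedCommand(r) rule in BasicOperators would plan it without a dedicated strategy, and the temporary check would move into run:

  // Hypothetical alternative, for discussion only.
  private[sql] case class CreateTableUsing(
      tableName: String,
      userSpecifiedSchema: Option[StructType],
      provider: String,
      temporary: Boolean,
      options: Map[String, String]) extends RunnableCommand {

    def run(sqlContext: SQLContext) = {
      if (!temporary) {
        // A plain SQLContext cannot persist tables, so the error would surface at
        // execution time here rather than at planning time in SparkStrategies.
        sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
      }
      val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
      sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable(tableName)
      Seq.empty
    }
  }

The PR instead keeps CreateTableUsing as a pure logical Command, so that a HiveContext can intercept the non-temporary case with its own strategy (HiveDDLStrategy, later in this diff) while a plain SQLContext rejects it in the DDLStrategy shown below.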

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, options) =>
ExecutedCommand(
CreateTempTableUsing(tableName, userSpecifiedSchema, provider, options)) :: Nil

case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")

case _ => Nil
}
}
}
@@ -177,7 +177,6 @@ case class DescribeCommand(
override val output: Seq[Attribute]) extends RunnableCommand {

override def run(sqlContext: SQLContext) = {
Row("# Registered as a temporary table", null, null) +:
child.output.map(field => Row(field.name, field.dataType.toString, null))
child.output.map(field => Row(field.name, field.dataType.toString, null))
}
}
53 changes: 37 additions & 16 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -92,21 +92,21 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
protected lazy val ddl: Parser[LogicalPlan] = createTable

/**
* `CREATE TEMPORARY TABLE avroTable
* `CREATE [TEMPORARY] TABLE avroTable
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* or
* `CREATE TEMPORARY TABLE avroTable(intField int, stringField string...)
* `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...)
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
*/
protected lazy val createTable: Parser[LogicalPlan] =
(
CREATE ~ TEMPORARY ~ TABLE ~> ident
(CREATE ~> TEMPORARY.? <~ TABLE) ~ ident
~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
case tableName ~ columns ~ provider ~ opts =>
case temp ~ tableName ~ columns ~ provider ~ opts =>
val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
CreateTableUsing(tableName, userSpecifiedSchema, provider, opts)
CreateTableUsing(tableName, userSpecifiedSchema, provider, temp.isDefined, opts)
}
)
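As a usage illustration (the table names and path below are invented), the relaxed grammar accepts both forms; the TEMPORARY variant works on any SQLContext, while the persistent variant is rejected by DDLStrategy on a plain SQLContext and handled by HiveDDLStrategy on a HiveContext:

  // Temporary table: registered only for the lifetime of this SQLContext.
  sqlContext.sql(
    """
      |CREATE TEMPORARY TABLE jsonTempTable
      |USING org.apache.spark.sql.json
      |OPTIONS (path 'examples/src/main/resources/people.json')
    """.stripMargin)

  // Persistent table (no TEMPORARY keyword): only allowed on a HiveContext, which
  // records the provider, options, and optional schema in the metastore.
  hiveContext.sql(
    """
      |CREATE TABLE jsonPersistentTable
      |USING org.apache.spark.sql.json
      |OPTIONS (path 'examples/src/main/resources/people.json')
    """.stripMargin)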

@@ -175,13 +175,12 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
primitiveType
}

private[sql] case class CreateTableUsing(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String]) extends RunnableCommand {

def run(sqlContext: SQLContext) = {
object ResolvedDataSource {
def apply(
sqlContext: SQLContext,
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String]): ResolvedDataSource = {
val loader = Utils.getContextOrSparkClassLoader
val clazz: Class[_] = try loader.loadClass(provider) catch {
case cnf: java.lang.ClassNotFoundException =>
@@ -199,22 +198,44 @@ private[sql] case class CreateTableUsing(
.asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
case _ =>
sys.error(s"${clazz.getCanonicalName} should extend SchemaRelationProvider.")
sys.error(s"${clazz.getCanonicalName} does not allow user-specified schemas.")
}
}
case None => {
clazz.newInstance match {
case dataSource: org.apache.spark.sql.sources.RelationProvider =>
case dataSource: org.apache.spark.sql.sources.RelationProvider =>
dataSource
.asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
.createRelation(sqlContext, new CaseInsensitiveMap(options))
case _ =>
sys.error(s"${clazz.getCanonicalName} should extend RelationProvider.")
sys.error(s"A schema needs to be specified when using ${clazz.getCanonicalName}.")
}
}
}

sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
new ResolvedDataSource(clazz, relation)
}
}

private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)

private[sql] case class CreateTableUsing(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
temporary: Boolean,
options: Map[String, String]) extends Command

private [sql] case class CreateTempTableUsing(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String]) extends RunnableCommand {

def run(sqlContext: SQLContext) = {
val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)

sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable(tableName)
Seq.empty
}
}
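For reference, a minimal sketch of driving the new ResolvedDataSource factory directly (provider and path are illustrative, and ResolvedDataSource is private[sql], so this is internal code rather than a public API). This is the same path CreateTempTableUsing above and the metastore cache loader later in this PR go through:

  val resolved = ResolvedDataSource(
    sqlContext,
    userSpecifiedSchema = None,  // let the provider infer the schema
    provider = "org.apache.spark.sql.json",
    options = Map("path" -> "examples/src/main/resources/people.json"))

  // resolved.provider is the loaded Class[_]; resolved.relation is the BaseRelation,
  // which can be registered as a temporary table just as CreateTempTableUsing does.
  sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable("people")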
@@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute}
/**
* ::DeveloperApi::
* Implemented by objects that produce relations for a specific kind of data source. When
* Spark SQL is given a DDL operation with a USING clause specified, this interface is used to
* pass in the parameters specified by a user.
* Spark SQL is given a DDL operation with a USING clause specified (to specify the implemented
* RelationProvider), this interface is used to pass in the parameters specified by a user.
*
* Users may specify the fully qualified class name of a given data source. When that class is
* not found Spark SQL will append the class name `DefaultSource` to the path, allowing for
@@ -46,17 +46,22 @@ trait RelationProvider {

/**
* ::DeveloperApi::
* Implemented by objects that produce relations for a specific kind of data source. When
* Spark SQL is given a DDL operation with
* 1. USING clause: to specify the implemented SchemaRelationProvider
* 2. User defined schema: users can define schema optionally when create table
* Implemented by objects that produce relations for a specific kind of data source
* with a given schema. When Spark SQL is given a DDL operation with a USING clause specified (
* to specify the implemented SchemaRelationProvider) and a user defined schema, this interface
* is used to pass in the parameters specified by a user.
*
* Users may specify the fully qualified class name of a given data source. When that class is
* not found Spark SQL will append the class name `DefaultSource` to the path, allowing for
* less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the
* data source 'org.apache.spark.sql.json.DefaultSource'
*
* A new instance of this class will be instantiated each time a DDL call is made.
*
* The difference between a [[RelationProvider]] and a [[SchemaRelationProvider]] is that
* users need to provide a schema when using a SchemaRelationProvider.
* A relation provider can inherit both [[RelationProvider]] and [[SchemaRelationProvider]]
* if it can support both schema inference and user-specified schemas.
*/
@DeveloperApi
trait SchemaRelationProvider {
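To make that last point concrete, here is a hypothetical provider (all names below are invented, and the usual imports of SQLContext, StructType, StructField, and StringType are assumed) that mixes in both traits so it works with and without a user-specified schema; the createRelation signatures follow the calls made by ResolvedDataSource in ddl.scala above:

  class DefaultSource extends RelationProvider with SchemaRelationProvider {

    // Invoked for CREATE [TEMPORARY] TABLE t USING ... with no column list.
    override def createRelation(
        sqlContext: SQLContext,
        parameters: Map[String, String]): BaseRelation =
      ExampleRelation(parameters("path"), userSchema = None)(sqlContext)

    // Invoked when the user supplies a column list in the DDL.
    override def createRelation(
        sqlContext: SQLContext,
        parameters: Map[String, String],
        schema: StructType): BaseRelation =
      ExampleRelation(parameters("path"), userSchema = Some(schema))(sqlContext)
  }

  // Deliberately minimal: it only reports a schema. A real source would also mix in
  // a scan trait such as TableScan to actually produce rows.
  case class ExampleRelation(path: String, userSchema: Option[StructType])
      (@transient val sqlContext: SQLContext) extends BaseRelation {
    override def schema: StructType =
      userSchema.getOrElse(StructType(StructField("value", StringType, nullable = true) :: Nil))
  }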
@@ -314,4 +314,34 @@ class TableScanSuite extends DataSourceTest {
sql("SELECT * FROM oneToTenDef"),
(1 to 10).map(Row(_)).toSeq)
}

test("exceptions") {
// Make sure we throw the correct exceptions when users use a relation provider that
// only implements RelationProvider or SchemaRelationProvider.
val schemaNotAllowed = intercept[Exception] {
sql(
"""
|CREATE TEMPORARY TABLE relationProvierWithSchema (i int)
|USING org.apache.spark.sql.sources.SimpleScanSource
|OPTIONS (
| From '1',
| To '10'
|)
""".stripMargin)
}
assert(schemaNotAllowed.getMessage.contains("does not allow user-specified schemas"))

val schemaNeeded = intercept[Exception] {
sql(
"""
|CREATE TEMPORARY TABLE schemaRelationProvierWithoutSchema
|USING org.apache.spark.sql.sources.AllDataTypesScanSource
|OPTIONS (
| From '1',
| To '10'
|)
""".stripMargin)
}
assert(schemaNeeded.getMessage.contains("A schema needs to be specified when using"))
}
}
@@ -115,6 +115,16 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
}

def refreshTable(tableName: String): Unit = {
// TODO: Database support...
catalog.refreshTable("default", tableName)
}

protected[hive] def invalidateTable(tableName: String): Unit = {
// TODO: Database support...
catalog.invalidateTable("default", tableName)
}
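A brief usage sketch of the new hook (table name and path invented): refreshTable re-resolves the cached relation for a persistent data source table, which is useful after the underlying data or schema has changed outside of this HiveContext. The invalidateTable variant is protected[hive] and used internally:

  // Assume a persistent JSON-backed table was created earlier, e.g. via
  //   CREATE TABLE logs USING org.apache.spark.sql.json OPTIONS (path '/data/logs')

  // ... an external process writes new files under /data/logs ...

  // Reload the cached metadata so the next query re-resolves the relation.
  hiveContext.refreshTable("logs")
  hiveContext.sql("SELECT COUNT(*) FROM logs").collect()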

/**
* Analyzes the given table in the current database to generate statistics, which will be
* used in query optimizations.
@@ -340,6 +350,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
override def strategies: Seq[Strategy] = extraStrategies ++ Seq(
DataSourceStrategy,
HiveCommandStrategy(self),
HiveDDLStrategy,
DDLStrategy,
TakeOrdered,
ParquetOperations,
InMemoryScans,
@@ -20,10 +20,11 @@ package org.apache.spark.sql.hive
import java.io.IOException
import java.util.{List => JList}

import com.google.common.cache.{LoadingCache, CacheLoader, CacheBuilder}

import org.apache.hadoop.util.ReflectionUtils
import org.apache.hadoop.hive.metastore.TableType
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, FieldSchema}
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
import org.apache.hadoop.hive.ql.metadata.InvalidTableException
import org.apache.hadoop.hive.ql.plan.CreateTableDesc
@@ -39,6 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.util.Utils

/* Implicit conversions */
@@ -50,8 +52,76 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
/** Connection to hive metastore. Usages should lock on `this`. */
protected[hive] val client = Hive.get(hive.hiveconf)

// TODO: Use this everywhere instead of tuples or separate databaseName, tableName arguments.
/** A fully qualified identifier for a table (i.e., database.tableName) */
case class QualifiedTableName(database: String, name: String) {
Contributor

This doesn't really match the rest of the API any more now that we have the concept of a tableIdentifier. We can fix this in a followup PR.

def toLowerCase = QualifiedTableName(database.toLowerCase, name.toLowerCase)
}

/** A cache of Spark SQL data source tables that have been accessed. */
protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = {
val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
override def load(in: QualifiedTableName): LogicalPlan = {
logDebug(s"Creating new cached data source for $in")
val table = client.getTable(in.database, in.name)
val schemaString = table.getProperty("spark.sql.sources.schema")
val userSpecifiedSchema =
if (schemaString == null) {
None
} else {
Some(DataType.fromJson(schemaString).asInstanceOf[StructType])
}
// It does not appear that the ql client for the metastore has a way to enumerate all the
// SerDe properties directly...
val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap

val resolvedRelation =
ResolvedDataSource(
hive,
userSpecifiedSchema,
table.getProperty("spark.sql.sources.provider"),
options)

LogicalRelation(resolvedRelation.relation)
}
}

CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
}

def refreshTable(databaseName: String, tableName: String): Unit = {
cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
}

def invalidateTable(databaseName: String, tableName: String): Unit = {
Contributor

These should probably also take tableIdentifiers?

cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, tableName).toLowerCase)
}

val caseSensitive: Boolean = false

def createDataSourceTable(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String]) = {
val (dbName, tblName) = processDatabaseAndTableName("default", tableName)
val tbl = new Table(dbName, tblName)

tbl.setProperty("spark.sql.sources.provider", provider)
if (userSpecifiedSchema.isDefined) {
tbl.setProperty("spark.sql.sources.schema", userSpecifiedSchema.get.json)
}
options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }

tbl.setProperty("EXTERNAL", "TRUE")
tbl.setTableType(TableType.EXTERNAL_TABLE)

// create the table
synchronized {
client.createTable(tbl, false)
}
}
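To see what this records, here is a hypothetical inspection of the resulting metastore entry (table name and values invented), mirroring what the cachedDataSourceTables loader above reads back; client is internal to this catalog, so this is purely illustrative:

  // For a table created with:
  //   CREATE TABLE people USING org.apache.spark.sql.json OPTIONS (path '/data/people.json')
  val metastoreTable = client.getTable("default", "people")

  metastoreTable.getProperty("spark.sql.sources.provider")
  // -> "org.apache.spark.sql.json"
  metastoreTable.getProperty("spark.sql.sources.schema")
  // -> the JSON-serialized StructType if the user specified one, otherwise null
  metastoreTable.getTTable.getSd.getSerdeInfo.getParameters
  // -> contains path=/data/people.json, since options are stored as SerDe parameters
  metastoreTable.getProperty("EXTERNAL")
  // -> "TRUE"; the table is marked external, so dropping it leaves the data in place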

def tableExists(tableIdentifier: Seq[String]): Boolean = {
val tableIdent = processTableIdentifier(tableIdentifier)
val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
@@ -72,7 +142,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
hive.sessionState.getCurrentDatabase)
val tblName = tableIdent.last
val table = client.getTable(databaseName, tblName)
if (table.isView) {

if (table.getProperty("spark.sql.sources.provider") != null) {
cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
} else if (table.isView) {
// if the unresolved relation is from hive view
// parse the text into logic node.
HiveQl.createPlanForView(table, alias)
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.sources.CreateTableUsing
import org.apache.spark.sql.{SQLContext, SchemaRDD, Strategy}

import scala.collection.JavaConversions._
@@ -208,6 +209,16 @@ private[hive] trait HiveStrategies {
}
}

object HiveDDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
ExecutedCommand(
CreateMetastoreDataSource(tableName, userSpecifiedSchema, provider, options)) :: Nil

case _ => Nil
}
}

case class HiveCommandStrategy(context: HiveContext) extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case describe: DescribeCommand =>
@@ -395,6 +395,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {

clearCache()
loadedTables.clear()
catalog.cachedDataSourceTables.invalidateAll()
catalog.client.getAllTables("default").foreach { t =>
logDebug(s"Deleting table $t")
val table = catalog.client.getTable("default", t)