
[SPARK-5240][SQL] Adding createDataSourceTable interface to Catalog #4036

Closed · wants to merge 5 commits

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala

@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
+import org.apache.spark.sql.types.StructType
 
 /**
  * An interface for looking up relations by name. Used by an [[Analyzer]].
@@ -40,6 +41,12 @@ trait Catalog {
 
   def unregisterAllTables(): Unit
 
+  def createDataSourceTable(
+      tableName: String,
+      userSpecifiedSchema: Option[StructType],
+      provider: String,
+      options: Map[String, String]): Unit
+
   protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = {
     if (!caseSensitive) {
       tableIdentifier.map(_.toLowerCase)

[Review comment by the contributor author on the tableName parameter] We'd better use a table identifier here; after #4062 is merged, I will update this.
@@ -81,6 +88,14 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
     tables.clear()
   }
 
+  override def createDataSourceTable(
+      tableName: String,
+      userSpecifiedSchema: Option[StructType],
+      provider: String,
+      options: Map[String, String]) = {
+    sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
+  }
+
   override def tableExists(tableIdentifier: Seq[String]): Boolean = {
     val tableIdent = processTableIdentifier(tableIdentifier)
     tables.get(getDbTableName(tableIdent)) match {
@@ -180,5 +195,13 @@ object EmptyCatalog extends Catalog {
     throw new UnsupportedOperationException
   }
 
+  def createDataSourceTable(
+      tableName: String,
+      userSpecifiedSchema: Option[StructType],
+      provider: String,
+      options: Map[String, String]): Unit = {
+    throw new UnsupportedOperationException
+  }
+
   override def unregisterAllTables(): Unit = {}
 }
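
For context, every concrete Catalog now has to provide createDataSourceTable. A minimal sketch of what a custom implementation could look like (RecordingCatalog is invented for illustration; the stubbed member signatures follow the Spark 1.2-era Catalog trait and may differ slightly):

    import scala.collection.mutable
    import org.apache.spark.sql.catalyst.analysis.Catalog
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.types.StructType

    // Hypothetical catalog that accepts persistent data source tables by
    // remembering them in memory instead of raising an error.
    class RecordingCatalog extends Catalog {
      val caseSensitive: Boolean = false

      private val created =
        mutable.Map.empty[String, (Option[StructType], String, Map[String, String])]

      override def createDataSourceTable(
          tableName: String,
          userSpecifiedSchema: Option[StructType],
          provider: String,
          options: Map[String, String]): Unit = {
        created(tableName) = (userSpecifiedSchema, provider, options)
      }

      def tableExists(tableIdentifier: Seq[String]): Boolean =
        created.contains(tableIdentifier.mkString("."))
      def lookupRelation(tableIdentifier: Seq[String], alias: Option[String]): LogicalPlan = ???
      def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = ???
      def unregisterTable(tableIdentifier: Seq[String]): Unit = ???
      def unregisterAllTables(): Unit = created.clear()
    }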

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -355,7 +355,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
     def strategies: Seq[Strategy] =
       extraStrategies ++ (
       DataSourceStrategy ::
-      DDLStrategy ::
       TakeOrdered ::
       HashAggregation ::
       LeftSemiJoin ::

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -312,16 +312,4 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }
 
-  object DDLStrategy extends Strategy {
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, options) =>
-        ExecutedCommand(
-          CreateTempTableUsing(tableName, userSpecifiedSchema, provider, options)) :: Nil
-
-      case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
-        sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
-
-      case _ => Nil
-    }
-  }
 }
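
Dropping DDLStrategy is possible because CreateTableUsing becomes a RunnableCommand (see ddl.scala below), and the planner already converts any RunnableCommand into a physical ExecutedCommand. A simplified sketch of that generic rule (the real one lives inside BasicOperators in Spark 1.x; CommandPlanning is a placeholder name):

    // Any logical RunnableCommand is wrapped for execution; this is what now
    // picks up CreateTableUsing without a DDL-specific strategy.
    object CommandPlanning extends Strategy {
      def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
        case r: RunnableCommand => ExecutedCommand(r) :: Nil
        case _ => Nil
      }
    }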

sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala (15 changes: 12 additions & 3 deletions)
@@ -106,7 +106,11 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
         ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
       case temp ~ tableName ~ columns ~ provider ~ opts =>
         val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
-        CreateTableUsing(tableName, userSpecifiedSchema, provider, temp.isDefined, opts)
+        if (temp.isDefined) {
+          CreateTempTableUsing(tableName, userSpecifiedSchema, provider, opts)
+        } else {
+          CreateTableUsing(tableName, userSpecifiedSchema, provider, opts)
+        }
     }
   )
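
The two statement shapes this rule now maps to distinct plan nodes (table name, provider path, and options below are illustrative):

    // TEMPORARY present => temp.isDefined => CreateTempTableUsing, which is
    // planned and executed entirely inside the current context:
    sqlContext.sql(
      "CREATE TEMPORARY TABLE events USING org.apache.spark.sql.json " +
      "OPTIONS (path 'events.json')")

    // No TEMPORARY => CreateTableUsing, whose run() delegates to
    // catalog.createDataSourceTable (only Hive-backed catalogs implement it):
    hiveContext.sql(
      "CREATE TABLE events USING org.apache.spark.sql.json " +
      "OPTIONS (path 'events.json')")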

@@ -223,8 +227,13 @@ private[sql] case class CreateTableUsing(
     tableName: String,
     userSpecifiedSchema: Option[StructType],
     provider: String,
-    temporary: Boolean,
-    options: Map[String, String]) extends Command
+    options: Map[String, String]) extends RunnableCommand {
+
+  def run(sqlContext: SQLContext) = {
+    sqlContext.catalog.createDataSourceTable(tableName, userSpecifiedSchema, provider, options)
+    Seq.empty
+  }
+}
 
 private [sql] case class CreateTempTableUsing(
     tableName: String,
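
With CreateTableUsing now a RunnableCommand, executing the plan node amounts to a single catalog call. A sketch of the flow (values illustrative; ExecutedCommand is the physical wrapper that invokes run()):

    val create = CreateTableUsing(
      tableName = "events",
      userSpecifiedSchema = None,             // let the data source infer it
      provider = "org.apache.spark.sql.json",
      options = Map("path" -> "events.json"))

    // ExecutedCommand(create).execute() boils down to:
    create.run(sqlContext)  // -> sqlContext.catalog.createDataSourceTable(...)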

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -351,8 +351,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     override def strategies: Seq[Strategy] = extraStrategies ++ Seq(
       DataSourceStrategy,
       HiveCommandStrategy(self),
-      HiveDDLStrategy,
-      DDLStrategy,
       TakeOrdered,
       ParquetOperations,
       InMemoryScans,

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -99,7 +99,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
   val caseSensitive: Boolean = false
 
-  def createDataSourceTable(
+  override def createDataSourceTable(
       tableName: String,
       userSpecifiedSchema: Option[StructType],
       provider: String,

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
Expand Up @@ -210,16 +210,6 @@ private[hive] trait HiveStrategies {
}
}

object HiveDDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
ExecutedCommand(
CreateMetastoreDataSource(tableName, userSpecifiedSchema, provider, options)) :: Nil

case _ => Nil
}
}

case class HiveCommandStrategy(context: HiveContext) extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case describe: DescribeCommand =>
Expand Down
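
Taken together, a sketch of the user-visible behavior after this PR (table names and paths are illustrative):

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    // Still works: temporary data source tables in a plain SQLContext.
    sqlContext.sql(
      "CREATE TEMPORARY TABLE t USING org.apache.spark.sql.json OPTIONS (path 'a.json')")

    // Fails in SimpleCatalog.createDataSourceTable with:
    // "Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead."
    sqlContext.sql(
      "CREATE TABLE t USING org.apache.spark.sql.json OPTIONS (path 'a.json')")

    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

    // Persists the table via HiveMetastoreCatalog.createDataSourceTable.
    hiveContext.sql(
      "CREATE TABLE t USING org.apache.spark.sql.json OPTIONS (path 'a.json')")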