
Commit 6463e0b

yhuai authored and marmbrus committed
[SPARK-4912][SQL] Persistent tables for the Spark SQL data sources api
With changes in this PR, users can persist metadata of tables created based on the data source API in metastore through DDLs.

Author: Yin Huai <[email protected]>
Author: Michael Armbrust <[email protected]>

Closes #3960 from yhuai/persistantTablesWithSchema2 and squashes the following commits:

069c235 [Yin Huai] Make exception messages user friendly.
c07cbc6 [Yin Huai] Get the location of test file in a correct way.
4456e98 [Yin Huai] Test data.
5315dfc [Yin Huai] rxin's comments.
7fc4b56 [Yin Huai] Add DDLStrategy and HiveDDLStrategy to plan DDLs based on the data source API.
aeaf4b3 [Yin Huai] Add comments.
06f9b0c [Yin Huai] Revert unnecessary changes.
feb88aa [Yin Huai] Merge remote-tracking branch 'apache/master' into persistantTablesWithSchema2
172db80 [Yin Huai] Fix unit test.
49bf1ac [Yin Huai] Unit tests.
8f8f1a1 [Yin Huai] [SPARK-4574][SQL] Adding support for defining schema in foreign DDL commands. #3431
f47fda1 [Yin Huai] Unit tests.
2b59723 [Michael Armbrust] Set external when creating tables
c00bb1b [Michael Armbrust] Don't use reflection to read options
1ea6e7b [Michael Armbrust] Don't fail when trying to uncache a table that doesn't exist
6edc710 [Michael Armbrust] Add tests.
d7da491 [Michael Armbrust] First draft of persistent tables.
1 parent 8ead999 commit 6463e0b

File tree

14 files changed: +461 −28 lines

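Before the per-file diffs, here is a minimal usage sketch of what this commit enables. It is not part of the patch; the table name, provider, and path are illustrative, and the persistent form requires a HiveContext because the metadata is stored in the Hive metastore.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("persistent-tables-sketch").setMaster("local[*]"))
val hiveContext = new HiveContext(sc)

// Without TEMPORARY, the provider, options, and (optional) schema are now recorded in the
// metastore as table properties, so the table survives across sessions.
hiveContext.sql(
  """CREATE TABLE people
    |USING org.apache.spark.sql.json
    |OPTIONS (path '/data/people.json')""".stripMargin)

// In a later session the table can be queried directly; the relation is re-created lazily
// from the persisted metadata on first access.
hiveContext.sql("SELECT * FROM people").collect()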

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 1 addition & 0 deletions
@@ -330,6 +330,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     def strategies: Seq[Strategy] =
       extraStrategies ++ (
       DataSourceStrategy ::
+      DDLStrategy ::
       TakeOrdered ::
       HashAggregation ::
       LeftSemiJoin ::

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 14 additions & 0 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.sql.sources.{CreateTempTableUsing, CreateTableUsing}
 import org.apache.spark.sql.{SQLContext, Strategy, execution}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
@@ -310,4 +311,17 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case _ => Nil
     }
   }
+
+  object DDLStrategy extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, options) =>
+        ExecutedCommand(
+          CreateTempTableUsing(tableName, userSpecifiedSchema, provider, options)) :: Nil
+
+      case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
+        sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
+
+      case _ => Nil
+    }
+  }
 }
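A short sketch, not part of the patch, of what DDLStrategy implies for a plain SQLContext (table name, provider, and path are illustrative): the TEMPORARY form is planned as ExecutedCommand(CreateTempTableUsing(...)), while the persistent form hits the second case and fails with the error shown above.

// Assumes an existing SparkContext `sc`.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Planned by DDLStrategy as ExecutedCommand(CreateTempTableUsing(...)):
sqlContext.sql(
  """CREATE TEMPORARY TABLE jsonTable
    |USING org.apache.spark.sql.json
    |OPTIONS (path '/tmp/people.json')""".stripMargin)

// Without TEMPORARY this would fail with:
// "Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead."
// sqlContext.sql("CREATE TABLE jsonTable USING org.apache.spark.sql.json OPTIONS (path '/tmp/people.json')")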

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 1 addition & 2 deletions
@@ -177,7 +177,6 @@ case class DescribeCommand(
     override val output: Seq[Attribute]) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext) = {
-    Row("# Registered as a temporary table", null, null) +:
-      child.output.map(field => Row(field.name, field.dataType.toString, null))
+    child.output.map(field => Row(field.name, field.dataType.toString, null))
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala

Lines changed: 37 additions & 16 deletions
@@ -92,21 +92,21 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
   protected lazy val ddl: Parser[LogicalPlan] = createTable
 
   /**
-   * `CREATE TEMPORARY TABLE avroTable
+   * `CREATE [TEMPORARY] TABLE avroTable
    * USING org.apache.spark.sql.avro
    * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
    * or
-   * `CREATE TEMPORARY TABLE avroTable(intField int, stringField string...)
+   * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...)
    * USING org.apache.spark.sql.avro
    * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
    */
   protected lazy val createTable: Parser[LogicalPlan] =
   (
-    CREATE ~ TEMPORARY ~ TABLE ~> ident
+    (CREATE ~> TEMPORARY.? <~ TABLE) ~ ident
       ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
-      case tableName ~ columns ~ provider ~ opts =>
+      case temp ~ tableName ~ columns ~ provider ~ opts =>
         val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
-        CreateTableUsing(tableName, userSpecifiedSchema, provider, opts)
+        CreateTableUsing(tableName, userSpecifiedSchema, provider, temp.isDefined, opts)
     }
   )
 
@@ -175,13 +175,12 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
       primitiveType
   }
 
-private[sql] case class CreateTableUsing(
-    tableName: String,
-    userSpecifiedSchema: Option[StructType],
-    provider: String,
-    options: Map[String, String]) extends RunnableCommand {
-
-  def run(sqlContext: SQLContext) = {
+object ResolvedDataSource {
+  def apply(
+      sqlContext: SQLContext,
+      userSpecifiedSchema: Option[StructType],
+      provider: String,
+      options: Map[String, String]): ResolvedDataSource = {
     val loader = Utils.getContextOrSparkClassLoader
     val clazz: Class[_] = try loader.loadClass(provider) catch {
       case cnf: java.lang.ClassNotFoundException =>
@@ -199,22 +198,44 @@ private[sql] case class CreateTableUsing(
             .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
             .createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
           case _ =>
-            sys.error(s"${clazz.getCanonicalName} should extend SchemaRelationProvider.")
+            sys.error(s"${clazz.getCanonicalName} does not allow user-specified schemas.")
         }
       }
       case None => {
         clazz.newInstance match {
-          case dataSource: org.apache.spark.sql.sources.RelationProvider  =>
+          case dataSource: org.apache.spark.sql.sources.RelationProvider =>
            dataSource
              .asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
              .createRelation(sqlContext, new CaseInsensitiveMap(options))
          case _ =>
-            sys.error(s"${clazz.getCanonicalName} should extend RelationProvider.")
+            sys.error(s"A schema needs to be specified when using ${clazz.getCanonicalName}.")
        }
      }
    }
 
-    sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
+    new ResolvedDataSource(clazz, relation)
+  }
+}
+
+private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
+
+private[sql] case class CreateTableUsing(
+    tableName: String,
+    userSpecifiedSchema: Option[StructType],
+    provider: String,
+    temporary: Boolean,
+    options: Map[String, String]) extends Command
+
+private [sql] case class CreateTempTableUsing(
+    tableName: String,
+    userSpecifiedSchema: Option[StructType],
+    provider: String,
+    options: Map[String, String]) extends RunnableCommand {
+
+  def run(sqlContext: SQLContext) = {
+    val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
+
+    sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable(tableName)
     Seq.empty
   }
 }
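For illustration only (CreateTableUsing is a private[sql] internal, so user code never constructs it directly): the parser now turns a statement such as CREATE TEMPORARY TABLE avroTable USING org.apache.spark.sql.avro OPTIONS (path '...') into a logical node carrying the temporary flag, which DDLStrategy or HiveDDLStrategy later plans into a physical command.

// Roughly what DDLParser emits for the statement above (hypothetical path).
val plan = CreateTableUsing(
  tableName = "avroTable",
  userSpecifiedSchema = None,              // no column list was given in the DDL
  provider = "org.apache.spark.sql.avro",
  temporary = true,                        // the TEMPORARY keyword was present
  options = Map("path" -> "/path/to/episodes.avro"))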

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 11 additions & 6 deletions
@@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute}
 /**
  * ::DeveloperApi::
  * Implemented by objects that produce relations for a specific kind of data source. When
- * Spark SQL is given a DDL operation with a USING clause specified, this interface is used to
- * pass in the parameters specified by a user.
+ * Spark SQL is given a DDL operation with a USING clause specified (to specify the implemented
+ * RelationProvider), this interface is used to pass in the parameters specified by a user.
  *
  * Users may specify the fully qualified class name of a given data source. When that class is
  * not found Spark SQL will append the class name `DefaultSource` to the path, allowing for
@@ -46,17 +46,22 @@ trait RelationProvider {
 
 /**
  * ::DeveloperApi::
- * Implemented by objects that produce relations for a specific kind of data source. When
- * Spark SQL is given a DDL operation with
- * 1. USING clause: to specify the implemented SchemaRelationProvider
- * 2. User defined schema: users can define schema optionally when create table
+ * Implemented by objects that produce relations for a specific kind of data source
+ * with a given schema. When Spark SQL is given a DDL operation with a USING clause specified (
+ * to specify the implemented SchemaRelationProvider) and a user defined schema, this interface
+ * is used to pass in the parameters specified by a user.
  *
  * Users may specify the fully qualified class name of a given data source. When that class is
 * not found Spark SQL will append the class name `DefaultSource` to the path, allowing for
 * less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the
 * data source 'org.apache.spark.sql.json.DefaultSource'
 *
 * A new instance of this class with be instantiated each time a DDL call is made.
+ *
+ * The difference between a [[RelationProvider]] and a [[SchemaRelationProvider]] is that
+ * users need to provide a schema when using a SchemaRelationProvider.
+ * A relation provider can inherits both [[RelationProvider]] and [[SchemaRelationProvider]]
+ * if it can support both schema inference and user-specified schemas.
 */
 @DeveloperApi
 trait SchemaRelationProvider {
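A hypothetical provider sketch illustrating the last paragraph of the scaladoc above: a data source can mix in both traits so that it works with and without a user-specified schema. Only the trait signatures come from this file; the class body and helper are made up, and type locations are those at this commit.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.types.StructType
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider}

class DefaultSource extends RelationProvider with SchemaRelationProvider {

  // Called when no schema is given in the DDL: infer one from the data.
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    createRelation(sqlContext, parameters, inferSchema(parameters("path")))

  // Called when the user supplied a column list in the DDL.
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType): BaseRelation =
    ??? // build a BaseRelation (e.g. a TableScan) over parameters("path") with `schema`

  private def inferSchema(path: String): StructType =
    ??? // data-source specific schema inference
}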

sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala

Lines changed: 30 additions & 0 deletions
@@ -314,4 +314,34 @@ class TableScanSuite extends DataSourceTest {
       sql("SELECT * FROM oneToTenDef"),
       (1 to 10).map(Row(_)).toSeq)
   }
+
+  test("exceptions") {
+    // Make sure we do throw correct exception when users use a relation provider that
+    // only implements the RelationProvier or the SchemaRelationProvider.
+    val schemaNotAllowed = intercept[Exception] {
+      sql(
+        """
+          |CREATE TEMPORARY TABLE relationProvierWithSchema (i int)
+          |USING org.apache.spark.sql.sources.SimpleScanSource
+          |OPTIONS (
+          |  From '1',
+          |  To '10'
+          |)
+        """.stripMargin)
+    }
+    assert(schemaNotAllowed.getMessage.contains("does not allow user-specified schemas"))
+
+    val schemaNeeded = intercept[Exception] {
+      sql(
+        """
+          |CREATE TEMPORARY TABLE schemaRelationProvierWithoutSchema
+          |USING org.apache.spark.sql.sources.AllDataTypesScanSource
+          |OPTIONS (
+          |  From '1',
+          |  To '10'
+          |)
+        """.stripMargin)
+    }
+    assert(schemaNeeded.getMessage.contains("A schema needs to be specified when using"))
+  }
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 12 additions & 0 deletions
@@ -115,6 +115,16 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
   }
 
+  def refreshTable(tableName: String): Unit = {
+    // TODO: Database support...
+    catalog.refreshTable("default", tableName)
+  }
+
+  protected[hive] def invalidateTable(tableName: String): Unit = {
+    // TODO: Database support...
+    catalog.invalidateTable("default", tableName)
+  }
+
   /**
    * Analyzes the given table in the current database to generate statistics, which will be
    * used in query optimizations.
@@ -340,6 +350,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     override def strategies: Seq[Strategy] = extraStrategies ++ Seq(
       DataSourceStrategy,
       HiveCommandStrategy(self),
+      HiveDDLStrategy,
+      DDLStrategy,
       TakeOrdered,
       ParquetOperations,
       InMemoryScans,
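A brief usage note, not from the patch: if the data or metadata behind a persisted data source table is changed outside this context, the cached relation can be reloaded via the new hook (the table name is hypothetical; invalidateTable is protected[hive] and used internally, for example by TestHive).

// Assumes `hiveContext` is an existing HiveContext and "users" is a persisted
// data source table whose underlying files were just rewritten by another job.
hiveContext.refreshTable("users")   // eagerly re-resolves the cached relation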

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 76 additions & 3 deletions
@@ -20,10 +20,11 @@ package org.apache.spark.sql.hive
 import java.io.IOException
 import java.util.{List => JList}
 
+import com.google.common.cache.{LoadingCache, CacheLoader, CacheBuilder}
+
 import org.apache.hadoop.util.ReflectionUtils
 import org.apache.hadoop.hive.metastore.TableType
-import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
+import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, FieldSchema}
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc
@@ -39,6 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.util.Utils
 
 /* Implicit conversions */
@@ -50,8 +52,76 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   /** Connection to hive metastore. Usages should lock on `this`. */
   protected[hive] val client = Hive.get(hive.hiveconf)
 
+  // TODO: Use this everywhere instead of tuples or databaseName, tableName,.
+  /** A fully qualified identifier for a table (i.e., database.tableName) */
+  case class QualifiedTableName(database: String, name: String) {
+    def toLowerCase = QualifiedTableName(database.toLowerCase, name.toLowerCase)
+  }
+
+  /** A cache of Spark SQL data source tables that have been accessed. */
+  protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = {
+    val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
+      override def load(in: QualifiedTableName): LogicalPlan = {
+        logDebug(s"Creating new cached data source for $in")
+        val table = client.getTable(in.database, in.name)
+        val schemaString = table.getProperty("spark.sql.sources.schema")
+        val userSpecifiedSchema =
+          if (schemaString == null) {
+            None
+          } else {
+            Some(DataType.fromJson(schemaString).asInstanceOf[StructType])
+          }
+        // It does not appear that the ql client for the metastore has a way to enumerate all the
+        // SerDe properties directly...
+        val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap
+
+        val resolvedRelation =
+          ResolvedDataSource(
+            hive,
+            userSpecifiedSchema,
+            table.getProperty("spark.sql.sources.provider"),
+            options)
+
+        LogicalRelation(resolvedRelation.relation)
+      }
+    }
+
+    CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
+  }
+
+  def refreshTable(databaseName: String, tableName: String): Unit = {
+    cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
+  }
+
+  def invalidateTable(databaseName: String, tableName: String): Unit = {
+    cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, tableName).toLowerCase)
+  }
+
   val caseSensitive: Boolean = false
 
+  def createDataSourceTable(
+      tableName: String,
+      userSpecifiedSchema: Option[StructType],
+      provider: String,
+      options: Map[String, String]) = {
+    val (dbName, tblName) = processDatabaseAndTableName("default", tableName)
+    val tbl = new Table(dbName, tblName)
+
+    tbl.setProperty("spark.sql.sources.provider", provider)
+    if (userSpecifiedSchema.isDefined) {
+      tbl.setProperty("spark.sql.sources.schema", userSpecifiedSchema.get.json)
+    }
+    options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
+
+    tbl.setProperty("EXTERNAL", "TRUE")
+    tbl.setTableType(TableType.EXTERNAL_TABLE)
+
+    // create the table
+    synchronized {
+      client.createTable(tbl, false)
+    }
+  }
+
   def tableExists(tableIdentifier: Seq[String]): Boolean = {
     val tableIdent = processTableIdentifier(tableIdentifier)
     val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
@@ -72,7 +142,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       hive.sessionState.getCurrentDatabase)
     val tblName = tableIdent.last
     val table = client.getTable(databaseName, tblName)
-    if (table.isView) {
+
+    if (table.getProperty("spark.sql.sources.provider") != null) {
+      cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
+    } else if (table.isView) {
       // if the unresolved relation is from hive view
       // parse the text into logic node.
       HiveQl.createPlanForView(table, alias)
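A small sketch, not from the patch, of the schema round trip the catalog relies on (field names are illustrative): createDataSourceTable stores the user-specified schema as JSON in the table property spark.sql.sources.schema, and the cache loader parses it back with DataType.fromJson.

import org.apache.spark.sql.catalyst.types.{DataType, IntegerType, StringType, StructField, StructType}

// Illustrative schema; in the patch this is the user-specified schema from the DDL.
val schema = StructType(Seq(
  StructField("i", IntegerType, nullable = true),
  StructField("s", StringType, nullable = true)))

val stored = schema.json                                          // written as "spark.sql.sources.schema"
val restored = DataType.fromJson(stored).asInstanceOf[StructType] // read back by the cache loader
assert(restored == schema)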

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 11 additions & 0 deletions
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.sources.CreateTableUsing
 import org.apache.spark.sql.{SQLContext, SchemaRDD, Strategy}
 
 import scala.collection.JavaConversions._
@@ -208,6 +209,16 @@ private[hive] trait HiveStrategies {
     }
   }
 
+  object HiveDDLStrategy extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
+        ExecutedCommand(
+          CreateMetastoreDataSource(tableName, userSpecifiedSchema, provider, options)) :: Nil
+
+      case _ => Nil
+    }
+  }
+
   case class HiveCommandStrategy(context: HiveContext) extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case describe: DescribeCommand =>
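Note the ordering added to HiveContext.strategies earlier in this commit: HiveDDLStrategy is listed before DDLStrategy, so a non-temporary CreateTableUsing is claimed here and becomes CreateMetastoreDataSource, while a temporary one falls through to DDLStrategy. A minimal sketch (statements are illustrative; assumes an existing HiveContext):

// With a HiveContext both forms now work; only the first is persisted in the metastore.
hiveContext.sql(
  "CREATE TABLE persisted USING org.apache.spark.sql.json OPTIONS (path '/data/a.json')")
hiveContext.sql(
  "CREATE TEMPORARY TABLE scratch USING org.apache.spark.sql.json OPTIONS (path '/data/b.json')")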

sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala

Lines changed: 1 addition & 0 deletions
@@ -395,6 +395,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
 
       clearCache()
       loadedTables.clear()
+      catalog.cachedDataSourceTables.invalidateAll()
       catalog.client.getAllTables("default").foreach { t =>
         logDebug(s"Deleting table $t")
         val table = catalog.client.getTable("default", t)
