diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 6c575dd727b46..e7021cc3366ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -330,6 +330,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     def strategies: Seq[Strategy] =
       extraStrategies ++ (
       DataSourceStrategy ::
+      DDLStrategy ::
       TakeOrdered ::
       HashAggregation ::
       LeftSemiJoin ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 99b6611d3bbcf..d91b1fbc69834 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.sql.sources.{CreateTempTableUsing, CreateTableUsing}
 import org.apache.spark.sql.{SQLContext, Strategy, execution}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
@@ -310,4 +311,17 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case _ => Nil
     }
   }
+
+  object DDLStrategy extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, options) =>
+        ExecutedCommand(
+          CreateTempTableUsing(tableName, userSpecifiedSchema, provider, options)) :: Nil
+
+      case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
+        sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
+
+      case _ => Nil
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 0d765c4c92f85..df8e61615104c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -177,7 +177,6 @@ case class DescribeCommand(
     override val output: Seq[Attribute]) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext) = {
-    Row("# Registered as a temporary table", null, null) +:
-      child.output.map(field => Row(field.name, field.dataType.toString, null))
+    child.output.map(field => Row(field.name, field.dataType.toString, null))
   }
 }
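Note: with DDLStrategy in place, a plain SQLContext still only supports TEMPORARY tables; persistent tables go through the HiveContext path added further below. A rough usage sketch (the table name and path are made up for illustration):

    // Assumes an existing SparkContext `sc`.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    // Planned by DDLStrategy as CreateTempTableUsing.
    sqlContext.sql(
      """CREATE TEMPORARY TABLE logs
        |USING org.apache.spark.sql.json
        |OPTIONS (path '/tmp/logs.json')""".stripMargin)

    // Without TEMPORARY this hits the sys.error above:
    // "Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead."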
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index fe2c4d8436b2b..f8741e0082098 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -92,21 +92,21 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
   protected lazy val ddl: Parser[LogicalPlan] = createTable
 
   /**
-   * `CREATE TEMPORARY TABLE avroTable
+   * `CREATE [TEMPORARY] TABLE avroTable
    * USING org.apache.spark.sql.avro
    * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
    * or
-   * `CREATE TEMPORARY TABLE avroTable(intField int, stringField string...)
+   * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...)
    * USING org.apache.spark.sql.avro
    * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
    */
   protected lazy val createTable: Parser[LogicalPlan] =
   (
-    CREATE ~ TEMPORARY ~ TABLE ~> ident
+    (CREATE ~> TEMPORARY.? <~ TABLE) ~ ident
       ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
-      case tableName ~ columns ~ provider ~ opts =>
+      case temp ~ tableName ~ columns ~ provider ~ opts =>
         val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
-        CreateTableUsing(tableName, userSpecifiedSchema, provider, opts)
+        CreateTableUsing(tableName, userSpecifiedSchema, provider, temp.isDefined, opts)
     }
   )
@@ -175,13 +175,12 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
     primitiveType
 }
 
-private[sql] case class CreateTableUsing(
-    tableName: String,
-    userSpecifiedSchema: Option[StructType],
-    provider: String,
-    options: Map[String, String]) extends RunnableCommand {
-
-  def run(sqlContext: SQLContext) = {
+object ResolvedDataSource {
+  def apply(
+      sqlContext: SQLContext,
+      userSpecifiedSchema: Option[StructType],
+      provider: String,
+      options: Map[String, String]): ResolvedDataSource = {
     val loader = Utils.getContextOrSparkClassLoader
     val clazz: Class[_] = try loader.loadClass(provider) catch {
       case cnf: java.lang.ClassNotFoundException =>
@@ -199,22 +198,44 @@ private[sql] case class CreateTableUsing(
           .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
           .createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
         case _ =>
-          sys.error(s"${clazz.getCanonicalName} should extend SchemaRelationProvider.")
+          sys.error(s"${clazz.getCanonicalName} does not allow user-specified schemas.")
       }
     }
     case None => {
       clazz.newInstance match {
-        case dataSource: org.apache.spark.sql.sources.RelationProvider =>
+        case dataSource: org.apache.spark.sql.sources.RelationProvider =>
          dataSource
            .asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
            .createRelation(sqlContext, new CaseInsensitiveMap(options))
        case _ =>
-          sys.error(s"${clazz.getCanonicalName} should extend RelationProvider.")
+          sys.error(s"A schema needs to be specified when using ${clazz.getCanonicalName}.")
      }
    }
  }
-    sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
+
+    new ResolvedDataSource(clazz, relation)
+  }
+}
+
+private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
+
+private[sql] case class CreateTableUsing(
+    tableName: String,
+    userSpecifiedSchema: Option[StructType],
+    provider: String,
+    temporary: Boolean,
+    options: Map[String, String]) extends Command
+
+private [sql] case class CreateTempTableUsing(
+    tableName: String,
+    userSpecifiedSchema: Option[StructType],
+    provider: String,
+    options: Map[String, String]) extends RunnableCommand {
+
+  def run(sqlContext: SQLContext) = {
+    val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
+
+    sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable(tableName)
     Seq.empty
   }
 }
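Note: factoring the old CreateTableUsing.run body into ResolvedDataSource means the temporary-table path and the new metastore-backed path (introduced below) share the same provider resolution, including the `.DefaultSource` fallback for shortened names. Internally, CreateTempTableUsing.run now boils down to roughly this sketch (the path is hypothetical):

    // None for the schema selects the RelationProvider branch; a Some(StructType)
    // would select the SchemaRelationProvider branch instead.
    val resolved = ResolvedDataSource(
      sqlContext,
      userSpecifiedSchema = None,
      provider = "org.apache.spark.sql.json",   // resolves to org.apache.spark.sql.json.DefaultSource
      options = Map("path" -> "/tmp/people.json"))

    sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable("people")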
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 990f7e0e74bcf..2a7be23e37c74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute}
 /**
  * ::DeveloperApi::
  * Implemented by objects that produce relations for a specific kind of data source.  When
- * Spark SQL is given a DDL operation with a USING clause specified, this interface is used to
- * pass in the parameters specified by a user.
+ * Spark SQL is given a DDL operation with a USING clause specified (to specify the implemented
+ * RelationProvider), this interface is used to pass in the parameters specified by a user.
  *
  * Users may specify the fully qualified class name of a given data source.  When that class is
  * not found Spark SQL will append the class name `DefaultSource` to the path, allowing for
@@ -46,10 +46,10 @@ trait RelationProvider {
 
 /**
  * ::DeveloperApi::
- * Implemented by objects that produce relations for a specific kind of data source.  When
- * Spark SQL is given a DDL operation with
- * 1. USING clause: to specify the implemented SchemaRelationProvider
- * 2. User defined schema: users can define schema optionally when create table
+ * Implemented by objects that produce relations for a specific kind of data source
+ * with a given schema.  When Spark SQL is given a DDL operation with a USING clause
+ * (to specify the implemented SchemaRelationProvider) and a user-defined schema, this interface
+ * is used to pass in the parameters specified by a user.
  *
  * Users may specify the fully qualified class name of a given data source.  When that class is
 * not found Spark SQL will append the class name `DefaultSource` to the path, allowing for
@@ -57,6 +57,11 @@ trait RelationProvider {
  * data source 'org.apache.spark.sql.json.DefaultSource'
  *
  * A new instance of this class will be instantiated each time a DDL call is made.
+ *
+ * The difference between a [[RelationProvider]] and a [[SchemaRelationProvider]] is that
+ * users need to provide a schema when using a [[SchemaRelationProvider]].
+ * A relation provider can inherit both [[RelationProvider]] and [[SchemaRelationProvider]]
+ * if it can support both schema inference and user-specified schemas.
  */
 @DeveloperApi
 trait SchemaRelationProvider {
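Note: as the updated scaladoc describes, a source that supports both schema inference and user-specified schemas can simply mix in both traits. A minimal sketch under that assumption; `MyRelation` is a hypothetical BaseRelation implementation, not part of this patch:

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.catalyst.types.StructType
    import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider}

    class DefaultSource extends RelationProvider with SchemaRelationProvider {
      // No schema in the DDL: the relation infers one itself.
      override def createRelation(
          sqlContext: SQLContext,
          parameters: Map[String, String]): BaseRelation =
        MyRelation(parameters("path"), userSchema = None)(sqlContext)   // MyRelation is hypothetical

      // The DDL carried a column list: honor the user-specified schema.
      override def createRelation(
          sqlContext: SQLContext,
          parameters: Map[String, String],
          schema: StructType): BaseRelation =
        MyRelation(parameters("path"), userSchema = Some(schema))(sqlContext)
    }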
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index 605190f5ae6a2..a1d2468b2573c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -314,4 +314,34 @@ class TableScanSuite extends DataSourceTest {
       sql("SELECT * FROM oneToTenDef"),
       (1 to 10).map(Row(_)).toSeq)
   }
+
+  test("exceptions") {
+    // Make sure we throw the correct exception when users use a relation provider that
+    // only implements the RelationProvider or the SchemaRelationProvider.
+    val schemaNotAllowed = intercept[Exception] {
+      sql(
+        """
+          |CREATE TEMPORARY TABLE relationProviderWithSchema (i int)
+          |USING org.apache.spark.sql.sources.SimpleScanSource
+          |OPTIONS (
+          |  From '1',
+          |  To '10'
+          |)
+        """.stripMargin)
+    }
+    assert(schemaNotAllowed.getMessage.contains("does not allow user-specified schemas"))
+
+    val schemaNeeded = intercept[Exception] {
+      sql(
+        """
+          |CREATE TEMPORARY TABLE schemaRelationProviderWithoutSchema
+          |USING org.apache.spark.sql.sources.AllDataTypesScanSource
+          |OPTIONS (
+          |  From '1',
+          |  To '10'
+          |)
+        """.stripMargin)
+    }
+    assert(schemaNeeded.getMessage.contains("A schema needs to be specified when using"))
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 02eac43b2103f..09ff4cc5ab437 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -115,6 +115,16 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
   }
 
+  def refreshTable(tableName: String): Unit = {
+    // TODO: Database support...
+    catalog.refreshTable("default", tableName)
+  }
+
+  protected[hive] def invalidateTable(tableName: String): Unit = {
+    // TODO: Database support...
+    catalog.invalidateTable("default", tableName)
+  }
+
   /**
    * Analyzes the given table in the current database to generate statistics, which will be
    * used in query optimizations.
@@ -340,6 +350,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     override def strategies: Seq[Strategy] = extraStrategies ++ Seq(
       DataSourceStrategy,
       HiveCommandStrategy(self),
+      HiveDDLStrategy,
+      DDLStrategy,
       TakeOrdered,
       ParquetOperations,
       InMemoryScans,
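Note: a usage sketch for the two new HiveContext hooks (the table name is illustrative). refreshTable re-resolves a cached data source table after its underlying files have changed, while the protected invalidateTable just drops the cached entry so it is rebuilt lazily on the next access; both currently assume the "default" database, per the TODOs above.

    // Pick up new files/columns under the table's path without recreating the table.
    hiveContext.refreshTable("jsonTable")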
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c25288e000122..daeabb6c8bab8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -20,10 +20,11 @@ package org.apache.spark.sql.hive
 import java.io.IOException
 import java.util.{List => JList}
 
+import com.google.common.cache.{LoadingCache, CacheLoader, CacheBuilder}
+
 import org.apache.hadoop.util.ReflectionUtils
 import org.apache.hadoop.hive.metastore.TableType
-import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
+import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, FieldSchema}
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc
@@ -39,6 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.util.Utils
 
 /* Implicit conversions */
@@ -50,8 +52,76 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   /** Connection to hive metastore.  Usages should lock on `this`. */
   protected[hive] val client = Hive.get(hive.hiveconf)
 
+  // TODO: Use this everywhere instead of (databaseName, tableName) tuples.
+  /** A fully qualified identifier for a table (i.e., database.tableName) */
+  case class QualifiedTableName(database: String, name: String) {
+    def toLowerCase = QualifiedTableName(database.toLowerCase, name.toLowerCase)
+  }
+
+  /** A cache of Spark SQL data source tables that have been accessed. */
+  protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = {
+    val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
+      override def load(in: QualifiedTableName): LogicalPlan = {
+        logDebug(s"Creating new cached data source for $in")
+        val table = client.getTable(in.database, in.name)
+        val schemaString = table.getProperty("spark.sql.sources.schema")
+        val userSpecifiedSchema =
+          if (schemaString == null) {
+            None
+          } else {
+            Some(DataType.fromJson(schemaString).asInstanceOf[StructType])
+          }
+        // It does not appear that the ql client for the metastore has a way to enumerate all the
+        // SerDe properties directly...
+        val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap
+
+        val resolvedRelation =
+          ResolvedDataSource(
+            hive,
+            userSpecifiedSchema,
+            table.getProperty("spark.sql.sources.provider"),
+            options)
+
+        LogicalRelation(resolvedRelation.relation)
+      }
+    }
+
+    CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
+  }
+
+  def refreshTable(databaseName: String, tableName: String): Unit = {
+    cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
+  }
+
+  def invalidateTable(databaseName: String, tableName: String): Unit = {
+    cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, tableName).toLowerCase)
+  }
+
   val caseSensitive: Boolean = false
 
+  def createDataSourceTable(
+      tableName: String,
+      userSpecifiedSchema: Option[StructType],
+      provider: String,
+      options: Map[String, String]) = {
+    val (dbName, tblName) = processDatabaseAndTableName("default", tableName)
+    val tbl = new Table(dbName, tblName)
+
+    tbl.setProperty("spark.sql.sources.provider", provider)
+    if (userSpecifiedSchema.isDefined) {
+      tbl.setProperty("spark.sql.sources.schema", userSpecifiedSchema.get.json)
+    }
+    options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
+
+    tbl.setProperty("EXTERNAL", "TRUE")
+    tbl.setTableType(TableType.EXTERNAL_TABLE)
+
+    // create the table
+    synchronized {
+      client.createTable(tbl, false)
+    }
+  }
+
   def tableExists(tableIdentifier: Seq[String]): Boolean = {
     val tableIdent = processTableIdentifier(tableIdentifier)
     val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
@@ -72,7 +142,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       hive.sessionState.getCurrentDatabase)
     val tblName = tableIdent.last
     val table = client.getTable(databaseName, tblName)
-    if (table.isView) {
+
+    if (table.getProperty("spark.sql.sources.provider") != null) {
+      cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
+    } else if (table.isView) {
       // if the unresolved relation is from hive view
       // parse the text into logic node.
       HiveQl.createPlanForView(table, alias)
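Note: createDataSourceTable persists the provider class name and, when given, the schema as table properties ("spark.sql.sources.provider" / "spark.sql.sources.schema"), and the OPTIONS map as SerDe parameters; the table is marked EXTERNAL so dropping it does not touch the underlying data. The schema round-trips through its JSON form, mirroring the cache loader above. A small illustrative check:

    import org.apache.spark.sql.catalyst.types._

    // Same serialization as the "spark.sql.sources.schema" table property (example schema).
    val schema = StructType(StructField("a", StringType, true) :: Nil)
    val restored = DataType.fromJson(schema.json).asInstanceOf[StructType]
    assert(restored == schema)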
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index c439b9ebfe104..cdff82e3d04d2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.sources.CreateTableUsing
 import org.apache.spark.sql.{SQLContext, SchemaRDD, Strategy}
 
 import scala.collection.JavaConversions._
@@ -208,6 +209,16 @@ private[hive] trait HiveStrategies {
     }
   }
 
+  object HiveDDLStrategy extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
+        ExecutedCommand(
+          CreateMetastoreDataSource(tableName, userSpecifiedSchema, provider, options)) :: Nil
+
+      case _ => Nil
+    }
+  }
+
   case class HiveCommandStrategy(context: HiveContext) extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case describe: DescribeCommand =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 1358a0eccb353..31c7ce96398eb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -395,6 +395,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
 
       clearCache()
       loadedTables.clear()
+      catalog.cachedDataSourceTables.invalidateAll()
      catalog.client.getAllTables("default").foreach { t =>
        logDebug(s"Deleting table $t")
        val table = catalog.client.getTable("default", t)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 6b733a280e6d5..e70cdeaad4c09 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.types.StructType
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.SQLContext
@@ -52,6 +53,12 @@ case class DropTable(
   override def run(sqlContext: SQLContext) = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
     val ifExistsClause = if (ifExists) "IF EXISTS " else ""
+    try {
+      hiveContext.tryUncacheQuery(hiveContext.table(tableName))
+    } catch {
+      case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
+    }
+    hiveContext.invalidateTable(tableName)
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
     hiveContext.catalog.unregisterTable(Seq(tableName))
     Seq.empty[Row]
@@ -85,3 +92,17 @@ case class AddFile(path: String) extends RunnableCommand {
     Seq.empty[Row]
   }
 }
+
+case class CreateMetastoreDataSource(
+    tableName: String,
+    userSpecifiedSchema: Option[StructType],
+    provider: String,
+    options: Map[String, String]) extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext) = {
+    val hiveContext = sqlContext.asInstanceOf[HiveContext]
+    hiveContext.catalog.createDataSourceTable(tableName, userSpecifiedSchema, provider, options)
+
+    Seq.empty[Row]
+  }
+}
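Note: with these pieces in place, a HiveContext plans CREATE TABLE ... USING ... through HiveDDLStrategy as CreateMetastoreDataSource (so the definition survives across sessions), while CREATE TEMPORARY TABLE still goes through DDLStrategy; DROP TABLE now also uncaches the query and invalidates the cached relation before calling into Hive. An end-user sketch (the path is made up):

    hiveContext.sql(
      """CREATE TABLE people
        |USING org.apache.spark.sql.json
        |OPTIONS (path '/data/people.json')""".stripMargin)   // persisted in the metastore

    hiveContext.sql("CACHE TABLE people")
    hiveContext.sql("DROP TABLE people")   // uncaches, invalidates the cached plan, then drops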
diff --git a/sql/hive/src/test/resources/sample.json b/sql/hive/src/test/resources/sample.json
new file mode 100644
index 0000000000000..a2c2ffd5e0330
--- /dev/null
+++ b/sql/hive/src/test/resources/sample.json
@@ -0,0 +1,2 @@
+{"a" : "2" ,"b" : "blah", "c_!@(3)":1}
+{"" : {"d!" : [4, 5], "=" : [{"Dd2": null}, {"Dd2" : true}]}}
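Note: the two records above intentionally use field names that require backtick quoting (an empty-string field, "c_!@(3)", "d!", "="), so the suite below can check that such columns survive the metastore round trip; for example it projects them as:

    // Backtick-quoted projection over the persisted JSON table, as used in the suite below.
    sql("SELECT a, b, `c_!@(3)`, ``.`d!`, ``.`=` FROM jsonTable")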
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
new file mode 100644
index 0000000000000..ec9ebb4a775a3
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.sql._
+import org.apache.spark.util.Utils
+
+/* Implicits */
+import org.apache.spark.sql.hive.test.TestHive._
+
+/**
+ * Tests for persisting tables created through the data sources API into the metastore.
+ */
+class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
+  override def afterEach(): Unit = {
+    reset()
+  }
+
+  val filePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
+
+  test ("persistent JSON table") {
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${filePath}'
+        |)
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      jsonFile(filePath).collect().toSeq)
+  }
+
+  test ("persistent JSON table with a user specified schema") {
+    sql(
+      s"""
+        |CREATE TABLE jsonTable (
+        |a string,
+        |b String,
+        |`c_!@(3)` int,
+        |`` Struct<`d!`:array<int>, `=`:array<struct<Dd2:boolean>>>)
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${filePath}'
+        |)
+      """.stripMargin)
+
+    jsonFile(filePath).registerTempTable("expectedJsonTable")
+
+    checkAnswer(
+      sql("SELECT a, b, `c_!@(3)`, ``.`d!`, ``.`=` FROM jsonTable"),
+      sql("SELECT a, b, `c_!@(3)`, ``.`d!`, ``.`=` FROM expectedJsonTable").collect().toSeq)
+  }
+
+  test ("persistent JSON table with a user specified schema with a subset of fields") {
+    // This works because JSON objects are self-describing and JSONRelation can get needed
+    // field values based on field names.
+    sql(
+      s"""
+        |CREATE TABLE jsonTable (`` Struct<`=`:array<struct<Dd2:boolean>>>, b String)
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${filePath}'
+        |)
+      """.stripMargin)
+
+    val innerStruct = StructType(
+      StructField("=", ArrayType(StructType(StructField("Dd2", BooleanType, true) :: Nil))) :: Nil)
+    val expectedSchema = StructType(
+      StructField("", innerStruct, true) ::
+      StructField("b", StringType, true) :: Nil)
+
+    assert(expectedSchema == table("jsonTable").schema)
+
+    jsonFile(filePath).registerTempTable("expectedJsonTable")
+
+    checkAnswer(
+      sql("SELECT b, ``.`=` FROM jsonTable"),
+      sql("SELECT b, ``.`=` FROM expectedJsonTable").collect().toSeq)
+  }
+
+  test("resolve shortened provider names") {
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json
+        |OPTIONS (
+        |  path '${filePath}'
+        |)
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      jsonFile(filePath).collect().toSeq)
+  }
+
+  test("drop table") {
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json
+        |OPTIONS (
+        |  path '${filePath}'
+        |)
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      jsonFile(filePath).collect().toSeq)
+
+    sql("DROP TABLE jsonTable")
+
+    intercept[Exception] {
+      sql("SELECT * FROM jsonTable").collect()
+    }
+  }
+
+  test("check change without refresh") {
+    val tempDir = File.createTempFile("sparksql", "json")
+    tempDir.delete()
+    sparkContext.parallelize(("a", "b") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json
+        |OPTIONS (
+        |  path '${tempDir.getCanonicalPath}'
+        |)
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      ("a", "b") :: Nil)
+
+    FileUtils.deleteDirectory(tempDir)
+    sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+    // Schema is cached so the new column does not show.  The updated values in existing columns
+    // will show.
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      ("a1", "b1") :: Nil)
+
+    refreshTable("jsonTable")
+
+    // Check that the refresh worked
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      ("a1", "b1", "c1") :: Nil)
+    FileUtils.deleteDirectory(tempDir)
+  }
+
+  test("drop, change, recreate") {
+    val tempDir = File.createTempFile("sparksql", "json")
+    tempDir.delete()
+    sparkContext.parallelize(("a", "b") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json
+        |OPTIONS (
+        |  path '${tempDir.getCanonicalPath}'
+        |)
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      ("a", "b") :: Nil)
+
+    FileUtils.deleteDirectory(tempDir)
+    sparkContext.parallelize(("a", "b", "c") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+    sql("DROP TABLE jsonTable")
+
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json
+        |OPTIONS (
+        |  path '${tempDir.getCanonicalPath}'
+        |)
+      """.stripMargin)
+
+    // New table should reflect new schema.
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      ("a", "b", "c") :: Nil)
+    FileUtils.deleteDirectory(tempDir)
+  }
+
+  test("invalidate cache and reload") {
+    sql(
+      s"""
+        |CREATE TABLE jsonTable (`c_!@(3)` int)
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${filePath}'
+        |)
+      """.stripMargin)
+
+    jsonFile(filePath).registerTempTable("expectedJsonTable")
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
+
+    // Discard the cached relation.
+    invalidateTable("jsonTable")
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
+
+    invalidateTable("jsonTable")
+    val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil)
+
+    assert(expectedSchema == table("jsonTable").schema)
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 700a45edb11d6..4decd1548534b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -623,7 +623,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     assertResult(
       Array(
-        Array("# Registered as a temporary table", null, null),
         Array("a", "IntegerType", null),
         Array("b", "StringType", null))
     ) {
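Note: this last hunk pairs with the DescribeCommand change in sql/core above; DESCRIBE on a registered temporary table now returns only the column rows. Illustrative sketch (the table name is hypothetical):

    // Expected shape of the result, per the assertion above:
    //   a    IntegerType   null
    //   b    StringType    null
    hiveContext.sql("DESCRIBE someTempTable").collect()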