
Commit 7787ec7

added SchemaRelationProvider
1 parent 0ba70df commit 7787ec7

7 files changed, +41 -12 lines changed

sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala

Lines changed: 2 additions & 2 deletions
@@ -21,12 +21,12 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.types.StructType
 import org.apache.spark.sql.sources._
 
-private[sql] class DefaultSource extends RelationProvider {
+private[sql] class DefaultSource extends SchemaRelationProvider {
   /** Returns a new base relation with the given parameters. */
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String],
-      schema: Option[StructType]): BaseRelation = {
+      schema: Option[StructType] = None): BaseRelation = {
     val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
     val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
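With the defaulted schema parameter in place, the JSON source can be driven from DDL with a
user-specified schema. A minimal sketch of that usage, assuming a hypothetical table name and
path; the column-list form of CREATE TEMPORARY TABLE is what the tableCols parameter of
CreateTableUsing in ddl.scala below suggests:

import org.apache.spark.sql.SQLContext

// Assumes an existing SQLContext; the table name and path are illustrative only.
def registerJsonTable(sqlContext: SQLContext): Unit = {
  sqlContext.sql(
    """CREATE TEMPORARY TABLE people (name STRING, age INT)
      |USING org.apache.spark.sql.json
      |OPTIONS (path '/tmp/people.json', samplingRatio '1.0')""".stripMargin)
}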

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 2 additions & 2 deletions
@@ -43,12 +43,12 @@ import scala.collection.JavaConversions._
  * required is `path`, which should be the location of a collection of, optionally partitioned,
  * parquet files.
  */
-class DefaultSource extends RelationProvider {
+class DefaultSource extends SchemaRelationProvider {
   /** Returns a new base relation with the given parameters. */
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String],
-      schema: Option[StructType]): BaseRelation = {
+      schema: Option[StructType] = None): BaseRelation = {
     val path =
       parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables."))

sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala

Lines changed: 2 additions & 1 deletion
@@ -217,7 +217,8 @@ private[sql] case class CreateTableUsing(
             sys.error(s"Failed to load class for data source: $provider")
         }
     }
-    val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
+    val dataSource =
+      clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
     val relation = dataSource.createRelation(
       sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols)))
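The provider string that reaches this cast is resolved reflectively. A simplified sketch of the
fallback described in the interfaces.scala docs below; the real code goes through Spark's
context class loader rather than Class.forName:

// Try the fully qualified name first, then fall back to the package's DefaultSource,
// e.g. "org.apache.spark.sql.json" -> "org.apache.spark.sql.json.DefaultSource".
def lookupDataSource(provider: String): Class[_] =
  try Class.forName(provider)
  catch {
    case _: ClassNotFoundException => Class.forName(provider + ".DefaultSource")
  }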

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 26 additions & 1 deletion
@@ -41,10 +41,35 @@ trait RelationProvider {
   * Note: the parameters' keywords are case insensitive and this insensitivity is enforced
   * by the Map that is passed to the function.
   */
+  def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation
+}
+
+/**
+ * ::DeveloperApi::
+ * Implemented by objects that produce relations for a specific kind of data source. When
+ * Spark SQL is given a DDL operation with a USING clause and, optionally, a user-defined
+ * schema, this interface is used to pass in the parameters specified by the user.
+ *
+ * Users may specify the fully qualified class name of a given data source. When that class is
+ * not found, Spark SQL will append the class name `DefaultSource` to the path, allowing for
+ * less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the
+ * data source 'org.apache.spark.sql.json.DefaultSource'.
+ *
+ * A new instance of this class will be instantiated each time a DDL call is made.
+ */
+@DeveloperApi
+trait SchemaRelationProvider {
+  /**
+   * Returns a new base relation with the given parameters and a user-defined schema.
+   * Note: the parameters' keywords are case insensitive and this insensitivity is enforced
+   * by the Map that is passed to the function.
+   */
   def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String],
-      schema: Option[StructType]): BaseRelation
+      schema: Option[StructType] = None): BaseRelation
 }
 
 /**
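To show what the new extension point asks of a data source, here is a minimal sketch of a
third-party SchemaRelationProvider. The RangeSource and RangeScan names are hypothetical;
the pattern (a provider returning a TableScan-backed relation that honors the user schema
when one is supplied) mirrors the test sources changed below, and it assumes the
sources/catalyst APIs of this era of the codebase:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.types.{StringType, StructField, StructType}
import org.apache.spark.sql.sources.{BaseRelation, SchemaRelationProvider, TableScan}

// Hypothetical source: emits one row per integer in [from, to].
class RangeSource extends SchemaRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: Option[StructType] = None): BaseRelation =
    RangeScan(parameters("from").toInt, parameters("to").toInt, schema)(sqlContext)
}

case class RangeScan(
    from: Int,
    to: Int,
    userSchema: Option[StructType])(
    @transient val sqlContext: SQLContext)
  extends TableScan {

  // Use the schema from the DDL statement when given; otherwise a single string column.
  override def schema: StructType =
    userSchema.getOrElse(StructType(StructField("value", StringType, nullable = false) :: Nil))

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(from to to).map(i => Row(i.toString))
}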

sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala

Lines changed: 3 additions & 2 deletions
@@ -21,10 +21,11 @@ import scala.language.existentials
 
 import org.apache.spark.sql._
 
-class FilteredScanSource extends RelationProvider {
+class FilteredScanSource extends SchemaRelationProvider {
   override def createRelation(
       sqlContext: SQLContext,
-      parameters: Map[String, String]): BaseRelation = {
+      parameters: Map[String, String],
+      schema: Option[StructType] = None): BaseRelation = {
     SimpleFilteredScan(parameters("from").toInt, parameters("to").toInt)(sqlContext)
   }
 }

sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala

Lines changed: 3 additions & 2 deletions
@@ -19,10 +19,11 @@ package org.apache.spark.sql.sources
 
 import org.apache.spark.sql._
 
-class PrunedScanSource extends RelationProvider {
+class PrunedScanSource extends SchemaRelationProvider {
   override def createRelation(
       sqlContext: SQLContext,
-      parameters: Map[String, String]): BaseRelation = {
+      parameters: Map[String, String],
+      schema: Option[StructType] = None): BaseRelation = {
     SimplePrunedScan(parameters("from").toInt, parameters("to").toInt)(sqlContext)
   }
 }

sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala

Lines changed: 3 additions & 2 deletions
@@ -21,10 +21,11 @@ import org.apache.spark.sql._
 
 class DefaultSource extends SimpleScanSource
 
-class SimpleScanSource extends RelationProvider {
+class SimpleScanSource extends SchemaRelationProvider {
   override def createRelation(
       sqlContext: SQLContext,
-      parameters: Map[String, String]): BaseRelation = {
+      parameters: Map[String, String],
+      schema: Option[StructType] = None): BaseRelation = {
     SimpleScan(parameters("from").toInt, parameters("TO").toInt)(sqlContext)
   }
 }
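Note that the lookup of parameters("TO") with an upper-case key still succeeds even when the
option is written in lower case: the DDL path wraps the options in CaseInsensitiveMap (see
ddl.scala above), which is exactly the case insensitivity the createRelation scaladoc promises.
A usage sketch with a hypothetical table name, mixing key cases deliberately:

// 'from' and 'To' are written in different cases here, yet both
// parameters("from") and parameters("TO") resolve via CaseInsensitiveMap.
sqlContext.sql(
  """CREATE TEMPORARY TABLE oneToTen
    |USING org.apache.spark.sql.sources.SimpleScanSource
    |OPTIONS (from '1', To '10')""".stripMargin)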
