
Commit 949d6bb

When creating a SchemaRDD for a JSON dataset, users can apply an existing schema.
1 parent: 7a6a7e5

2 files changed: +31 additions, -8 deletions

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 28 additions & 7 deletions
@@ -128,15 +128,23 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * @group userf
    */
-  def jsonFile(path: String): SchemaRDD = jsonFile(path, 1.0)
+  def jsonFile(path: String): SchemaRDD = jsonFile(path, 1.0, None)
+
+  /**
+   * Loads a JSON file (one object per line) and applies the given schema,
+   * returning the result as a [[SchemaRDD]].
+   *
+   * @group userf
+   */
+  def jsonFile(path: String, schema: StructType): SchemaRDD = jsonFile(path, 1.0, Option(schema))
 
   /**
    * :: Experimental ::
    */
   @Experimental
-  def jsonFile(path: String, samplingRatio: Double): SchemaRDD = {
+  def jsonFile(path: String, samplingRatio: Double, schema: Option[StructType]): SchemaRDD = {
     val json = sparkContext.textFile(path)
-    jsonRDD(json, samplingRatio)
+    jsonRDD(json, samplingRatio, schema)
   }
 
   /**
@@ -146,15 +154,28 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * @group userf
    */
-  def jsonRDD(json: RDD[String]): SchemaRDD = jsonRDD(json, 1.0)
+  def jsonRDD(json: RDD[String]): SchemaRDD = jsonRDD(json, 1.0, None)
+
+  /**
+   * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
+   * returning the result as a [[SchemaRDD]].
+   *
+   * @group userf
+   */
+  def jsonRDD(json: RDD[String], schema: StructType): SchemaRDD = jsonRDD(json, 1.0, Option(schema))
 
   /**
    * :: Experimental ::
    */
   @Experimental
-  def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD = {
-    val schema = JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json, samplingRatio))
-    applySchemaToPartitions(json, schema, JsonRDD.jsonStringToRow(schema, _: Iterator[String]))
+  def jsonRDD(json: RDD[String], samplingRatio: Double, schema: Option[StructType]): SchemaRDD = {
+    val appliedSchema =
+      schema.getOrElse(JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json, samplingRatio)))
+
+    applySchemaToPartitions(
+      json,
+      appliedSchema,
+      JsonRDD.jsonStringToRow(appliedSchema, _: Iterator[String]))
   }
 
   /**
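
For context, a minimal usage sketch (not part of the commit) showing how a caller might exercise the new overloads. The import path for StructType, the field names, and the file path are assumptions based on where Catalyst kept its types at the time of this commit; treat them as illustrative only.

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical example data: one JSON object per line in people.json,
// e.g. {"name": "alice", "age": 30}
object JsonSchemaExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "JsonSchemaExample")
    val sqlContext = new SQLContext(sc)

    // Declare the expected shape of each record up front ...
    val schema = StructType(
      StructField("name", StringType, nullable = true) ::
      StructField("age", IntegerType, nullable = true) :: Nil)

    // ... so the new overloads skip schema inference (sampling) entirely.
    val people = sqlContext.jsonFile("people.json", schema)
    val fromRdd = sqlContext.jsonRDD(sc.textFile("people.json"), schema)

    people.registerAsTable("people")
    sqlContext.sql("SELECT name FROM people").collect().foreach(println)

    sc.stop()
  }
}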

sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala

Lines changed: 3 additions & 1 deletion
@@ -31,7 +31,9 @@ import org.apache.spark.sql.Logging
 
 private[sql] object JsonRDD extends Logging {
 
-  private[sql] def jsonStringToRow(schema: StructType, jsonIter: Iterator[String]): Iterator[Row] = {
+  private[sql] def jsonStringToRow(
+      schema: StructType,
+      jsonIter: Iterator[String]): Iterator[Row] = {
     parseJson(jsonIter).map(parsed => asRow(parsed, schema))
   }
 
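
The call site in SQLContext.scala above passes JsonRDD.jsonStringToRow(appliedSchema, _: Iterator[String]) to applySchemaToPartitions, i.e. it partially applies the method, fixing the schema and leaving the per-partition iterator open. A self-contained sketch of that idiom (all names here are hypothetical, not from the commit):

// Fixing the first argument of a two-argument method with the
// underscore-plus-type-annotation syntax eta-expands it into a
// one-argument function value.
object PartialApplicationSketch {
  def tag(prefix: String, lines: Iterator[String]): Iterator[String] =
    lines.map(line => s"[$prefix] $line")

  def main(args: Array[String]): Unit = {
    // converter has type Iterator[String] => Iterator[String]
    val converter = tag("json", _: Iterator[String])
    converter(Iterator("a", "b")).foreach(println)  // prints "[json] a", "[json] b"
  }
}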
