
Commit 2d6612c

Nathan Howell authored and yhuai committed
[SPARK-5938] [SPARK-5443] [SQL] Improve JsonRDD performance
This patch comprises a few related pieces of work:

* Schema inference is performed directly on the JSON token stream
* `String => Row` conversion populates Spark SQL structures without intermediate types
* Projection pushdown is implemented via CatalystScan for DataFrame queries
* Support for the legacy parser is retained by setting `spark.sql.json.useJacksonStreamingAPI` to `false`

Performance improvements depend on the schema and queries being executed, but it should be faster across the board. Below are benchmarks using the last.fm Million Song dataset:

```
Command                                             | Baseline | Patched
----------------------------------------------------|----------|--------
import sqlContext.implicits._                       |          |
val df = sqlContext.jsonFile("/tmp/lastfm.json")    | 70.0s    | 14.6s
df.count()                                          | 28.8s    | 6.2s
df.rdd.count()                                      | 35.3s    | 21.5s
df.where($"artist" === "Robert Hood").collect()     | 28.3s    | 16.9s
```

To prepare this dataset for benchmarking, follow these steps:

```
# Fetch the datasets from http://labrosa.ee.columbia.edu/millionsong/lastfm
wget http://labrosa.ee.columbia.edu/millionsong/sites/default/files/lastfm/lastfm_test.zip \
     http://labrosa.ee.columbia.edu/millionsong/sites/default/files/lastfm/lastfm_train.zip

# Decompress and combine, pipe through `jq -c` to ensure there is one record per line
unzip -p lastfm_test.zip lastfm_train.zip | jq -c . > lastfm.json
```

Author: Nathan Howell <[email protected]>

Closes apache#5801 from NathanHowell/json-performance and squashes the following commits:

26fea31 [Nathan Howell] Recreate the baseRDD each for each scan operation
a7ebeb2 [Nathan Howell] Increase coverage of inserts into a JSONRelation
e06a1dd [Nathan Howell] Add comments to the `useJacksonStreamingAPI` config flag
6822712 [Nathan Howell] Split up JsonRDD2 into multiple objects
fa8234f [Nathan Howell] Wrap long lines
b31917b [Nathan Howell] Rename `useJsonRDD2` to `useJacksonStreamingAPI`
15c5d1b [Nathan Howell] JSONRelation's baseRDD need not be lazy
f8add6e [Nathan Howell] Add comments on lack of support for precision and scale DecimalTypes
fa0be47 [Nathan Howell] Remove unused default case in the field parser
80dba17 [Nathan Howell] Add comments regarding null handling and empty strings
842846d [Nathan Howell] Point the empty schema inference test at JsonRDD2
ab6ee87 [Nathan Howell] Add projection pushdown support to JsonRDD/JsonRDD2
f636c14 [Nathan Howell] Enable JsonRDD2 by default, add a flag to switch back to JsonRDD
0bbc445 [Nathan Howell] Improve JSON parsing and type inference performance
7ca70c1 [Nathan Howell] Eliminate arrow pattern, replace with pattern matches
1 parent 9cfa9a5 commit 2d6612c
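
As a rough guide to reproducing the benchmark above, here is a minimal sketch of the measured operations; it assumes a Spark 1.3/1.4-era `spark-shell` with the usual `sc`/`sqlContext` bindings and the prepared `/tmp/lastfm.json` file, none of which are part of this commit:

```scala
// Minimal sketch (assumes spark-shell: `sqlContext` is predefined,
// and /tmp/lastfm.json was produced by the preparation steps above).
import sqlContext.implicits._

val df = sqlContext.jsonFile("/tmp/lastfm.json")    // schema inference over the whole file
df.count()                                          // count through the SQL scan path
df.rdd.count()                                      // exercises the String => Row conversion
df.where($"artist" === "Robert Hood").collect()     // benefits from projection pushdown
```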

File tree: 13 files changed, +715 -128 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala

Lines changed: 23 additions & 20 deletions
```diff
@@ -26,33 +26,36 @@ object HiveTypeCoercion {
   // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
   // The conversion for integral and floating point types have a linear widening hierarchy:
   private val numericPrecedence =
-    Seq(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType.Unlimited)
+    IndexedSeq(
+      ByteType,
+      ShortType,
+      IntegerType,
+      LongType,
+      FloatType,
+      DoubleType,
+      DecimalType.Unlimited)
 
   /**
    * Find the tightest common type of two types that might be used in a binary expression.
    * This handles all numeric types except fixed-precision decimals interacting with each other or
    * with primitive types, because in that case the precision and scale of the result depends on
    * the operation. Those rules are implemented in [[HiveTypeCoercion.DecimalPrecision]].
    */
-  def findTightestCommonType(t1: DataType, t2: DataType): Option[DataType] = {
-    val valueTypes = Seq(t1, t2).filter(t => t != NullType)
-    if (valueTypes.distinct.size > 1) {
-      // Promote numeric types to the highest of the two and all numeric types to unlimited decimal
-      if (numericPrecedence.contains(t1) && numericPrecedence.contains(t2)) {
-        Some(numericPrecedence.filter(t => t == t1 || t == t2).last)
-      } else if (t1.isInstanceOf[DecimalType] && t2.isInstanceOf[DecimalType]) {
-        // Fixed-precision decimals can up-cast into unlimited
-        if (t1 == DecimalType.Unlimited || t2 == DecimalType.Unlimited) {
-          Some(DecimalType.Unlimited)
-        } else {
-          None
-        }
-      } else {
-        None
-      }
-    } else {
-      Some(if (valueTypes.size == 0) NullType else valueTypes.head)
-    }
+  val findTightestCommonType: (DataType, DataType) => Option[DataType] = {
+    case (t1, t2) if t1 == t2 => Some(t1)
+    case (NullType, t1) => Some(t1)
+    case (t1, NullType) => Some(t1)
+
+    // Promote numeric types to the highest of the two and all numeric types to unlimited decimal
+    case (t1, t2) if Seq(t1, t2).forall(numericPrecedence.contains) =>
+      val index = numericPrecedence.lastIndexWhere(t => t == t1 || t == t2)
+      Some(numericPrecedence(index))
+
+    // Fixed-precision decimals can up-cast into unlimited
+    case (DecimalType.Unlimited, _: DecimalType) => Some(DecimalType.Unlimited)
+    case (_: DecimalType, DecimalType.Unlimited) => Some(DecimalType.Unlimited)
+
+    case _ => None
   }
 }
```
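
The refactored `findTightestCommonType` is now a pattern-matching function value rather than a method, but its observable behaviour is meant to be equivalent. A hedged illustration of what it returns (internal Catalyst API, shown only to make the cases above concrete):

```scala
import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion.findTightestCommonType
import org.apache.spark.sql.types._

findTightestCommonType(IntegerType, LongType)              // Some(LongType): widened along numericPrecedence
findTightestCommonType(NullType, StringType)               // Some(StringType): NullType yields to the other side
findTightestCommonType(DoubleType, DecimalType.Unlimited)  // Some(DecimalType.Unlimited): last in the precedence list
findTightestCommonType(StringType, IntegerType)            // None: left to other coercion rules
```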

sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala

Lines changed: 4 additions & 0 deletions
```diff
@@ -134,6 +134,10 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
       throw new IllegalArgumentException(s"""Field "$name" does not exist."""))
   }
 
+  private[sql] def getFieldIndex(name: String): Option[Int] = {
+    nameToIndex.get(name)
+  }
+
   protected[sql] def toAttributes: Seq[AttributeReference] =
     map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
```

sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
 import org.apache.spark.sql.jdbc.JDBCWriteDetails
-import org.apache.spark.sql.json.JsonRDD
+import org.apache.spark.sql.json.{JacksonGenerator, JsonRDD}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
 import org.apache.spark.util.Utils
@@ -1415,7 +1415,7 @@ class DataFrame private[sql](
       new Iterator[String] {
         override def hasNext: Boolean = iter.hasNext
         override def next(): String = {
-          JsonRDD.rowToJSON(rowSchema, gen)(iter.next())
+          JacksonGenerator(rowSchema, gen)(iter.next())
           gen.flush()
 
           val json = writer.toString
```
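
This hunk only swaps the serializer used inside `DataFrame.toJSON` from `JsonRDD.rowToJSON` to the new `JacksonGenerator`; the public API is unchanged. A small hedged usage sketch (the input path is made up):

```scala
// toJSON re-serializes each Row back to a JSON string, now via JacksonGenerator.
val df = sqlContext.jsonFile("/tmp/people.json")     // hypothetical input file
val jsonLines: org.apache.spark.rdd.RDD[String] = df.toJSON
jsonLines.take(2).foreach(println)
```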

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 8 additions & 0 deletions
```diff
@@ -73,6 +73,8 @@ private[spark] object SQLConf {
 
   val USE_SQL_SERIALIZER2 = "spark.sql.useSerializer2"
 
+  val USE_JACKSON_STREAMING_API = "spark.sql.json.useJacksonStreamingAPI"
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -166,6 +168,12 @@ private[sql] class SQLConf extends Serializable {
 
   private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2, "true").toBoolean
 
+  /**
+   * Selects between the new (true) and old (false) JSON handlers, to be removed in Spark 1.5.0
+   */
+  private[spark] def useJacksonStreamingAPI: Boolean =
+    getConf(USE_JACKSON_STREAMING_API, "true").toBoolean
+
   /**
    * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
    * a broadcast value during the physical executions of join operations. Setting this to -1
```
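
The flag is read through `SQLConf`, so it can be flipped at runtime. A hedged sketch of the two usual ways to set it (assuming an existing `sqlContext`):

```scala
// Programmatically:
sqlContext.setConf("spark.sql.json.useJacksonStreamingAPI", "false")  // legacy JsonRDD parser
sqlContext.setConf("spark.sql.json.useJacksonStreamingAPI", "true")   // new Jackson streaming path (default)

// Or through SQL:
sqlContext.sql("SET spark.sql.json.useJacksonStreamingAPI=false")
```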

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 21 additions & 13 deletions
```diff
@@ -659,13 +659,17 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
-    val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
-    val appliedSchema =
-      Option(schema).getOrElse(
-        JsonRDD.nullTypeToStringType(
-          JsonRDD.inferSchema(json, 1.0, columnNameOfCorruptJsonRecord)))
-    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
-    createDataFrame(rowRDD, appliedSchema, needsConversion = false)
+    if (conf.useJacksonStreamingAPI) {
+      baseRelationToDataFrame(new JSONRelation(() => json, None, 1.0, Some(schema))(this))
+    } else {
+      val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
+      val appliedSchema =
+        Option(schema).getOrElse(
+          JsonRDD.nullTypeToStringType(
+            JsonRDD.inferSchema(json, 1.0, columnNameOfCorruptJsonRecord)))
+      val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
+      createDataFrame(rowRDD, appliedSchema, needsConversion = false)
+    }
   }
 
   /**
@@ -689,12 +693,16 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
-    val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
-    val appliedSchema =
-      JsonRDD.nullTypeToStringType(
-        JsonRDD.inferSchema(json, samplingRatio, columnNameOfCorruptJsonRecord))
-    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
-    createDataFrame(rowRDD, appliedSchema, needsConversion = false)
+    if (conf.useJacksonStreamingAPI) {
+      baseRelationToDataFrame(new JSONRelation(() => json, None, samplingRatio, None)(this))
+    } else {
+      val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
+      val appliedSchema =
+        JsonRDD.nullTypeToStringType(
+          JsonRDD.inferSchema(json, samplingRatio, columnNameOfCorruptJsonRecord))
+      val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
+      createDataFrame(rowRDD, appliedSchema, needsConversion = false)
+    }
   }
 
   /**
```
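
Both `jsonRDD` overloads above keep their signatures; with the flag enabled they now go through `JSONRelation` instead of eagerly materializing rows. A hedged sketch of calling them (the sample records are invented):

```scala
import org.apache.spark.sql.types._

val json = sc.parallelize(Seq(
  """{"artist": "Robert Hood", "plays": 42}""",
  """{"artist": "Jeff Mills"}"""))

// Explicit schema: skips inference entirely
val schema = StructType(Seq(
  StructField("artist", StringType, nullable = true),
  StructField("plays", LongType, nullable = true)))
val withSchema = sqlContext.jsonRDD(json, schema)

// Sampling ratio: infer the schema from roughly half of the records
val inferred = sqlContext.jsonRDD(json, 0.5)
```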

sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala

Lines changed: 171 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.json

import com.fasterxml.jackson.core._

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
import org.apache.spark.sql.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._

private[sql] object InferSchema {
  /**
   * Infer the type of a collection of json records in three stages:
   *   1. Infer the type of each record
   *   2. Merge types by choosing the lowest type necessary to cover equal keys
   *   3. Replace any remaining null fields with string, the top type
   */
  def apply(
      json: RDD[String],
      samplingRatio: Double = 1.0,
      columnNameOfCorruptRecords: String): StructType = {
    require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
    val schemaData = if (samplingRatio > 0.99) {
      json
    } else {
      json.sample(withReplacement = false, samplingRatio, 1)
    }

    // perform schema inference on each row and merge afterwards
    schemaData.mapPartitions { iter =>
      val factory = new JsonFactory()
      iter.map { row =>
        try {
          val parser = factory.createParser(row)
          parser.nextToken()
          inferField(parser)
        } catch {
          case _: JsonParseException =>
            StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))
        }
      }
    }.treeAggregate[DataType](StructType(Seq()))(compatibleRootType, compatibleRootType) match {
      case st: StructType => nullTypeToStringType(st)
    }
  }

  /**
   * Infer the type of a json document from the parser's token stream
   */
  private def inferField(parser: JsonParser): DataType = {
    import com.fasterxml.jackson.core.JsonToken._
    parser.getCurrentToken match {
      case null | VALUE_NULL => NullType

      case FIELD_NAME =>
        parser.nextToken()
        inferField(parser)

      case VALUE_STRING if parser.getTextLength < 1 =>
        // Zero length strings and nulls have special handling to deal
        // with JSON generators that do not distinguish between the two.
        // To accurately infer types for empty strings that are really
        // meant to represent nulls we assume that the two are isomorphic
        // but will defer treating null fields as strings until all the
        // record fields' types have been combined.
        NullType

      case VALUE_STRING => StringType
      case START_OBJECT =>
        val builder = Seq.newBuilder[StructField]
        while (nextUntil(parser, END_OBJECT)) {
          builder += StructField(parser.getCurrentName, inferField(parser), nullable = true)
        }

        StructType(builder.result().sortBy(_.name))

      case START_ARRAY =>
        // If this JSON array is empty, we use NullType as a placeholder.
        // If this array is not empty in other JSON objects, we can resolve
        // the type as we pass through all JSON objects.
        var elementType: DataType = NullType
        while (nextUntil(parser, END_ARRAY)) {
          elementType = compatibleType(elementType, inferField(parser))
        }

        ArrayType(elementType)

      case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
        import JsonParser.NumberType._
        parser.getNumberType match {
          // For Integer values, use LongType by default.
          case INT | LONG => LongType
          // Since we do not have a data type backed by BigInteger,
          // when we see a Java BigInteger, we use DecimalType.
          case BIG_INTEGER | BIG_DECIMAL => DecimalType.Unlimited
          case FLOAT | DOUBLE => DoubleType
        }

      case VALUE_TRUE | VALUE_FALSE => BooleanType
    }
  }

  private def nullTypeToStringType(struct: StructType): StructType = {
    val fields = struct.fields.map {
      case StructField(fieldName, dataType, nullable, _) =>
        val newType = dataType match {
          case NullType => StringType
          case ArrayType(NullType, containsNull) => ArrayType(StringType, containsNull)
          case ArrayType(struct: StructType, containsNull) =>
            ArrayType(nullTypeToStringType(struct), containsNull)
          case struct: StructType => nullTypeToStringType(struct)
          case other: DataType => other
        }

        StructField(fieldName, newType, nullable)
    }

    StructType(fields)
  }

  /**
   * Remove top-level ArrayType wrappers and merge the remaining schemas
   */
  private def compatibleRootType: (DataType, DataType) => DataType = {
    case (ArrayType(ty1, _), ty2) => compatibleRootType(ty1, ty2)
    case (ty1, ArrayType(ty2, _)) => compatibleRootType(ty1, ty2)
    case (ty1, ty2) => compatibleType(ty1, ty2)
  }

  /**
   * Returns the most general data type for two given data types.
   */
  private[json] def compatibleType(t1: DataType, t2: DataType): DataType = {
    HiveTypeCoercion.findTightestCommonType(t1, t2).getOrElse {
      // t1 or t2 is a StructType, ArrayType, or an unexpected type.
      (t1, t2) match {
        case (other: DataType, NullType) => other
        case (NullType, other: DataType) => other
        case (StructType(fields1), StructType(fields2)) =>
          val newFields = (fields1 ++ fields2).groupBy(field => field.name).map {
            case (name, fieldTypes) =>
              val dataType = fieldTypes.view.map(_.dataType).reduce(compatibleType)
              StructField(name, dataType, nullable = true)
          }
          StructType(newFields.toSeq.sortBy(_.name))

        case (ArrayType(elementType1, containsNull1), ArrayType(elementType2, containsNull2)) =>
          ArrayType(compatibleType(elementType1, elementType2), containsNull1 || containsNull2)

        // strings and every string is a Json object.
        case (_, _) => StringType
      }
    }
  }
}
```
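
For readers unfamiliar with Jackson's streaming API, here is a small self-contained sketch (not part of the commit) of the token stream that `inferField` pattern-matches on; the sample record is invented:

```scala
import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

object TokenStreamDemo {
  def main(args: Array[String]): Unit = {
    val factory = new JsonFactory()
    val parser = factory.createParser("""{"artist": "Robert Hood", "plays": 42, "tags": []}""")

    // Prints START_OBJECT, FIELD_NAME, VALUE_STRING, FIELD_NAME, VALUE_NUMBER_INT,
    // FIELD_NAME, START_ARRAY, END_ARRAY, END_OBJECT -- the same tokens inferField
    // maps to StringType, LongType and ArrayType(NullType) respectively.
    var token: JsonToken = parser.nextToken()
    while (token != null) {
      println(token)
      token = parser.nextToken()
    }
    parser.close()
  }
}
```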
