
Conversation

NathanHowell

This patch comprises a few related pieces of work:

  • Schema inference is performed directly on the JSON token stream
  • `String => Row` conversion populates Spark SQL structures without intermediate types
  • Projection pushdown is implemented via CatalystScan for DataFrame queries
  • The legacy parser remains available by setting `spark.sql.json.useJacksonStreamingAPI` to `false` (see the sketch below)
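
For reference, a minimal sketch of toggling the flag from a Spark shell (the flag name comes from this patch; `setConf` is just the standard SQLContext configuration call, not code from the patch itself):

```
// Opt back into the legacy JsonRDD parser; the new Jackson streaming path is the default.
sqlContext.setConf("spark.sql.json.useJacksonStreamingAPI", "false")
val df = sqlContext.jsonFile("/tmp/lastfm.json") // parsed by the legacy code path
```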

Performance improvements depend on the schema and queries being executed, but it should be faster across the board. Below are benchmarks using the last.fm Million Song dataset:

```
Command                                            | Baseline | Patched
---------------------------------------------------|----------|--------
import sqlContext.implicits._                      |          |
val df = sqlContext.jsonFile("/tmp/lastfm.json")   |    70.0s |   14.6s
df.count()                                         |    28.8s |    6.2s
df.rdd.count()                                     |    35.3s |   21.5s
df.where($"artist" === "Robert Hood").collect()    |    28.3s |   16.9s
```

To prepare this dataset for benchmarking, follow these steps:

```
# Fetch the datasets from http://labrosa.ee.columbia.edu/millionsong/lastfm
wget http://labrosa.ee.columbia.edu/millionsong/sites/default/files/lastfm/lastfm_test.zip \
     http://labrosa.ee.columbia.edu/millionsong/sites/default/files/lastfm/lastfm_train.zip

# Decompress and combine, pipe through `jq -c` to ensure there is one record per line
unzip -p lastfm_test.zip  | jq -c . >  lastfm.json
unzip -p lastfm_train.zip | jq -c . >> lastfm.json
```

@rxin
Contributor

rxin commented Apr 30, 2015

Jenkins, ok to test.

@rxin
Contributor

rxin commented Apr 30, 2015

I won't have time to look at this today, but this is pretty cool.

@NathanHowell
Author

Looks like it may also resolve SPARK-5443.

@rxin
Contributor

rxin commented Apr 30, 2015

Can you put both JIRA tickets in the title? It will then automatically be linked to both tickets.

@NathanHowell changed the title from [SPARK-5938][SQL] Improve JsonRDD performance to [SPARK-5938][SPARK-5443][SQL] Improve JsonRDD performance on Apr 30, 2015
@NathanHowell
Author

Done.

@SparkQA

SparkQA commented Apr 30, 2015

Test build #31399 has finished for PR 5801 at commit 1abf1d6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KMeansModel (
    • trait PMMLExportable
  • This patch adds the following new dependencies:
    • jaxb-api-2.2.7.jar
    • jaxb-core-2.2.7.jar
    • jaxb-impl-2.2.7.jar
    • pmml-agent-1.1.15.jar
    • pmml-model-1.1.15.jar
    • pmml-schema-1.1.15.jar
  • This patch removes the following dependencies:
    • activation-1.1.jar
    • jaxb-api-2.2.2.jar
    • jaxb-impl-2.2.3-1.jar

@NathanHowell
Author

Benchmarked a small-ish real dataset... Runs are with 5 executors (for 5 input splits) with data in HDFS:

```
Step                                                | Before | After
----------------------------------------------------|--------|-------
val df = sqlContext.jsonRDD(...) - schema inference | 37.14s | 18.16s
df.count()                                          | 125.8s |  25.7s
df.select("col1").count()                           |  96.9s |  26.5s
```

Not sure why, but the new code seems a bit slower when using projection pushdown. It may be schema dependent, or it may be overhead from evaluating the projection expressions.

@SparkQA

SparkQA commented Apr 30, 2015

Test build #31449 has finished for PR 5801 at commit 55c2f39.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@marmbrus
Contributor

marmbrus commented May 1, 2015

/cc @yhuai

@SparkQA

SparkQA commented May 1, 2015

Test build #31526 has finished for PR 5801 at commit 67c381a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@NathanHowell
Author

I think it's in a decent state now. If this qualifies for the 1.4.0 merge window, I'll make time to work through any remaining issues.

@SparkQA

SparkQA commented May 1, 2015

Test build #31544 has finished for PR 5801 at commit bd2e929.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Contributor

yhuai commented May 1, 2015

@NathanHowell This is great! Is it possible to add a feature flag to choose which code path we use? By default, we would use the new code path, but we would still keep the option to use the old one in case there is any issue. Then, in 1.5, we can remove the old code path. What do you think?
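
A rough sketch of the fallback idea (the flag name is from this PR; the function and path labels below are illustrative, not the actual implementation):

```
import org.apache.spark.sql.SQLContext

// Read the flag, defaulting to the new streaming path; the legacy JsonRDD path stays
// selectable until it can be removed in 1.5.
def selectJsonPath(sqlContext: SQLContext): String = {
  val useStreaming =
    sqlContext.getConf("spark.sql.json.useJacksonStreamingAPI", "true").toBoolean
  if (useStreaming) "JsonRDD2 (Jackson streaming)" else "JsonRDD (legacy)"
}
```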

@NathanHowell
Author

@yhuai Fine with me, I'm reworking the patch set now.

@NathanHowell
Author

@yhuai The updated patches do not test the old code. Do you have an opinion on the best way to address this? I can duplicate the entire JsonSuite or try to do something a bit better...

@marmbrus
Contributor

marmbrus commented May 1, 2015

I'm okay with freezing the old code and not having tests. I just want a quick fallback if a regression is found.

@NathanHowell
Author

@marmbrus sounds good, I'll leave it as is.

@SparkQA

SparkQA commented May 2, 2015

Test build #31626 has finished for PR 5801 at commit ab6ee87.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class DataFrameStatFunctions(object):

@SparkQA

SparkQA commented May 2, 2015

Test build #31630 has finished for PR 5801 at commit 842846d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

parser.nextToken()
inferField(parser)

case VALUE_STRING if parser.getTextLength < 1 => NullType
Contributor

Does it mean that we get an empty string? If so, can we keep the StringType? Otherwise, I feel we are destroying information.

Author

Yes, an empty string gets inferred as a NullType. After inference is complete, any remaining NullType fields get converted back to a StringType. The old code does this and has test coverage for it, but it does seem a bit odd.
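
A small standalone sketch of the check under discussion (an illustration only, not the patch code; it just shows how an empty string appears on the Jackson token stream):

```
import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

// An empty string carries no type information, so inference treats it as NullType;
// a later record with a real value (or the end-of-inference fallback) yields StringType.
val parser = new JsonFactory().createParser("""{"a": ""}""")
parser.nextToken() // START_OBJECT
parser.nextToken() // FIELD_NAME "a"
parser.nextToken() // VALUE_STRING ""
val inferred =
  if (parser.getCurrentToken == JsonToken.VALUE_STRING && parser.getTextLength < 1) "NullType"
  else "StringType"
println(inferred) // prints: NullType
```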

Contributor

OK, I see. It is for those datasets that use "" as a null. Since this is only for inferring the type, it is safe to use NullType because we can always get StringType from other records. Actually, can we add a comment there to explain its purpose?
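
A tiny illustration of why this is safe (the merge function below is a simplified stand-in, not the actual inference code):

```
import org.apache.spark.sql.types._

// NullType acts as the identity under type merging: any record with a real value widens the
// field, and anything still NullType after inference falls back to StringType.
def mergeTypes(a: DataType, b: DataType): DataType = (a, b) match {
  case (NullType, t) => t
  case (t, NullType) => t
  case (t1, t2) if t1 == t2 => t1
  case _ => StringType // simplified fallback for this sketch
}

mergeTypes(NullType, StringType) // StringType: the "" record does not destroy information
```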

Author

Done.

@rxin
Contributor

rxin commented May 4, 2015

You can try getting some tweets and using those. That's what I usually demo on, but unfortunately I don't think it is legal to make collected tweets public.

There is some stuff here: https://www.opensciencedatacloud.org/publicdata/city-of-chicago-public-datasets/

@NathanHowell
Author

@rxin Thanks, the datasets there are currently not mounted on their rsync endpoint, so I found another dataset (last.fm) that is about 2 GB and timed a few queries.

@rxin
Contributor

rxin commented May 5, 2015

Slightly off topic - @NathanHowell do you know if Jackson allows returning UTF-8 encoded strings directly? If it supports that, we can skip string decoding/encoding altogether, since Spark SQL internally now uses UTF-8 encoded bytes for strings.

@NathanHowell
Author

@rxin It supports writing a UTF-8 encoded byte array, but there doesn't seem to be equivalent support for reads. The best that can be done is converting the current char[] buffer and offset/length directly to a byte[], avoiding an alloc/copy to String.

see: http://fasterxml.github.io/jackson-core/javadoc/2.3.0/com/fasterxml/jackson/core/base/ParserMinimalBase.html#getTextCharacters()
and http://fasterxml.github.io/jackson-core/javadoc/2.3.0/com/fasterxml/jackson/core/JsonGenerator.html#writeUTF8String(byte[], int, int)
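
A sketch of what that read-side workaround could look like (an illustration under stated assumptions, not code from the patch; it ignores Jackson's internal buffer reuse):

```
import java.nio.CharBuffer
import java.nio.charset.StandardCharsets
import com.fasterxml.jackson.core.JsonFactory

// Grab the parser's backing char[] buffer plus offset/length and encode it to UTF-8 bytes
// ourselves, skipping the intermediate java.lang.String allocation.
val parser = new JsonFactory().createParser("""{"artist": "Robert Hood"}""")
parser.nextToken(); parser.nextToken(); parser.nextToken() // position on the VALUE_STRING
val chars  = parser.getTextCharacters
val offset = parser.getTextOffset
val length = parser.getTextLength
val utf8   = StandardCharsets.UTF_8.encode(CharBuffer.wrap(chars, offset, length)) // ByteBuffer
```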

@NathanHowell
Author

@yhuai Is there still time to get this in for 1.4.0?

@marmbrus
Contributor

marmbrus commented May 5, 2015

Yeah, given that there is a flag I think we can still include this.

@yhuai
Contributor

yhuai commented May 5, 2015

@NathanHowell I will do a final check tomorrow. Can you also add the performance numbers for selecting all columns to the description? You can use `df.rdd.count` as the command to compare the two versions.

@@ -160,6 +162,9 @@ private[sql] class SQLConf extends Serializable {

private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2, "true").toBoolean

private[spark] def useJacksonStreamingAPI: Boolean =
Contributor

Can you add a comment to explain that this is a temporary flag and that we will remove the old code path in 1.5?

@yhuai
Contributor

yhuai commented May 5, 2015

@NathanHowell I played with it. The issue I found is that inserts do not work well because baseRDD is an input parameter of the JSON relation. For example, the following code throws an exception:

```
sql(
      s"""
        |CREATE TEMPORARY TABLE jsonTable (a int, b string)
        |USING org.apache.spark.sql.json.DefaultSource
        |OPTIONS (
        |  path '/tmp/jsonTable'
        |)
      """.stripMargin)
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""), 5)
jsonRDD(rdd).registerTempTable("jt")
sql(
      s"""
        |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt
      """.stripMargin)

val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""), 1)
jsonRDD(rdd1).registerTempTable("jt1")
sql(
      s"""
        |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt1
      """.stripMargin)

The exception is something like

```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 6.0 failed 1 times, most recent failure: Lost task 3.0 in stage 6.0 (TID 31, localhost): java.io.FileNotFoundException: File file:/tmp/testJson/part-00003 does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:520)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:398)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:137)
    at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:339)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:763)
    at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:106)
    at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:235)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:212)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
```

@NathanHowell
Author

@yhuai I'll be able to check on this a bit later today.

@yhuai
Contributor

yhuai commented May 5, 2015

It seems our test cases are not sufficient to catch the problem. Can you also add the following test cases?

In InsertSuite, let's change the `val rdd` defined in `beforeAll` to `val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""), 5)`. Then, let's change the test "INSERT OVERWRITE a JSONRelation multiple times" to:

test("INSERT OVERWRITE a JSONRelation multiple times") {
  sql(
    s"""
      |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt
    """.stripMargin)
  checkAnswer(
    sql("SELECT a, b FROM jsonTable"),
    (1 to 10).map(i => Row(i, s"str$i"))
  )

  // Writing the table to fewer part files.
  val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""), 5)
  jsonRDD(rdd1).registerTempTable("jt1")
  sql(
    s"""
    |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt1
    """.stripMargin)
  checkAnswer(
    sql("SELECT a, b FROM jsonTable"),
    (1 to 10).map(i => Row(i, s"str$i"))
  )

  // Writing the table to more part files.
  val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""), 10)
  jsonRDD(rdd2).registerTempTable("jt2")
  sql(
    s"""
    |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt2
    """.stripMargin)
  checkAnswer(
    sql("SELECT a, b FROM jsonTable"),
    (1 to 10).map(i => Row(i, s"str$i"))
  )

  sql(
    s"""
      |INSERT OVERWRITE TABLE jsonTable SELECT a * 10, b FROM jt1
    """.stripMargin)
  checkAnswer(
    sql("SELECT a, b FROM jsonTable"),
    (1 to 10).map(i => Row(i * 10, s"str$i"))
  )

  dropTempTable("jt1")
  dropTempTable("jt2")
}
```

Also, add the following in the InsertSuite.

test("save directly to the path of a JSON table") {
  table("jt").selectExpr("a * 5 as a", "b").save(path.toString, "json", SaveMode.Overwrite)
  checkAnswer(
    sql("SELECT a, b FROM jsonTable"),
    (1 to 10).map(i => Row(i * 5, s"str$i"))
  )

  table("jt").save(path.toString, "json", SaveMode.Overwrite)
  checkAnswer(
    sql("SELECT a, b FROM jsonTable"),
    (1 to 10).map(i => Row(i, s"str$i"))
  )
}
```

@SparkQA

SparkQA commented May 6, 2015

Test build #31981 has finished for PR 5801 at commit e91d8c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@NathanHowell
Author

@yhuai I've added the tests and fixed the failures. The change was minor: I changed the type of baseRDD back to `=> RDD[String]` and added some comments.

private[sql] class JSONRelation(
// baseRDD needs to be created on scan and not when JSONRelation is
// constructed, so we need a function (call by name) instead of a value
baseRDD: => RDD[String],
Contributor

Can you update the comment to document why it needs to be created on scan?

Contributor

How about we explicitly pass a closure (more reader-friendly)?

@NathanHowell
Author

@rxin yep, I've updated the comment.

// underlying inputs are modified. To be safe, a call-by-name
// value (a function) is used instead of a regular value to
// ensure the RDD is recreated on each and every operation.
baseRDD: => RDD[String],
Contributor

@NathanHowell Do you think a closure here would be better (as mentioned in https://github.com/databricks/scala-style-guide#call_by_name)?

Author

Yes, this has been corrected.
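
For readers following along, a minimal sketch of the two signatures being discussed (the class names are illustrative, not the actual JSONRelation):

```
import org.apache.spark.rdd.RDD

// Call-by-name: the argument is re-evaluated on every reference to baseRDD,
// which keeps the RDD fresh but hides that cost at the call site.
class RelationByName(baseRDD: => RDD[String])

// Explicit closure, per the style guide: callers pass () => ..., and the relation
// calls baseRDD() on each scan, making the re-creation obvious to the reader.
class RelationByClosure(baseRDD: () => RDD[String])
```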

@SparkQA

SparkQA commented May 6, 2015

Test build #32000 has finished for PR 5801 at commit e1187eb.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 6, 2015

Test build #32001 has finished for PR 5801 at commit 26fea31.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Contributor

yhuai commented May 7, 2015

Thank you! LGTM. I am merging it to master and branch 1.4.

asfgit pushed a commit that referenced this pull request May 7, 2015
This patch comprises a few related pieces of work:

* Schema inference is performed directly on the JSON token stream
* `String => Row` conversion populates Spark SQL structures without intermediate types
* Projection pushdown is implemented via CatalystScan for DataFrame queries
* Support for the legacy parser by setting `spark.sql.json.useJacksonStreamingAPI` to `false`

Performance improvements depend on the schema and queries being executed, but it should be faster across the board. Below are benchmarks using the last.fm Million Song dataset:

```
Command                                            | Baseline | Patched
---------------------------------------------------|----------|--------
import sqlContext.implicits._                      |          |
val df = sqlContext.jsonFile("/tmp/lastfm.json")   |    70.0s |   14.6s
df.count()                                         |    28.8s |    6.2s
df.rdd.count()                                     |    35.3s |   21.5s
df.where($"artist" === "Robert Hood").collect()    |    28.3s |   16.9s
```

To prepare this dataset for benchmarking, follow these steps:

```
# Fetch the datasets from http://labrosa.ee.columbia.edu/millionsong/lastfm
wget http://labrosa.ee.columbia.edu/millionsong/sites/default/files/lastfm/lastfm_test.zip \
     http://labrosa.ee.columbia.edu/millionsong/sites/default/files/lastfm/lastfm_train.zip

# Decompress and combine, pipe through `jq -c` to ensure there is one record per line
unzip -p lastfm_test.zip lastfm_train.zip  | jq -c . > lastfm.json
```

Author: Nathan Howell <[email protected]>

Closes #5801 from NathanHowell/json-performance and squashes the following commits:

26fea31 [Nathan Howell] Recreate the baseRDD each for each scan operation
a7ebeb2 [Nathan Howell] Increase coverage of inserts into a JSONRelation
e06a1dd [Nathan Howell] Add comments to the `useJacksonStreamingAPI` config flag
6822712 [Nathan Howell] Split up JsonRDD2 into multiple objects
fa8234f [Nathan Howell] Wrap long lines
b31917b [Nathan Howell] Rename `useJsonRDD2` to `useJacksonStreamingAPI`
15c5d1b [Nathan Howell] JSONRelation's baseRDD need not be lazy
f8add6e [Nathan Howell] Add comments on lack of support for precision and scale DecimalTypes
fa0be47 [Nathan Howell] Remove unused default case in the field parser
80dba17 [Nathan Howell] Add comments regarding null handling and empty strings
842846d [Nathan Howell] Point the empty schema inference test at JsonRDD2
ab6ee87 [Nathan Howell] Add projection pushdown support to JsonRDD/JsonRDD2
f636c14 [Nathan Howell] Enable JsonRDD2 by default, add a flag to switch back to JsonRDD
0bbc445 [Nathan Howell] Improve JSON parsing and type inference performance
7ca70c1 [Nathan Howell] Eliminate arrow pattern, replace with pattern matches

(cherry picked from commit 2d6612c)
Signed-off-by: Yin Huai <[email protected]>
@asfgit asfgit closed this in 2d6612c May 7, 2015
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request May 28, 2015
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request Jun 12, 2015
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015
@NathanHowell NathanHowell deleted the json-performance branch December 8, 2016 00:30