Commit 2f001ba

Author: Yanbo Liang

Use mutable rows for inner structures

1 parent 18e4ddc commit 2f001ba

File tree: 1 file changed, +23 -22 lines

sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala

Lines changed: 23 additions & 22 deletions
@@ -38,19 +38,11 @@ private[sql] object JsonRDD extends Logging {
       json: RDD[String],
       schema: StructType,
       columnNameOfCorruptRecords: String): RDD[Row] = {
-    // Reuse the mutable row for each record, however we still need to
-    // create a new row for every nested struct type in each record
-    val mutableRow = new SpecificMutableRow(schema.fields.map(_.dataType))
-    parseJson(json, columnNameOfCorruptRecords).mapPartitions( iter => {
-      iter.map { parsed =>
-        schema.fields.zipWithIndex.foreach {
-          case (StructField(name, dataType, _, _), i) =>
-            mutableRow.update(i, parsed.get(name).flatMap(v => Option(v)).map(
-              enforceCorrectType(_, dataType)).orNull)
-        }
-        mutableRow: Row
-      }
-    })
+    // Reuse the mutable row for each record and all inner nested structures
+    parseJson(json, columnNameOfCorruptRecords).mapPartitions {
+      val row = new GenericMutableRow(schema.fields.length)
+      iter => iter.map(parsed => asRow(parsed, schema, row))
+    }
   }
 
   private[sql] def inferSchema(
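A general Scala note on the new body: iter.map hands back the same row object for every record, so results have to be consumed (or copied) before the next element is produced; holding bare references keeps only the last record's values. A minimal plain-Scala illustration of that aliasing, with an Array[Any] standing in for GenericMutableRow (no Spark types involved, names are illustrative only):

// Illustration only: an Array[Any] stands in for the reused mutable row.
object ReusedRowAliasing {
  def main(args: Array[String]): Unit = {
    val records = Iterator(Map("a" -> 1), Map("a" -> 2))
    val row = new Array[Any](1)                      // one buffer, reused for every record
    val rows = records.map { r => row(0) = r("a"); row }

    // Keeping the un-copied references points twice at the same buffer,
    // so both entries show the last record's value.
    val kept = rows.toList
    println(kept.map(_.apply(0)))                    // prints List(2, 2)
  }
}

The sketch only illustrates the aliasing; whether a consumer needs to copy depends on whether it retains rows past the current iteration.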
@@ -413,7 +405,7 @@ private[sql] object JsonRDD extends Logging {
     }
   }
 
-  private[json] def enforceCorrectType(value: Any, desiredType: DataType): Any ={
+  private[json] def enforceCorrectType(value: Any, desiredType: DataType, slot: Any = null): Any ={
     if (value == null) {
       null
     } else {
@@ -428,20 +420,29 @@ private[sql] object JsonRDD extends Logging {
         case NullType => null
         case ArrayType(elementType, _) =>
           value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
-        case struct: StructType => asRow(value.asInstanceOf[Map[String, Any]], struct)
+        case struct: StructType =>
+          asRow(value.asInstanceOf[Map[String, Any]], struct, slot.asInstanceOf[GenericMutableRow])
         case DateType => toDate(value)
         case TimestampType => toTimestamp(value)
       }
     }
   }
 
-  private def asRow(json: Map[String,Any], schema: StructType): Row = {
-    // TODO: Reuse the row instead of creating a new one for every record.
-    val row = new GenericMutableRow(schema.fields.length)
-    schema.fields.zipWithIndex.foreach {
-      case (StructField(name, dataType, _, _), i) =>
-        row.update(i, json.get(name).flatMap(v => Option(v)).map(
-          enforceCorrectType(_, dataType)).orNull)
+  private def asRow(
+      json: Map[String,Any],
+      schema: StructType,
+      mutable: GenericMutableRow = null): Row = {
+    val row = if (mutable == null) {
+      new GenericMutableRow(schema.fields.length)
+    } else {
+      mutable
+    }
+
+    for(i <- 0 until schema.fields.length) {
+      val fieldName = schema.fields(i).name
+      val fieldType = schema.fields(i).dataType
+      row.update(i, json.get(fieldName).flatMap(v => Option(v)).map(
+        enforceCorrectType(_, fieldType, row(i))).orNull)
     }
 
     row
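For reference, the shape of the new asRow is: fill a caller-supplied mutable row in place and only allocate when none is passed, which is what lets nested StructType values keep reusing the object already sitting in their slot. Below is a plain-Scala sketch of that reuse-or-allocate pattern; MutableRow and the string-keyed schema are simplified stand-ins, not Spark's GenericMutableRow or StructType.

// Plain-Scala sketch of the reuse-or-allocate pattern; illustrative names only.
object AsRowSketch {
  final class MutableRow(size: Int) {
    private val values = new Array[Any](size)
    def update(i: Int, v: Any): Unit = values(i) = v
    override def toString: String = values.mkString("[", ", ", "]")
  }

  // Fill `mutable` in place when provided; otherwise allocate a fresh row.
  def asRow(parsed: Map[String, Any], schema: Seq[String], mutable: MutableRow = null): MutableRow = {
    val row = if (mutable == null) new MutableRow(schema.length) else mutable
    for (i <- schema.indices) {
      row.update(i, parsed.getOrElse(schema(i), null))
    }
    row
  }

  def main(args: Array[String]): Unit = {
    val schema = Seq("name", "age")
    val reused = new MutableRow(schema.length)                      // one row reused across records
    println(asRow(Map("name" -> "a", "age" -> 1), schema, reused))  // [a, 1]
    println(asRow(Map("name" -> "b"), schema, reused))              // [b, null]
  }
}

Missing fields are overwritten with null on every call, which is why the reused row never leaks values from the previous record.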
