
Commit f84f467

Yin Huai (yhuai) authored and Marcelo Vanzin committed
[SPARK-10441] [SQL] [BRANCH-1.5] Save data correctly to json.
https://issues.apache.org/jira/browse/SPARK-10441

This is the backport of apache#8597 for the 1.5 branch.

Author: Yin Huai <[email protected]>

Closes apache#8655 from yhuai/timestampJson-1.5.

(cherry picked from commit 7fd4674)
1 parent 99c6893 commit f84f467
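The core of the fix is visible in the JacksonGenerator diff below: the JSON writer now receives values in their internal Catalyst representation (a Long of microseconds since the epoch for TimestampType, an Int of days since the epoch for DateType) and converts them with DateTimeUtils before emitting strings, instead of matching on external java.sql.Timestamp objects. A minimal standalone sketch of those conversions (illustrative only; the object and method names here are not Spark's, and Spark's DateTimeUtils additionally handles time zones and negative values):

import java.sql.{Date, Timestamp}
import java.util.concurrent.TimeUnit

object InternalToExternalSketch {
  // TimestampType is carried internally as microseconds since the epoch.
  // Simplification: assumes a non-negative timestamp.
  def microsToTimestamp(micros: Long): Timestamp = {
    val ts = new Timestamp(micros / 1000)            // whole milliseconds
    ts.setNanos((micros % 1000000L).toInt * 1000)    // leftover microseconds as nanoseconds
    ts
  }

  // DateType is carried internally as days since the epoch.
  def daysToDate(days: Int): Date =
    new Date(TimeUnit.DAYS.toMillis(days.toLong))
}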

File tree

9 files changed, +205 −8 lines


sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala

Lines changed: 40 additions & 1 deletion
@@ -23,6 +23,8 @@ import java.math.MathContext

 import scala.util.Random

+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval

@@ -84,6 +86,7 @@ object RandomDataGenerator {
  * random data generator is defined for that data type. The generated values will use an external
  * representation of the data type; for example, the random generator for [[DateType]] will return
  * instances of [[java.sql.Date]] and the generator for [[StructType]] will return a [[Row]].
+ * For a [[UserDefinedType]] for a class X, an instance of class X is returned.
  *
  * @param dataType the type to generate values for
  * @param nullable whether null values should be generated
@@ -106,7 +109,22 @@ object RandomDataGenerator {
       })
     case BooleanType => Some(() => rand.nextBoolean())
     case DateType => Some(() => new java.sql.Date(rand.nextInt()))
-    case TimestampType => Some(() => new java.sql.Timestamp(rand.nextLong()))
+    case TimestampType =>
+      val generator =
+        () => {
+          var milliseconds = rand.nextLong() % 253402329599999L
+          // -62135740800000L is the number of milliseconds before January 1, 1970, 00:00:00 GMT
+          // for "0001-01-01 00:00:00.000000". We need to find a
+          // number that is greater than or equal to this number to have a valid timestamp value.
+          while (milliseconds < -62135740800000L) {
+            // 253402329599999L is the number of milliseconds since
+            // January 1, 1970, 00:00:00 GMT for "9999-12-31 23:59:59.999999".
+            milliseconds = rand.nextLong() % 253402329599999L
+          }
+          // DateTimeUtils.toJavaTimestamp takes microseconds.
+          DateTimeUtils.toJavaTimestamp(milliseconds * 1000)
+        }
+      Some(generator)
     case CalendarIntervalType => Some(() => {
       val months = rand.nextInt(1000)
       val ns = rand.nextLong()
@@ -159,6 +177,27 @@ object RandomDataGenerator {
          None
        }
      }
+    case udt: UserDefinedType[_] => {
+      val maybeSqlTypeGenerator = forType(udt.sqlType, nullable, seed)
+      // Because the random data generator here returns a Scala value, we need to
+      // convert it to a Catalyst value before calling the UDT's deserialize.
+      val toCatalystType = CatalystTypeConverters.createToCatalystConverter(udt.sqlType)
+
+      if (maybeSqlTypeGenerator.isDefined) {
+        val sqlTypeGenerator = maybeSqlTypeGenerator.get
+        val generator = () => {
+          val generatedScalaValue = sqlTypeGenerator.apply()
+          if (generatedScalaValue == null) {
+            null
+          } else {
+            udt.deserialize(toCatalystType(generatedScalaValue))
+          }
+        }
+        Some(generator)
+      } else {
+        None
+      }
+    }
     case unsupportedType => None
   }
   // Handle nullability by wrapping the non-null value generator:
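The new TimestampType generator above rejects millisecond values outside the range representable between "0001-01-01" and "9999-12-31", so the generated test data always maps to a valid java.sql.Timestamp. A standalone sketch of the same rejection-sampling idea (constants copied from the diff; the object name is illustrative):

import scala.util.Random

object BoundedTimestampGen {
  // Bounds as in the diff: milliseconds relative to the epoch for
  // "0001-01-01 00:00:00" and "9999-12-31 23:59:59.999".
  private val MinMillis = -62135740800000L
  private val MaxMillis = 253402329599999L

  // Keep drawing random values until one lands inside the valid range.
  def nextValidMillis(rand: Random): Long = {
    var millis = rand.nextLong() % MaxMillis
    while (millis < MinMillis) {
      millis = rand.nextLong() % MaxMillis
    }
    millis
  }
}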

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala

Lines changed: 8 additions & 3 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.json

 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils

 import scala.collection.Map

@@ -89,7 +90,7 @@ private[sql] object JacksonGenerator {
     def valWriter: (DataType, Any) => Unit = {
       case (_, null) | (NullType, _) => gen.writeNull()
       case (StringType, v) => gen.writeString(v.toString)
-      case (TimestampType, v: java.sql.Timestamp) => gen.writeString(v.toString)
+      case (TimestampType, v: Long) => gen.writeString(DateTimeUtils.toJavaTimestamp(v).toString)
       case (IntegerType, v: Int) => gen.writeNumber(v)
       case (ShortType, v: Short) => gen.writeNumber(v)
       case (FloatType, v: Float) => gen.writeNumber(v)
@@ -99,8 +100,12 @@ private[sql] object JacksonGenerator {
       case (ByteType, v: Byte) => gen.writeNumber(v.toInt)
       case (BinaryType, v: Array[Byte]) => gen.writeBinary(v)
       case (BooleanType, v: Boolean) => gen.writeBoolean(v)
-      case (DateType, v) => gen.writeString(v.toString)
-      case (udt: UserDefinedType[_], v) => valWriter(udt.sqlType, udt.serialize(v))
+      case (DateType, v: Int) => gen.writeString(DateTimeUtils.toJavaDate(v).toString)
+      // UDT values should arrive in the SQL type's corresponding value type.
+      // We should not see values of the user-defined class here.
+      // For example, VectorUDT's SQL type is an array of double, so we expect v to be
+      // an ArrayData here, not a Vector.
+      case (udt: UserDefinedType[_], v) => valWriter(udt.sqlType, v)

       case (ArrayType(ty, _), v: ArrayData) =>
         gen.writeStartArray()
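With the writer matching on the internal representations (Long microseconds for timestamps, Int days for dates) and converting via DateTimeUtils, timestamp and date columns survive a JSON round trip. A hedged usage sketch against the Spark 1.5 DataFrame API (the output path and column name are illustrative; sqlContext is assumed to exist as in the test suites below):

import java.sql.Timestamp

val df = sqlContext.createDataFrame(Seq(Tuple1(new Timestamp(0L)))).toDF("ts")

df.write.json("/tmp/timestamp-json")                        // hypothetical output path
val reloaded = sqlContext.read.json("/tmp/timestamp-json")  // without an explicit schema,
reloaded.show()                                             // the timestamp comes back as its string form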

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala

Lines changed: 31 additions & 0 deletions
@@ -81,9 +81,37 @@ private[sql] object JacksonParser {
     case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, FloatType) =>
       parser.getFloatValue

+    case (VALUE_STRING, FloatType) =>
+      // Special case handling for NaN and Infinity.
+      val value = parser.getText
+      val lowerCaseValue = value.toLowerCase()
+      if (lowerCaseValue.equals("nan") ||
+        lowerCaseValue.equals("infinity") ||
+        lowerCaseValue.equals("-infinity") ||
+        lowerCaseValue.equals("inf") ||
+        lowerCaseValue.equals("-inf")) {
+        value.toFloat
+      } else {
+        sys.error(s"Cannot parse $value as FloatType.")
+      }
+
     case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, DoubleType) =>
       parser.getDoubleValue

+    case (VALUE_STRING, DoubleType) =>
+      // Special case handling for NaN and Infinity.
+      val value = parser.getText
+      val lowerCaseValue = value.toLowerCase()
+      if (lowerCaseValue.equals("nan") ||
+        lowerCaseValue.equals("infinity") ||
+        lowerCaseValue.equals("-infinity") ||
+        lowerCaseValue.equals("inf") ||
+        lowerCaseValue.equals("-inf")) {
+        value.toDouble
+      } else {
+        sys.error(s"Cannot parse $value as DoubleType.")
+      }
+
     case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, dt: DecimalType) =>
       Decimal(parser.getDecimalValue, dt.precision, dt.scale)

@@ -126,6 +154,9 @@ private[sql] object JacksonParser {

     case (_, udt: UserDefinedType[_]) =>
       convertField(factory, parser, udt.sqlType)
+
+    case (token, dataType) =>
+      sys.error(s"Failed to parse a value for data type $dataType (current token: $token).")
   }
 }

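The two new VALUE_STRING cases only admit a fixed set of spellings for NaN and infinity when a string appears where a float or double is expected; anything else is an error. A simplified standalone sketch of that intent (the committed code delegates to toFloat/toDouble rather than returning the constants directly):

def parseSpecialDouble(text: String): Double = text.toLowerCase match {
  case "nan"                => Double.NaN
  case "infinity" | "inf"   => Double.PositiveInfinity
  case "-infinity" | "-inf" => Double.NegativeInfinity
  case other                => sys.error(s"Cannot parse $other as DoubleType.")
}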

sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala

Lines changed: 8 additions & 0 deletions
@@ -29,6 +29,14 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
   import sqlContext._
   import sqlContext.implicits._

+  // ORC does not play well with NullType and UDT.
+  override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: NullType => false
+    case _: CalendarIntervalType => false
+    case _: UserDefinedType[_] => false
+    case _ => true
+  }
+
   test("save()/load() - partitioned table - simple queries - partition columns in data") {
     withTempDir { file =>
       val basePath = new Path(file.getCanonicalPath)

sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala

Lines changed: 8 additions & 0 deletions
@@ -30,6 +30,14 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {

   import sqlContext._

+  // JSON does not write data of NullType and does not play well with BinaryType.
+  override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: NullType => false
+    case _: BinaryType => false
+    case _: CalendarIntervalType => false
+    case _ => true
+  }
+
   test("save()/load() - partitioned table - simple queries - partition columns in data") {
     withTempDir { file =>
       val basePath = new Path(file.getCanonicalPath)

sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala

Lines changed: 8 additions & 1 deletion
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path

 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.{execution, AnalysisException, SaveMode}
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.types._


 class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
@@ -33,6 +33,13 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
   import sqlContext._
   import sqlContext.implicits._

+  // Parquet does not play well with NullType.
+  override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: NullType => false
+    case _: CalendarIntervalType => false
+    case _ => true
+  }
+
   test("save()/load() - partitioned table - simple queries - partition columns in data") {
     withTempDir { file =>
       val basePath = new Path(file.getCanonicalPath)

sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala

Lines changed: 18 additions & 1 deletion
@@ -20,13 +20,30 @@ package org.apache.spark.sql.sources
 import org.apache.hadoop.fs.Path

 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.types._

 class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
   override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName

   import sqlContext._

+  // We support a very limited set of types here since this is just a
+  // test relation and we only do very basic testing with it.
+  override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: BinaryType => false
+    // We are using the random data generator and the generated strings are not really valid strings.
+    case _: StringType => false
+    case _: BooleanType => false // see https://issues.apache.org/jira/browse/SPARK-10442
+    case _: CalendarIntervalType => false
+    case _: DateType => false
+    case _: TimestampType => false
+    case _: ArrayType => false
+    case _: MapType => false
+    case _: StructType => false
+    case _: UserDefinedType[_] => false
+    case _ => true
+  }
+
   test("save()/load() - partitioned table - simple queries - partition columns in data") {
     withTempDir { file =>
       val basePath = new Path(file.getCanonicalPath)

sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala

Lines changed: 5 additions & 2 deletions
@@ -68,7 +68,9 @@ class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends
     new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context)

   override def write(row: Row): Unit = {
-    val serialized = row.toSeq.map(_.toString).mkString(",")
+    val serialized = row.toSeq.map { v =>
+      if (v == null) "" else v.toString
+    }.mkString(",")
     recordWriter.write(null, new Text(serialized))
   }

@@ -112,7 +114,8 @@ class SimpleTextRelation(
     val fields = dataSchema.map(_.dataType)

     sparkContext.textFile(inputStatuses.map(_.getPath).mkString(",")).map { record =>
-      Row(record.split(",").zip(fields).map { case (value, dataType) =>
+      Row(record.split(",", -1).zip(fields).map { case (v, dataType) =>
+        val value = if (v == "") null else v
         // `Cast`ed values are always of Catalyst types (i.e. UTF8String instead of String, etc.)
         val catalystValue = Cast(Literal(value), dataType).eval()
         // Here we're converting Catalyst values to Scala values to test `needsConversion`
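The switch to split(",", -1) matters because Java's default split drops trailing empty fields, so a row whose last columns were written as nulls (empty strings) would come back with fewer fields than the schema expects. A quick REPL illustration:

"a,,".split(",").length      // == 1: trailing empty fields are dropped
"a,,".split(",", -1).length  // == 3: empty (null) columns are preserved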

sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala

Lines changed: 79 additions & 0 deletions
@@ -40,6 +40,8 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {

   val dataSourceName: String

+  protected def supportsDataType(dataType: DataType): Boolean = true
+
   val dataSchema =
     StructType(
       Seq(
@@ -100,6 +102,83 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
     }
   }

+  test("test all data types") {
+    withTempPath { file =>
+      // Create the schema.
+      val struct =
+        StructType(
+          StructField("f1", FloatType, true) ::
+          StructField("f2", ArrayType(BooleanType), true) :: Nil)
+      // TODO: add CalendarIntervalType to here once we can save it out.
+      val dataTypes =
+        Seq(
+          StringType, BinaryType, NullType, BooleanType,
+          ByteType, ShortType, IntegerType, LongType,
+          FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+          DateType, TimestampType,
+          ArrayType(IntegerType), MapType(StringType, LongType), struct,
+          new MyDenseVectorUDT())
+      val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
+        StructField(s"col$index", dataType, nullable = true)
+      }
+      val schema = StructType(fields)
+
+      // Generate data at the driver side. We need to materialize the data first and then
+      // create the RDD.
+      val maybeDataGenerator =
+        RandomDataGenerator.forType(
+          dataType = schema,
+          nullable = true,
+          seed = Some(System.nanoTime()))
+      val dataGenerator =
+        maybeDataGenerator
+          .getOrElse(fail(s"Failed to create data generator for schema $schema"))
+      val data = (1 to 10).map { i =>
+        dataGenerator.apply() match {
+          case row: Row => row
+          case null => Row.fromSeq(Seq.fill(schema.length)(null))
+          case other =>
+            fail(s"Row or null is expected to be generated, " +
+              s"but a ${other.getClass.getCanonicalName} is generated.")
+        }
+      }
+
+      // Create a DF for the schema with random data.
+      val rdd = sqlContext.sparkContext.parallelize(data, 10)
+      val df = sqlContext.createDataFrame(rdd, schema)
+
+      // All columns that have supported data types of this source.
+      val supportedColumns = schema.fields.collect {
+        case StructField(name, dataType, _, _) if supportsDataType(dataType) => name
+      }
+      val selectedColumns = util.Random.shuffle(supportedColumns.toSeq)
+
+      val dfToBeSaved = df.selectExpr(selectedColumns: _*)
+
+      // Save the data out.
+      dfToBeSaved
+        .write
+        .format(dataSourceName)
+        .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
+        .save(file.getCanonicalPath)
+
+      val loadedDF =
+        sqlContext
+          .read
+          .format(dataSourceName)
+          .schema(dfToBeSaved.schema)
+          .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
+          .load(file.getCanonicalPath)
+          .selectExpr(selectedColumns: _*)
+
+      // Read the data back.
+      checkAnswer(
+        loadedDF,
+        dfToBeSaved
+      )
+    }
+  }
+
   test("save()/load() - non-partitioned table - Overwrite") {
     withTempPath { file =>
       testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath)
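The new "test all data types" test establishes a reusable pattern: generate random rows for a wide schema, keep only the columns whose types the concrete source accepts via supportsDataType, save, reload with the same schema, and compare. A condensed sketch of the round-trip core, assuming the suite's sqlContext, dataSourceName, and checkAnswer (the helper name roundTrip is hypothetical):

def roundTrip(df: org.apache.spark.sql.DataFrame, path: String): Unit = {
  df.write
    .format(dataSourceName)
    .option("dataSchema", df.schema.json)  // only needed by the test relations
    .save(path)

  val reloaded = sqlContext.read
    .format(dataSourceName)
    .schema(df.schema)
    .option("dataSchema", df.schema.json)
    .load(path)

  checkAnswer(reloaded, df)  // what we read back must equal what we wrote
}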
