Skip to content

Commit 65179a2

Browse files
author
sergei.rubtcov
committed
[SPARK-19228][SQL] Migrate from FastDateFormat to Java 8 time to meet ISO8601 and parse dates in CSV correctly. Add support for inferring DateType and a custom "dateFormat" option.
1 parent 6c35865 commit 65179a2

File tree

9 files changed

+126
-27
lines changed

9 files changed

+126
-27
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.util
1919

2020
import java.sql.{Date, Timestamp}
2121
import java.text.{DateFormat, SimpleDateFormat}
22+
import java.time.LocalDateTime
23+
import java.time.temporal.ChronoField
2224
import java.util.{Calendar, Locale, TimeZone}
2325
import java.util.concurrent.ConcurrentHashMap
2426
import java.util.function.{Function => JFunction}
@@ -143,6 +145,12 @@ object DateTimeUtils {
143145
millisLocal - getOffsetFromLocalMillis(millisLocal, timeZone)
144146
}
145147

148+
def dateTimeToMicroseconds(localDateTime: LocalDateTime, timeZone: TimeZone): Long = {
149+
val microOfSecond = localDateTime.getLong(ChronoField.MICRO_OF_SECOND)
150+
val epochSecond = localDateTime.atZone(timeZone.toZoneId).toInstant.getEpochSecond
151+
epochSecond * 1000000L + microOfSecond
152+
}
153+
146154
def dateToString(days: SQLDate): String =
147155
getThreadLocalDateFormat.format(toJavaDate(days))
148156

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,14 @@ package org.apache.spark.sql.catalyst.util
1919

2020
import java.sql.{Date, Timestamp}
2121
import java.text.SimpleDateFormat
22+
import java.time.LocalDateTime
23+
import java.time.format.DateTimeFormatter
2224
import java.util.{Calendar, Locale, TimeZone}
2325

2426
import org.apache.spark.SparkFunSuite
2527
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
2628
import org.apache.spark.unsafe.types.UTF8String
29+
import org.junit.Assert.assertEquals
2730

2831
class DateTimeUtilsSuite extends SparkFunSuite {
2932

@@ -645,6 +648,18 @@ class DateTimeUtilsSuite extends SparkFunSuite {
645648
}
646649
}
647650

651+
test("Java 8 LocalDateTime to microseconds") {
652+
val nanos = "2015-05-09 00:10:23.999750987"
653+
var formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS")
654+
val localDateTimeInNanos = LocalDateTime.parse(nanos, formatter)
655+
val timeInMicros = dateTimeToMicroseconds(localDateTimeInNanos, TimeZonePST)
656+
assertEquals(1431155423999750L, timeInMicros)
657+
val micros = "2015-05-09 00:10:23.999750"
658+
formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")
659+
val localDateTimeInMicros = LocalDateTime.parse(micros, formatter)
660+
assertEquals(timeInMicros, dateTimeToMicroseconds(localDateTimeInMicros, TimeZonePST))
661+
}
662+
648663
test("daysToMillis and millisToDays") {
649664
val c = Calendar.getInstance(TimeZonePST)
650665

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ private[csv] object CSVInferSchema {
9090
// DecimalTypes have different precisions and scales, so we try to find the common type.
9191
findTightestCommonType(typeSoFar, tryParseDecimal(field, options)).getOrElse(StringType)
9292
case DoubleType => tryParseDouble(field, options)
93+
case DateType => tryParseDate(field, options)
9394
case TimestampType => tryParseTimestamp(field, options)
9495
case BooleanType => tryParseBoolean(field, options)
9596
case StringType => StringType
@@ -140,14 +141,23 @@ private[csv] object CSVInferSchema {
140141
private def tryParseDouble(field: String, options: CSVOptions): DataType = {
141142
if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field, options)) {
142143
DoubleType
144+
} else {
145+
tryParseDate(field, options)
146+
}
147+
}
148+
149+
private def tryParseDate(field: String, options: CSVOptions): DataType = {
150+
// Infers DateType when the field parses with the configured `dateFormat`.
151+
if ((allCatch opt options.dateFormatter.parse(field)).isDefined) {
152+
DateType
143153
} else {
144154
tryParseTimestamp(field, options)
145155
}
146156
}
147157

148158
private def tryParseTimestamp(field: String, options: CSVOptions): DataType = {
149-
// This case infers a custom `dataFormat` is set.
150-
if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
159+
// Infers TimestampType when the field parses with the configured `timestampFormat`.
160+
if ((allCatch opt options.timestampFormatter.parse(field)).isDefined) {
151161
TimestampType
152162
} else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
153163
// We keep this for backwards compatibility.
@@ -216,6 +226,9 @@ private[csv] object CSVInferSchema {
216226
} else {
217227
Some(DecimalType(range + scale, scale))
218228
}
229+
// By design 'TimestampType' (8 bytes) is larger than 'DateType' (4 bytes).
230+
case (t1: DateType, t2: TimestampType) => Some(TimestampType)
231+
case (t1: TimestampType, t2: DateType) => Some(TimestampType)
219232

220233
case _ => None
221234
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
package org.apache.spark.sql.execution.datasources.csv
1919

2020
import java.nio.charset.StandardCharsets
21+
import java.time.format.{DateTimeFormatter, ResolverStyle}
2122
import java.util.{Locale, TimeZone}
2223

2324
import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
24-
import org.apache.commons.lang3.time.FastDateFormat
2525

2626
import org.apache.spark.internal.Logging
2727
import org.apache.spark.sql.catalyst.util._
@@ -119,7 +119,6 @@ class CSVOptions(
119119
val positiveInf = parameters.getOrElse("positiveInf", "Inf")
120120
val negativeInf = parameters.getOrElse("negativeInf", "-Inf")
121121

122-
123122
val compressionCodec: Option[String] = {
124123
val name = parameters.get("compression").orElse(parameters.get("codec"))
125124
name.map(CompressionCodecs.getCodecClassName)
@@ -128,13 +127,20 @@ class CSVOptions(
128127
val timeZone: TimeZone = DateTimeUtils.getTimeZone(
129128
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
130129

131-
// Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
132-
val dateFormat: FastDateFormat =
133-
FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)
130+
val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd")
131+
132+
val timestampFormat: String = parameters
133+
.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
134134

135-
val timestampFormat: FastDateFormat =
136-
FastDateFormat.getInstance(
137-
parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, Locale.US)
135+
@transient lazy val dateFormatter: DateTimeFormatter = {
136+
DateTimeFormatter.ofPattern(dateFormat)
137+
.withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)
138+
}
139+
140+
@transient lazy val timestampFormatter: DateTimeFormatter = {
141+
DateTimeFormatter.ofPattern(timestampFormat)
142+
.withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)
143+
}
138144

139145
val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
140146

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.csv
1919

2020
import java.io.InputStream
2121
import java.math.BigDecimal
22+
import java.time.{LocalDate, LocalDateTime}
23+
import java.time.temporal.ChronoField
2224

2325
import scala.util.Try
2426
import scala.util.control.NonFatal
@@ -131,9 +133,8 @@ class UnivocityParser(
131133

132134
case _: TimestampType => (d: String) =>
133135
nullSafeDatum(d, name, nullable, options) { datum =>
134-
// This one will lose microseconds parts.
135-
// See https://issues.apache.org/jira/browse/SPARK-10681.
136-
Try(options.timestampFormat.parse(datum).getTime * 1000L)
136+
Try(DateTimeUtils.dateTimeToMicroseconds(LocalDateTime
137+
.parse(datum, options.timestampFormatter), options.timeZone))
137138
.getOrElse {
138139
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
139140
// compatibility.
@@ -143,9 +144,8 @@ class UnivocityParser(
143144

144145
case _: DateType => (d: String) =>
145146
nullSafeDatum(d, name, nullable, options) { datum =>
146-
// This one will lose microseconds parts.
147-
// See https://issues.apache.org/jira/browse/SPARK-10681.x
148-
Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
147+
Try(Math.toIntExact(LocalDate.parse(datum, options.dateFormatter)
148+
.getLong(ChronoField.EPOCH_DAY)))
149149
.getOrElse {
150150
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
151151
// compatibility.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
timestamp,date
2+
26/08/2015 22:31:46.913,27/09/2015
3+
27/10/2014 22:33:31.601,26/12/2016
4+
28/01/2016 22:33:52.888,28/01/2017

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,21 @@ class CSVInferSchemaSuite extends SparkFunSuite {
5959
assert(CSVInferSchema.inferField(IntegerType, textValueOne, options) == expectedTypeOne)
6060
}
6161

62-
test("Timestamp field types are inferred correctly via custom data format") {
63-
var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"), "GMT")
62+
test("Timestamp field types are inferred correctly via custom date format") {
63+
var options = new CSVOptions(Map("timestampFormat" -> "yyyy-MM"), "GMT")
6464
assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
6565
options = new CSVOptions(Map("timestampFormat" -> "yyyy"), "GMT")
6666
assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType)
6767
}
6868

69+
test("Date field types are inferred correctly via custom date and timestamp format") {
70+
val options = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy",
71+
"timestampFormat" -> "dd/MM/yyyy HH:mm:ss.SSS"), "GMT")
72+
assert(CSVInferSchema.inferField(TimestampType,
73+
"28/01/2017 22:31:46.913", options) == TimestampType)
74+
assert(CSVInferSchema.inferField(DateType, "16/12/2012", options) == DateType)
75+
}
76+
6977
test("Timestamp field types are inferred correctly from other types") {
7078
val options = new CSVOptions(Map.empty[String, String], "GMT")
7179
assert(CSVInferSchema.inferField(IntegerType, "2015-08-20 14", options) == StringType)
@@ -111,7 +119,7 @@ class CSVInferSchemaSuite extends SparkFunSuite {
111119
}
112120

113121
test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
114-
val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"), "GMT")
122+
val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-MM"), "GMT")
115123
assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
116124
}
117125

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
5353
private val simpleSparseFile = "test-data/simple_sparse.csv"
5454
private val numbersFile = "test-data/numbers.csv"
5555
private val datesFile = "test-data/dates.csv"
56+
private val datesAndTimestampsFile = "test-data/dates-and-timestamps.csv"
5657
private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
5758
private val valueMalformedFile = "test-data/value-malformed.csv"
5859

@@ -565,6 +566,44 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
565566
assert(results.toSeq.map(_.toSeq) === expected)
566567
}
567568

569+
test("inferring timestamp types and date types via custom formats") {
570+
val options = Map(
571+
"header" -> "true",
572+
"inferSchema" -> "true",
573+
"timestampFormat" -> "dd/MM/yyyy HH:mm:ss.SSS",
574+
"dateFormat" -> "dd/MM/yyyy")
575+
val results = spark.read
576+
.format("csv")
577+
.options(options)
578+
.load(testFile(datesAndTimestampsFile))
579+
assert(results.schema{0}.dataType===TimestampType)
580+
assert(results.schema{1}.dataType===DateType)
581+
val timestamps = spark.read
582+
.format("csv")
583+
.options(options)
584+
.load(testFile(datesAndTimestampsFile))
585+
.select("timestamp")
586+
.collect()
587+
val timestampFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SSS", Locale.US)
588+
val timestampExpected =
589+
Seq(Seq(new Timestamp(timestampFormat.parse("26/08/2015 22:31:46.913").getTime)),
590+
Seq(new Timestamp(timestampFormat.parse("27/10/2014 22:33:31.601").getTime)),
591+
Seq(new Timestamp(timestampFormat.parse("28/01/2016 22:33:52.888").getTime)))
592+
assert(timestamps.toSeq.map(_.toSeq) === timestampExpected)
593+
val dates = spark.read
594+
.format("csv")
595+
.options(options)
596+
.load(testFile(datesAndTimestampsFile))
597+
.select("date")
598+
.collect()
599+
val dateFormat = new SimpleDateFormat("dd/MM/yyyy", Locale.US)
600+
val dateExpected =
601+
Seq(Seq(new Date(dateFormat.parse("27/09/2015").getTime)),
602+
Seq(new Date(dateFormat.parse("26/12/2016").getTime)),
603+
Seq(new Date(dateFormat.parse("28/01/2017").getTime)))
604+
assert(dates.toSeq.map(_.toSeq) === dateExpected)
605+
}
606+
568607
test("load date types via custom date format") {
569608
val customSchema = new StructType(Array(StructField("date", DateType, true)))
570609
val options = Map(

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.sql.execution.datasources.csv
1919

2020
import java.math.BigDecimal
21-
import java.util.Locale
21+
import java.time.{LocalDate, LocalDateTime}
2222

2323
import org.apache.spark.SparkFunSuite
2424
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -107,20 +107,26 @@ class UnivocityParserSuite extends SparkFunSuite {
107107
assert(parser.makeConverter("_1", BooleanType, options = options).apply("true") == true)
108108

109109
val timestampsOptions =
110-
new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), "GMT")
110+
new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy HH:mm"), "GMT")
111111
val customTimestamp = "31/01/2015 00:00"
112-
val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime
112+
113+
val expectedTime = LocalDateTime.parse(customTimestamp, timestampsOptions.timestampFormatter)
114+
.atZone(options.timeZone.toZoneId)
115+
.toInstant.toEpochMilli
113116
val castedTimestamp =
114-
parser.makeConverter("_1", TimestampType, nullable = true, options = timestampsOptions)
117+
parser.makeConverter("_1", TimestampType, nullable = true, timestampsOptions)
115118
.apply(customTimestamp)
116119
assert(castedTimestamp == expectedTime * 1000L)
117120

118-
val customDate = "31/01/2015"
119121
val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), "GMT")
120-
val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
122+
val customDate = "31/01/2015"
123+
124+
val expectedDate = LocalDate.parse(customDate, dateOptions.dateFormatter)
125+
.atStartOfDay(options.timeZone.toZoneId)
126+
.toInstant.toEpochMilli
121127
val castedDate =
122-
parser.makeConverter("_1", DateType, nullable = true, options = dateOptions)
123-
.apply(customTimestamp)
128+
parser.makeConverter("_1", DateType, nullable = true, dateOptions)
129+
.apply(customDate)
124130
assert(castedDate == DateTimeUtils.millisToDays(expectedDate))
125131

126132
val timestamp = "2015-01-01 00:00:00"

0 commit comments

Comments
 (0)