
Commit ba4a9dc

Merge branch 'master' into csv-date-inferring
2 parents 0ec5c76 + e3e33d8 commit ba4a9dc

16 files changed (+542, -373 lines changed)


docs/sql-migration-guide-upgrade.md

Lines changed: 1 addition & 1 deletion
@@ -35,7 +35,7 @@ displayTitle: Spark SQL Upgrading Guide
 
    - Spark applications which are built with Spark version 2.4 and prior, and call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, as they are not binary compatible with Spark 3.0.
 
-   - Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
+   - Since Spark 3.0, CSV/JSON datasources use java.time API for parsing and generating CSV/JSON content. In Spark version 2.4 and earlier, java.text.SimpleDateFormat is used for the same purpose, with fallbacks to the parsing mechanisms of Spark 2.0 and 1.x. For example, `2018-12-08 10:39:21.123` with the pattern `yyyy-MM-dd'T'HH:mm:ss.SSS` cannot be parsed since Spark 3.0 because the timestamp does not match the pattern, but it can be parsed by earlier Spark versions due to a fallback to `Timestamp.valueOf`. To parse the same timestamp since Spark 3.0, the pattern should be `yyyy-MM-dd HH:mm:ss.SSS`. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
 
    - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, the returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.

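As an illustration of the parsing change described above, a minimal sketch (the session setup, file name and column name are hypothetical and not part of this commit):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("csv-timestamps").getOrCreate()

// Hypothetical people.csv containing values such as 2018-12-08 10:39:21.123.
// Since Spark 3.0 the text must match the pattern exactly, so a space-separated
// pattern is needed for this value.
val df = spark.read
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSS")
  .schema("ts TIMESTAMP")
  .csv("people.csv")

// Switch back to the Spark 2.4 behaviour (SimpleDateFormat with fallbacks) if required:
spark.conf.set("spark.sql.legacy.timeParser.enabled", "true")
```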
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala

Lines changed: 3 additions & 3 deletions
@@ -24,13 +24,13 @@ import scala.util.control.Exception.allCatch
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.catalyst.analysis.TypeCoercion
  import org.apache.spark.sql.catalyst.expressions.ExprUtils
- import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter}
+ import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
  import org.apache.spark.sql.types._
 
  class CSVInferSchema(val options: CSVOptions) extends Serializable {
 
    @transient
-   private lazy val timeParser = DateTimeFormatter(
+   private lazy val timestampParser = TimestampFormatter(
      options.timestampFormat,
      options.timeZone,
      options.locale)
@@ -167,7 +167,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
    }
 
    private def tryParseTimestamp(field: String): DataType = {
-     if ((allCatch opt timeParser.parse(field)).isDefined) {
+     if ((allCatch opt timestampParser.parse(field)).isDefined) {
        TimestampType
      } else {
        tryParseDate(field)

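The renamed `timestampParser` is what schema inference tries against each field before falling back to dates. A rough usage sketch, reusing the hypothetical `spark` session from the sketch above (`events.csv` is likewise hypothetical):

```scala
// events.csv is assumed to hold values like 2018-12-08T10:39:21.123 in one column.
val inferred = spark.read
  .option("inferSchema", "true")
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSS")
  .csv("events.csv")

inferred.printSchema()  // the column is inferred as timestamp when every value parses
```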
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala

Lines changed: 4 additions & 4 deletions
@@ -22,7 +22,7 @@ import java.io.Writer
  import com.univocity.parsers.csv.CsvWriter
 
  import org.apache.spark.sql.catalyst.InternalRow
- import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter}
+ import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
  import org.apache.spark.sql.types._
 
  class UnivocityGenerator(
@@ -41,18 +41,18 @@ class UnivocityGenerator(
    private val valueConverters: Array[ValueConverter] =
      schema.map(_.dataType).map(makeConverter).toArray
 
-   private val timeFormatter = DateTimeFormatter(
+   private val timestampFormatter = TimestampFormatter(
      options.timestampFormat,
      options.timeZone,
      options.locale)
-   private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)
+   private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
 
    private def makeConverter(dataType: DataType): ValueConverter = dataType match {
      case DateType =>
        (row: InternalRow, ordinal: Int) => dateFormatter.format(row.getInt(ordinal))
 
      case TimestampType =>
-       (row: InternalRow, ordinal: Int) => timeFormatter.format(row.getLong(ordinal))
+       (row: InternalRow, ordinal: Int) => timestampFormatter.format(row.getLong(ordinal))
 
      case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
 

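On the write path the generator now formats the internal representations directly (days for dates, microseconds for timestamps). A minimal sketch, with a hypothetical output path and the same hypothetical `spark` session:

```scala
import java.sql.{Date, Timestamp}
import spark.implicits._

Seq((Date.valueOf("2018-12-08"), Timestamp.valueOf("2018-12-08 10:39:21.123")))
  .toDF("d", "ts")
  .write
  .option("dateFormat", "yyyy/MM/dd")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss.SSS")
  .csv("/tmp/csv-out")  // values are rendered through DateFormatter/TimestampFormatter
```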
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala

Lines changed: 4 additions & 3 deletions
@@ -74,11 +74,11 @@ class UnivocityParser(
 
    private val row = new GenericInternalRow(requiredSchema.length)
 
-   private val timeFormatter = DateTimeFormatter(
+   private val timestampFormatter = TimestampFormatter(
      options.timestampFormat,
      options.timeZone,
      options.locale)
-   private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)
+   private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
 
    // Retrieve the raw record string.
    private def getCurrentInput: UTF8String = {
@@ -158,7 +158,7 @@
      }
 
      case _: TimestampType => (d: String) =>
-       nullSafeDatum(d, name, nullable, options)(timeFormatter.parse)
+       nullSafeDatum(d, name, nullable, options)(timestampFormatter.parse)
 
      case _: DateType => (d: String) =>
        nullSafeDatum(d, name, nullable, options)(dateFormatter.parse)
@@ -239,6 +239,7 @@
        } catch {
          case NonFatal(e) =>
            badRecordException = badRecordException.orElse(Some(e))
+           row.setNullAt(i)
        }
        i += 1
      }

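The added `row.setNullAt(i)` clears a partially converted field, which is what lets PERMISSIVE mode return partially parsed rows as noted in the migration guide above. A small sketch of the observable effect (schema and input are hypothetical):

```scala
import spark.implicits._

// The first column parses as an integer, the second does not.
val input = Seq("1,not-a-timestamp").toDS()
val parsed = spark.read
  .schema("a INT, ts TIMESTAMP")
  .option("mode", "PERMISSIVE")
  .csv(input)

parsed.show()  // since Spark 3.0 this can yield [1, null] rather than [null, null]
```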
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala

Lines changed: 3 additions & 7 deletions
@@ -21,7 +21,6 @@ import java.nio.charset.{Charset, StandardCharsets}
  import java.util.{Locale, TimeZone}
 
  import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
- import org.apache.commons.lang3.time.FastDateFormat
 
  import org.apache.spark.internal.Logging
  import org.apache.spark.sql.catalyst.util._
@@ -82,13 +81,10 @@
    val timeZone: TimeZone = DateTimeUtils.getTimeZone(
      parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
 
-   // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
-   val dateFormat: FastDateFormat =
-     FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), locale)
+   val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd")
 
-   val timestampFormat: FastDateFormat =
-     FastDateFormat.getInstance(
-       parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, locale)
+   val timestampFormat: String =
+     parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
 
    val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

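`dateFormat` and `timestampFormat` are now carried as plain pattern strings and only turned into concrete formatters where they are used. From the user side the options are passed exactly as before, e.g. (hypothetical path, same hypothetical session):

```scala
val json = spark.read
  .schema("d DATE, ts TIMESTAMP")
  .option("dateFormat", "yyyy-MM-dd")
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
  .json("events.json")
```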
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala

Lines changed: 9 additions & 5 deletions
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.core._
 
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
- import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
+ import org.apache.spark.sql.catalyst.util._
  import org.apache.spark.sql.types._
 
  /**
@@ -77,6 +77,12 @@ private[sql] class JacksonGenerator(
 
    private val lineSeparator: String = options.lineSeparatorInWrite
 
+   private val timestampFormatter = TimestampFormatter(
+     options.timestampFormat,
+     options.timeZone,
+     options.locale)
+   private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
+
    private def makeWriter(dataType: DataType): ValueWriter = dataType match {
      case NullType =>
        (row: SpecializedGetters, ordinal: Int) =>
@@ -116,14 +122,12 @@
 
      case TimestampType =>
        (row: SpecializedGetters, ordinal: Int) =>
-         val timestampString =
-           options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+         val timestampString = timestampFormatter.format(row.getLong(ordinal))
          gen.writeString(timestampString)
 
      case DateType =>
        (row: SpecializedGetters, ordinal: Int) =>
-         val dateString =
-           options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
+         val dateString = dateFormatter.format(row.getInt(ordinal))
          gen.writeString(dateString)
 
      case BinaryType =>

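The JSON generator now renders timestamps and dates from their internal long/int values through the shared formatters instead of converting to java.sql types for FastDateFormat. A minimal write sketch (output path hypothetical, same hypothetical session):

```scala
import java.sql.Timestamp
import spark.implicits._

Seq(Tuple1(Timestamp.valueOf("2018-12-08 10:39:21.123")))
  .toDF("ts")
  .write
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSS")
  .json("/tmp/json-out")  // the field is written as a formatted string
```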
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala

Lines changed: 8 additions & 27 deletions
@@ -55,6 +55,12 @@ class JacksonParser(
    private val factory = new JsonFactory()
    options.setJacksonOptions(factory)
 
+   private val timestampFormatter = TimestampFormatter(
+     options.timestampFormat,
+     options.timeZone,
+     options.locale)
+   private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
+
    /**
     * Create a converter which converts the JSON documents held by the `JsonParser`
     * to a value according to a desired schema. This is a wrapper for the method
@@ -218,17 +224,7 @@
      case TimestampType =>
        (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
          case VALUE_STRING if parser.getTextLength >= 1 =>
-           val stringValue = parser.getText
-           // This one will lose microseconds parts.
-           // See https://issues.apache.org/jira/browse/SPARK-10681.
-           Long.box {
-             Try(options.timestampFormat.parse(stringValue).getTime * 1000L)
-               .getOrElse {
-                 // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-                 // compatibility.
-                 DateTimeUtils.stringToTime(stringValue).getTime * 1000L
-               }
-           }
+           timestampFormatter.parse(parser.getText)
 
          case VALUE_NUMBER_INT =>
            parser.getLongValue * 1000000L
@@ -237,22 +233,7 @@
      case DateType =>
        (parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
          case VALUE_STRING if parser.getTextLength >= 1 =>
-           val stringValue = parser.getText
-           // This one will lose microseconds parts.
-           // See https://issues.apache.org/jira/browse/SPARK-10681.
-           Int.box {
-             Try(DateTimeUtils.millisToDays(options.dateFormat.parse(stringValue).getTime))
-               .orElse {
-                 // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-                 // compatibility.
-                 Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(stringValue).getTime))
-               }
-               .getOrElse {
-                 // In Spark 1.5.0, we store the data as number of days since epoch in string.
-                 // So, we just convert it to Int.
-                 stringValue.toInt
-               }
-           }
+           dateFormatter.parse(parser.getText)
        }
 
      case BinaryType =>

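String-valued timestamps and dates are now parsed strictly by the shared formatters; the SimpleDateFormat and `DateTimeUtils.stringToTime` fallbacks removed here no longer apply (numeric values are still treated as epoch seconds). A rough sketch of the reader side (record content hypothetical, same hypothetical session):

```scala
import spark.implicits._

val records = Seq("""{"ts": "2018-12-08 10:39:21.123"}""").toDS()
val df = spark.read
  .schema("ts TIMESTAMP")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSS")
  .json(records)  // the string must match the pattern exactly since Spark 3.0
```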
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala

Lines changed: 61 additions & 38 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
  import scala.collection.mutable.ArrayBuffer
 
- import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.AnalysisException
  import org.apache.spark.sql.catalyst.expressions._
  import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
  import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -43,31 +43,53 @@ import org.apache.spark.sql.types._
   * condition.
   */
  object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
-   private def dedupJoin(joinPlan: LogicalPlan): LogicalPlan = joinPlan match {
+
+   private def buildJoin(
+       outerPlan: LogicalPlan,
+       subplan: LogicalPlan,
+       joinType: JoinType,
+       condition: Option[Expression]): Join = {
+     // Deduplicate conflicting attributes if any.
+     val dedupSubplan = dedupSubqueryOnSelfJoin(outerPlan, subplan, None, condition)
+     Join(outerPlan, dedupSubplan, joinType, condition)
+   }
+
+   private def dedupSubqueryOnSelfJoin(
+       outerPlan: LogicalPlan,
+       subplan: LogicalPlan,
+       valuesOpt: Option[Seq[Expression]],
+       condition: Option[Expression] = None): LogicalPlan = {
    // SPARK-21835: It is possibly that the two sides of the join have conflicting attributes,
    // the produced join then becomes unresolved and break structural integrity. We should
-   // de-duplicate conflicting attributes. We don't use transformation here because we only
-   // care about the most top join converted from correlated predicate subquery.
-   case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti | ExistenceJoin(_)), joinCond) =>
-     val duplicates = right.outputSet.intersect(left.outputSet)
-     if (duplicates.nonEmpty) {
-       val aliasMap = AttributeMap(duplicates.map { dup =>
-         dup -> Alias(dup, dup.toString)()
-       }.toSeq)
-       val aliasedExpressions = right.output.map { ref =>
-         aliasMap.getOrElse(ref, ref)
-       }
-       val newRight = Project(aliasedExpressions, right)
-       val newJoinCond = joinCond.map { condExpr =>
-         condExpr transform {
-           case a: Attribute => aliasMap.getOrElse(a, a).toAttribute
+   // de-duplicate conflicting attributes.
+   // SPARK-26078: it may also happen that the subquery has conflicting attributes with the outer
+   // values. In this case, the resulting join would contain trivially true conditions (eg.
+   // id#3 = id#3) which cannot be de-duplicated after. In this method, if there are conflicting
+   // attributes in the join condition, the subquery's conflicting attributes are changed using
+   // a projection which aliases them and resolves the problem.
+     val outerReferences = valuesOpt.map(values =>
+       AttributeSet.fromAttributeSets(values.map(_.references))).getOrElse(AttributeSet.empty)
+     val outerRefs = outerPlan.outputSet ++ outerReferences
+     val duplicates = outerRefs.intersect(subplan.outputSet)
+     if (duplicates.nonEmpty) {
+       condition.foreach { e =>
+         val conflictingAttrs = e.references.intersect(duplicates)
+         if (conflictingAttrs.nonEmpty) {
+           throw new AnalysisException("Found conflicting attributes " +
+             s"${conflictingAttrs.mkString(",")} in the condition joining outer plan:\n " +
+             s"$outerPlan\nand subplan:\n $subplan")
          }
-       }
-       Join(left, newRight, joinType, newJoinCond)
-     } else {
-       j
        }
-     case _ => joinPlan
+       val rewrites = AttributeMap(duplicates.map { dup =>
+         dup -> Alias(dup, dup.toString)()
+       }.toSeq)
+       val aliasedExpressions = subplan.output.map { ref =>
+         rewrites.getOrElse(ref, ref)
+       }
+       Project(aliasedExpressions, subplan)
+     } else {
+       subplan
+     }
    }
 
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -85,25 +107,27 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
        withSubquery.foldLeft(newFilter) {
          case (p, Exists(sub, conditions, _)) =>
            val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
-           // Deduplicate conflicting attributes if any.
-           dedupJoin(Join(outerPlan, sub, LeftSemi, joinCond))
+           buildJoin(outerPlan, sub, LeftSemi, joinCond)
          case (p, Not(Exists(sub, conditions, _))) =>
            val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
-           // Deduplicate conflicting attributes if any.
-           dedupJoin(Join(outerPlan, sub, LeftAnti, joinCond))
+           buildJoin(outerPlan, sub, LeftAnti, joinCond)
          case (p, InSubquery(values, ListQuery(sub, conditions, _, _))) =>
-           val inConditions = values.zip(sub.output).map(EqualTo.tupled)
-           val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p)
            // Deduplicate conflicting attributes if any.
-           dedupJoin(Join(outerPlan, sub, LeftSemi, joinCond))
+           val newSub = dedupSubqueryOnSelfJoin(p, sub, Some(values))
+           val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
+           val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p)
+           Join(outerPlan, newSub, LeftSemi, joinCond)
          case (p, Not(InSubquery(values, ListQuery(sub, conditions, _, _)))) =>
            // This is a NULL-aware (left) anti join (NAAJ) e.g. col NOT IN expr
            // Construct the condition. A NULL in one of the conditions is regarded as a positive
            // result; such a row will be filtered out by the Anti-Join operator.
 
            // Note that will almost certainly be planned as a Broadcast Nested Loop join.
            // Use EXISTS if performance matters to you.
-           val inConditions = values.zip(sub.output).map(EqualTo.tupled)
+
+           // Deduplicate conflicting attributes if any.
+           val newSub = dedupSubqueryOnSelfJoin(p, sub, Some(values))
+           val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
            val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions, p)
            // Expand the NOT IN expression with the NULL-aware semantic
            // to its full form. That is from:
@@ -118,8 +142,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
            // will have the final conditions in the LEFT ANTI as
            // (A.A1 = B.B1 OR ISNULL(A.A1 = B.B1)) AND (B.B2 = A.A2) AND B.B3 > 1
            val finalJoinCond = (nullAwareJoinConds ++ conditions).reduceLeft(And)
-           // Deduplicate conflicting attributes if any.
-           dedupJoin(Join(outerPlan, sub, LeftAnti, Option(finalJoinCond)))
+           Join(outerPlan, newSub, LeftAnti, Option(finalJoinCond))
          case (p, predicate) =>
            val (newCond, inputPlan) = rewriteExistentialExpr(Seq(predicate), p)
            Project(p.output, Filter(newCond.get, inputPlan))
@@ -140,16 +163,16 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
        e transformUp {
          case Exists(sub, conditions, _) =>
            val exists = AttributeReference("exists", BooleanType, nullable = false)()
-           // Deduplicate conflicting attributes if any.
-           newPlan = dedupJoin(
-             Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)))
+           newPlan =
+             buildJoin(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
            exists
          case InSubquery(values, ListQuery(sub, conditions, _, _)) =>
            val exists = AttributeReference("exists", BooleanType, nullable = false)()
-           val inConditions = values.zip(sub.output).map(EqualTo.tupled)
-           val newConditions = (inConditions ++ conditions).reduceLeftOption(And)
            // Deduplicate conflicting attributes if any.
-           newPlan = dedupJoin(Join(newPlan, sub, ExistenceJoin(exists), newConditions))
+           val newSub = dedupSubqueryOnSelfJoin(newPlan, sub, Some(values))
+           val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
+           val newConditions = (inConditions ++ conditions).reduceLeftOption(And)
+           newPlan = Join(newPlan, newSub, ExistenceJoin(exists), newConditions)
            exists
        }
      }

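A rough sketch of the query shape that `dedupSubqueryOnSelfJoin` targets: when the IN subquery reads the same relation as the outer values, both sides of the rewritten join can expose the same attribute ids, so the subquery output is aliased first (table name and data are hypothetical, same hypothetical session):

```scala
spark.range(5).toDF("id").createOrReplaceTempView("t")

// Outer `id` and the subquery's `id` come from the same relation; without the aliasing
// projection the rewritten LeftSemi join condition could collapse to id#X = id#X.
spark.sql("SELECT id FROM t WHERE id IN (SELECT id FROM t WHERE id > 2)").show()
```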