Skip to content

Commit 6412ea1

Browse files
dongjoon-hyun authored and cloud-fan committed
[SPARK-21247][SQL] Type comparison should respect case-sensitive SQL conf
## What changes were proposed in this pull request? This is an effort to reduce the difference between Hive and Spark. Spark supports case-sensitivity in columns. Especially, for Struct types, with `spark.sql.caseSensitive=true`, the following is supported. ```scala scala> sql("select named_struct('a', 1, 'A', 2).a").show +--------------------------+ |named_struct(a, 1, A, 2).a| +--------------------------+ | 1| +--------------------------+ scala> sql("select named_struct('a', 1, 'A', 2).A").show +--------------------------+ |named_struct(a, 1, A, 2).A| +--------------------------+ | 2| +--------------------------+ ``` And vice versa, with `spark.sql.caseSensitive=false`, the following is supported. ```scala scala> sql("select named_struct('a', 1).A, named_struct('A', 1).a").show +--------------------+--------------------+ |named_struct(a, 1).A|named_struct(A, 1).a| +--------------------+--------------------+ | 1| 1| +--------------------+--------------------+ ``` However, the types are still considered different, even when `spark.sql.caseSensitive=false`. For example, SET operations fail. ```scala scala> sql("SELECT named_struct('a',1) union all (select named_struct('A',2))").show org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. struct<A:int> <> struct<a:int> at the first column of the second table;; 'Union :- Project [named_struct(a, 1) AS named_struct(a, 1)#57] : +- OneRowRelation$ +- Project [named_struct(A, 2) AS named_struct(A, 2)#58] +- OneRowRelation$ ``` This PR aims to support case-insensitive type equality. For example, in Set operations, the above query succeeds when `spark.sql.caseSensitive=false`. ```scala scala> sql("SELECT named_struct('a',1) union all (select named_struct('A',2))").show +------------------+ |named_struct(a, 1)| +------------------+ | [1]| | [2]| +------------------+ ``` ## How was this patch tested? Pass Jenkins with a newly added test case.
Author: Dongjoon Hyun <[email protected]> Closes apache#18460 from dongjoon-hyun/SPARK-21247.
1 parent e6e3600 commit 6412ea1

File tree

4 files changed

+102
-5
lines changed

4 files changed

+102
-5
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,16 @@ object TypeCoercion {
100100
case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) =>
101101
Some(TimestampType)
102102

103+
case (t1 @ StructType(fields1), t2 @ StructType(fields2)) if t1.sameType(t2) =>
104+
Some(StructType(fields1.zip(fields2).map { case (f1, f2) =>
105+
// Since `t1.sameType(t2)` is true, two StructTypes have the same DataType
106+
// except `name` (in case of `spark.sql.caseSensitive=false`) and `nullable`.
107+
// - Different names: use f1.name
108+
// - Different nullabilities: `nullable` is true iff one of them is nullable.
109+
val dataType = findTightestCommonType(f1.dataType, f2.dataType).get
110+
StructField(f1.name, dataType, nullable = f1.nullable || f2.nullable)
111+
}))
112+
103113
case _ => None
104114
}
105115

sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import org.json4s.jackson.JsonMethods._
2626

2727
import org.apache.spark.annotation.InterfaceStability
2828
import org.apache.spark.sql.catalyst.expressions.Expression
29+
import org.apache.spark.sql.internal.SQLConf
2930
import org.apache.spark.util.Utils
3031

3132
/**
@@ -80,7 +81,11 @@ abstract class DataType extends AbstractDataType {
8081
* (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`).
8182
*/
8283
private[spark] def sameType(other: DataType): Boolean =
83-
DataType.equalsIgnoreNullability(this, other)
84+
if (SQLConf.get.caseSensitiveAnalysis) {
85+
DataType.equalsIgnoreNullability(this, other)
86+
} else {
87+
DataType.equalsIgnoreCaseAndNullability(this, other)
88+
}
8489

8590
/**
8691
* Returns the same data type but with all nullability fields set to true

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,14 +131,17 @@ class TypeCoercionSuite extends AnalysisTest {
131131
widenFunc: (DataType, DataType) => Option[DataType],
132132
t1: DataType,
133133
t2: DataType,
134-
expected: Option[DataType]): Unit = {
134+
expected: Option[DataType],
135+
isSymmetric: Boolean = true): Unit = {
135136
var found = widenFunc(t1, t2)
136137
assert(found == expected,
137138
s"Expected $expected as wider common type for $t1 and $t2, found $found")
138139
// Test both directions to make sure the widening is symmetric.
139-
found = widenFunc(t2, t1)
140-
assert(found == expected,
141-
s"Expected $expected as wider common type for $t2 and $t1, found $found")
140+
if (isSymmetric) {
141+
found = widenFunc(t2, t1)
142+
assert(found == expected,
143+
s"Expected $expected as wider common type for $t2 and $t1, found $found")
144+
}
142145
}
143146

144147
test("implicit type cast - ByteType") {
@@ -385,6 +388,47 @@ class TypeCoercionSuite extends AnalysisTest {
385388
widenTest(NullType, StructType(Seq()), Some(StructType(Seq())))
386389
widenTest(StringType, MapType(IntegerType, StringType, true), None)
387390
widenTest(ArrayType(IntegerType), StructType(Seq()), None)
391+
392+
widenTest(
393+
StructType(Seq(StructField("a", IntegerType))),
394+
StructType(Seq(StructField("b", IntegerType))),
395+
None)
396+
widenTest(
397+
StructType(Seq(StructField("a", IntegerType, nullable = false))),
398+
StructType(Seq(StructField("a", DoubleType, nullable = false))),
399+
None)
400+
401+
widenTest(
402+
StructType(Seq(StructField("a", IntegerType, nullable = false))),
403+
StructType(Seq(StructField("a", IntegerType, nullable = false))),
404+
Some(StructType(Seq(StructField("a", IntegerType, nullable = false)))))
405+
widenTest(
406+
StructType(Seq(StructField("a", IntegerType, nullable = false))),
407+
StructType(Seq(StructField("a", IntegerType, nullable = true))),
408+
Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))
409+
widenTest(
410+
StructType(Seq(StructField("a", IntegerType, nullable = true))),
411+
StructType(Seq(StructField("a", IntegerType, nullable = false))),
412+
Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))
413+
widenTest(
414+
StructType(Seq(StructField("a", IntegerType, nullable = true))),
415+
StructType(Seq(StructField("a", IntegerType, nullable = true))),
416+
Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))
417+
418+
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
419+
widenTest(
420+
StructType(Seq(StructField("a", IntegerType))),
421+
StructType(Seq(StructField("A", IntegerType))),
422+
None)
423+
}
424+
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
425+
checkWidenType(
426+
TypeCoercion.findTightestCommonType,
427+
StructType(Seq(StructField("a", IntegerType), StructField("B", IntegerType))),
428+
StructType(Seq(StructField("A", IntegerType), StructField("b", IntegerType))),
429+
Some(StructType(Seq(StructField("a", IntegerType), StructField("B", IntegerType)))),
430+
isSymmetric = false)
431+
}
388432
}
389433

390434
test("wider common type for decimal and array") {

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2646,6 +2646,44 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
26462646
}
26472647
}
26482648

2649+
test("SPARK-21247: Allow case-insensitive type equality in Set operation") {
2650+
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
2651+
sql("SELECT struct(1 a) UNION ALL (SELECT struct(2 A))")
2652+
sql("SELECT struct(1 a) EXCEPT (SELECT struct(2 A))")
2653+
2654+
withTable("t", "S") {
2655+
sql("CREATE TABLE t(c struct<f:int>) USING parquet")
2656+
sql("CREATE TABLE S(C struct<F:int>) USING parquet")
2657+
Seq(("c", "C"), ("C", "c"), ("c.f", "C.F"), ("C.F", "c.f")).foreach {
2658+
case (left, right) =>
2659+
checkAnswer(sql(s"SELECT * FROM t, S WHERE t.$left = S.$right"), Seq.empty)
2660+
}
2661+
}
2662+
}
2663+
2664+
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
2665+
val m1 = intercept[AnalysisException] {
2666+
sql("SELECT struct(1 a) UNION ALL (SELECT struct(2 A))")
2667+
}.message
2668+
assert(m1.contains("Union can only be performed on tables with the compatible column types"))
2669+
2670+
val m2 = intercept[AnalysisException] {
2671+
sql("SELECT struct(1 a) EXCEPT (SELECT struct(2 A))")
2672+
}.message
2673+
assert(m2.contains("Except can only be performed on tables with the compatible column types"))
2674+
2675+
withTable("t", "S") {
2676+
sql("CREATE TABLE t(c struct<f:int>) USING parquet")
2677+
sql("CREATE TABLE S(C struct<F:int>) USING parquet")
2678+
checkAnswer(sql("SELECT * FROM t, S WHERE t.c.f = S.C.F"), Seq.empty)
2679+
val m = intercept[AnalysisException] {
2680+
sql("SELECT * FROM t, S WHERE c = C")
2681+
}.message
2682+
assert(m.contains("cannot resolve '(t.`c` = S.`C`)' due to data type mismatch"))
2683+
}
2684+
}
2685+
}
2686+
26492687
test("SPARK-21335: support un-aliased subquery") {
26502688
withTempView("v") {
26512689
Seq(1 -> "a").toDF("i", "j").createOrReplaceTempView("v")

0 commit comments

Comments
 (0)