diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeMapData.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeMapData.java index 58973e7a3acf1..c02c7b729c183 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeMapData.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeMapData.java @@ -30,6 +30,8 @@ import org.apache.spark.sql.catalyst.util.MapData; import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.array.ByteArrayMethods; +import org.apache.spark.unsafe.hash.Murmur3_x86_32; import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET; @@ -112,6 +114,22 @@ public UnsafeArrayData valueArray() { return values; } + @Override + public int hashCode() { + return Murmur3_x86_32.hashUnsafeBytes(baseObject, baseOffset, sizeInBytes, 42); + } + + @Override + public boolean equals(Object other) { + if (other instanceof UnsafeMapData) { + UnsafeMapData o = (UnsafeMapData) other; + return (sizeInBytes == o.sizeInBytes) && + ByteArrayMethods.arrayEquals(baseObject, baseOffset, o.baseObject, o.baseOffset, + sizeInBytes); + } + return false; + } + public void writeToMemory(Object target, long targetOffset) { Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes); } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index c1578483ca921..4b9cc72557daa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -61,14 +61,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { dt.existsRecursively(_.isInstanceOf[MapType]) } - protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = plan match { - case _: Intersect | _: Except | _: Distinct => - plan.output.find(a => hasMapType(a.dataType)) - case d: Deduplicate => - d.keys.find(a => hasMapType(a.dataType)) - case _ => None - } - private def checkLimitLikeClause(name: String, limitExpr: Expression): Unit = { limitExpr match { case e if !e.foldable => failAnalysis( @@ -588,14 +580,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { |Conflicting attributes: ${conflictingAttributes.mkString(",")} """.stripMargin) - // TODO: although map type is not orderable, technically map type should be able to be - // used in equality comparison, remove this type check once we support it. 
- case o if mapColumnInSetOperation(o).isDefined => - val mapCol = mapColumnInSetOperation(o).get - failAnalysis("Cannot have map type columns in DataFrame which calls " + - s"set operations(intersect, except, etc.), but the type of column ${mapCol.name} " + - "is " + mapCol.dataType.catalogString) - case o if o.expressions.exists(!_.deterministic) && !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] && !o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 7f2c1c652dc8e..852d78896c4c9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -625,6 +625,7 @@ class CodegenContext extends Logging { case dt: DataType if dt.isInstanceOf[AtomicType] => s"$c1.equals($c2)" case array: ArrayType => genComp(array, c1, c2) + " == 0" case struct: StructType => genComp(struct, c1, c2) + " == 0" + case map: MapType => genComp(map, c1, c2) + " == 0" case udt: UserDefinedType[_] => genEqual(udt.sqlType, c1, c2) case NullType => "false" case _ => @@ -700,6 +701,32 @@ class CodegenContext extends Logging { } """ s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)" + + case _ @ MapType(keyType, valueType, _) => + val keyArrayType = ArrayType(keyType) + val valueArrayType = ArrayType(valueType) + val compareFunc = freshName("compareMap") + val funcCode: String = + s""" + public int $compareFunc(MapData a, MapData b) { + ArrayData aKeys = a.keyArray(); + ArrayData bKeys = b.keyArray(); + int keyComp = ${genComp(keyArrayType, "aKeys", "bKeys")}; + if (keyComp != 0) { + return keyComp; + } + + ArrayData aValues = a.valueArray(); + ArrayData bValues = b.valueArray(); + int valueComp = ${genComp(valueArrayType, "aValues", "bValues")}; + if (valueComp != 0) { + return valueComp; + } + return 0; + } + """ + s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)" + case schema: StructType => val comparisons = GenerateOrdering.genComparisons(this, schema) val compareFunc = freshName("compareStruct") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala index ba3ed02e06ef1..9f7639ec235d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala @@ -65,6 +65,10 @@ class InterpretedOrdering(ordering: Seq[SortOrder]) extends BaseOrdering { a.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right) case a: ArrayType if order.direction == Descending => - a.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right) + case a: MapType if order.direction == Ascending => + a.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right) + case a: MapType if order.direction == Descending => + - a.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right) case s: StructType if order.direction == Ascending => s.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right) case s: StructType if order.direction == Descending => @@ -104,6 +108,7 @@ object RowOrdering extends CodeGeneratorWithInterpretedFallback[Seq[SortOrder], case dt: AtomicType => true case 
struct: StructType => struct.fields.forall(f => isOrderable(f.dataType)) case array: ArrayType => isOrderable(array.elementType) + case map: MapType => isOrderable(map.keyType) && isOrderable(map.valueType) case udt: UserDefinedType[_] => isOrderable(udt.sqlType) case _ => false } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala index 6d0f46baa0984..101a7832018bd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.optimizer -import org.apache.spark.sql.catalyst.expressions.{Alias, And, ArrayTransform, CaseWhen, Coalesce, CreateArray, CreateMap, CreateNamedStruct, EqualTo, ExpectsInputTypes, Expression, GetStructField, If, IsNull, KnownFloatingPointNormalized, LambdaFunction, Literal, NamedLambdaVariable, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, And, ArrayTransform, CaseWhen, Coalesce, CreateArray, CreateMap, CreateNamedStruct, EqualTo, ExpectsInputTypes, Expression, GetStructField, If, IsNull, KnownFloatingPointNormalized, LambdaFunction, Literal, NamedLambdaVariable, TransformKeys, TransformValues, UnaryExpression} import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Window} @@ -96,9 +96,7 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] { case FloatType | DoubleType => true case StructType(fields) => fields.exists(f => needNormalize(f.dataType)) case ArrayType(et, _) => needNormalize(et) - // Currently MapType is not comparable and analyzer should fail earlier if this case happens. 
-    case _: MapType =>
-      throw new IllegalStateException("grouping/join/window partition keys cannot be map type.")
+    case MapType(kt, vt, _) => needNormalize(kt) || needNormalize(vt)
     case _ => false
   }
 
@@ -142,6 +140,26 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
       val function = normalize(lv)
       KnownFloatingPointNormalized(ArrayTransform(expr, LambdaFunction(function, Seq(lv))))
 
+    case _ if expr.dataType.isInstanceOf[MapType] =>
+      val MapType(kt, vt, containsNull) = expr.dataType
+      val maybeKeyNormalized = if (needNormalize(kt)) {
+        val lv1 = NamedLambdaVariable("arg1", kt, nullable = false)
+        val lv2 = NamedLambdaVariable("arg2", vt, containsNull)
+        val function = normalize(lv1)
+        TransformKeys(expr, LambdaFunction(function, Seq(lv1, lv2)))
+      } else {
+        expr
+      }
+      val maybeKeyValueNormalized = if (needNormalize(vt)) {
+        val lv1 = NamedLambdaVariable("arg1", kt, nullable = false)
+        val lv2 = NamedLambdaVariable("arg2", vt, containsNull)
+        val function = normalize(lv2)
+        TransformValues(maybeKeyNormalized, LambdaFunction(function, Seq(lv1, lv2)))
+      } else {
+        maybeKeyNormalized
+      }
+      KnownFloatingPointNormalized(maybeKeyValueNormalized)
+
     case _ => throw new IllegalStateException(s"fail to normalize $expr")
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeMaps.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeMaps.scala
new file mode 100644
index 0000000000000..182a354ed2d0f
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeMaps.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.math.Ordering
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator._
+import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Window}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData, TypeUtils}
+import org.apache.spark.sql.types._
+
+/**
+ * When comparing two maps, we have to make sure that two maps with the same key-value pairs
+ * but different key orderings are equal (e.g., Map('a' -> 1, 'b' -> 2) should be equal to
+ * Map('b' -> 2, 'a' -> 1)). To make sure this assumption holds,
+ * this rule inserts a [[SortMapKeys]] expression to sort map entries by keys.
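+ *
+ * For example (an illustrative sketch, not actual plan output), an equality predicate over
+ * two map columns is rewritten so that both sides are key-sorted before being compared:
+ * {{{
+ *   // query:  SELECT * FROM t WHERE m1 = m2
+ *   // before: Filter(EqualTo(m1, m2))
+ *   // after:  Filter(EqualTo(SortMapKeys(m1), SortMapKeys(m2)))
+ * }}}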
+ *
+ * NOTE: this rule must be executed at the end of the optimizer because the optimizer may
+ * create new joins (via the subquery rewrite) and new join conditions (via the join reorder).
+ */
+object NormalizeMaps extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
+    // The analyzer guarantees that the left and right types are the same, so
+    // we only need to check the type of one side.
+    case cmp @ BinaryComparison(left, right) if needNormalize(left) =>
+      cmp.withNewChildren(SortMapKeys(left) :: SortMapKeys(right) :: Nil)
+
+    case In(value, list) if needNormalize(value) =>
+      In(SortMapKeys(value), list.map(SortMapKeys))
+
+    case in @ InSet(value, list) if needNormalize(value) =>
+      val newHset = list.map(c => SortMapKeys(Literal(c, in.child.dataType)).eval())
+      InSet(SortMapKeys(value), newHset)
+
+    case sort: SortOrder if needNormalize(sort.child) =>
+      sort.copy(child = SortMapKeys(sort.child))
+  }.transform {
+    case w: Window if w.partitionSpec.exists(p => needNormalize(p)) =>
+      w.copy(partitionSpec = w.partitionSpec.map(normalize))
+
+    // TODO: `NormalizeMaps` has the same restriction as `NormalizeFloatingNumbers`;
+    // ideally Aggregate should also be handled here, but its grouping expressions are
+    // mixed into its aggregate expressions. It's unreliable to change the grouping
+    // expressions here. For now we normalize grouping expressions in `AggUtils` during planning.
+  }
+
+  private def needNormalize(expr: Expression): Boolean = expr match {
+    case SortMapKeys(_) => false
+    case _ => needNormalize(expr.dataType)
+  }
+
+  private def needNormalize(dt: DataType): Boolean = dt match {
+    case StructType(fields) => fields.exists(f => needNormalize(f.dataType))
+    case ArrayType(et, _) => needNormalize(et)
+    case _: MapType => true
+    case _ => false
+  }
+
+  private[sql] def normalize(expr: Expression): Expression = expr match {
+    case _ if !needNormalize(expr) => expr
+    case _ => SortMapKeys(expr)
+  }
+}
+
+/**
+ * This expression sorts all maps in an expression's result, which enables the use of maps
+ * in comparisons and equality operations.
+ */
+case class SortMapKeys(child: Expression) extends UnaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] =
+    Seq(TypeCollection(ArrayType, MapType, StructType))
+
+  override def dataType: DataType = child.dataType
+
+  override protected def withNewChildInternal(newChild: Expression): Expression = {
+    copy(child = newChild)
+  }
+
+  private def createFuncToSortRecursively(dt: DataType): Any => Any = dt match {
+    case m @ MapType(keyType, valueType, _) =>
+      val sf = createFuncToSortRecursively(valueType)
+      val keyOrdering = new Ordering[(Any, Any)] {
+        val ordering = TypeUtils.getInterpretedOrdering(keyType)
+        override def compare(x: (Any, Any), y: (Any, Any)): Int = ordering.compare(x._1, y._1)
+      }
+      (data: Any) => {
+        val input = data.asInstanceOf[MapData]
+        val length = input.numElements()
+        val keys = input.keyArray()
+        val values = input.valueArray()
+        val buffer = Array.ofDim[(Any, Any)](length)
+        var i = 0
+        while (i < length) {
+          // Map keys cannot contain map types (see `TypeUtils.checkForMapKeyType`),
+          // so we recursively sort values only.
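+          // For example (illustrative), sorting Map(2 -> Map(4 -> 'd', 3 -> 'c'), 1 -> Map(1 -> 'a'))
+          // yields Map(1 -> Map(1 -> 'a'), 2 -> Map(3 -> 'c', 4 -> 'd')): `sf` sorts each nested
+          // value here, while the outer entries are sorted by `keyOrdering` below.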
+ val k = keys.get(i, m.keyType) + val v = if (!values.isNullAt(i)) { + sf(values.get(i, m.valueType)) + } else { + null + } + buffer(i) = k -> v + i += 1 + } + + java.util.Arrays.sort(buffer, keyOrdering) + + ArrayBasedMapData(buffer.toIterator, length, identity, identity) + } + + case ArrayType(dt, _) => + val sf = createFuncToSortRecursively(dt) + (data: Any) => { + val input = data.asInstanceOf[ArrayData] + val length = input.numElements() + val output = Array.ofDim[Any](length) + var i = 0 + while (i < length) { + if (!input.isNullAt(i)) { + output(i) = sf(input.get(i, dt)) + } else { + output(i) = null + } + i += 1 + } + new GenericArrayData(output) + } + + case StructType(fields) => + val fs = fields.map { field => + val sf = createFuncToSortRecursively(field.dataType) + (input: InternalRow, i: Int) => { + sf(input.get(i, field.dataType)) + } + } + val length = fields.length + (data: Any) => { + val input = data.asInstanceOf[InternalRow] + val output = Array.ofDim[Any](length) + var i = 0 + while (i < length) { + if (!input.isNullAt(i)) { + output(i) = fs(i)(input, i) + } else { + output(i) = null + } + i += 1 + } + new GenericInternalRow(output) + } + + case _ => + identity + } + + @transient private[this] lazy val sortFunc = { + createFuncToSortRecursively(dataType) + } + + override def nullSafeEval(input: Any): Any = sortFunc(input) + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + // TODO: we should code generate this + val tf = ctx.addReferenceObj("sortFunc", sortFunc, classOf[Any => Any].getCanonicalName) + nullSafeCodeGen(ctx, ev, eval => { + s"${ev.value} = (${javaType(dataType)})$tf.apply($eval);" + }) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index c79fd7a87a83b..56721111c73a6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -167,6 +167,10 @@ abstract class Optimizer(catalogManager: CatalogManager) RemoveNoopUnion) :: Batch("OptimizeLimitZero", Once, OptimizeLimitZero) :: + // After applying ConvertToLocalRelation, we cannot normalize maps in Filter/Project. + // So, we need to apply NormalizeMaps just before ConvertToLocalRelation. + Batch("Normalize Maps Before Converting LocalRelation", Once, + NormalizeMaps) :: // Run this once earlier. This might simplify the plan and reduce cost of optimizer. // For example, a query such as Filter(LocalRelation) would go through all the heavy // optimizer rules that are triggered when there is a filter @@ -234,8 +238,9 @@ abstract class Optimizer(catalogManager: CatalogManager) ColumnPruning, CollapseProject, RemoveNoopOperators) :+ - // This batch must be executed after the `RewriteSubquery` batch, which creates joins. + // Following batches must be executed after the `RewriteSubquery` batch, which creates joins. Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers) :+ + Batch("NormalizeMaps", Once, NormalizeMaps) :+ Batch("ReplaceUpdateFieldsExpression", Once, ReplaceUpdateFieldsExpression) // remove any batches with no rules. this may happen when subclasses do not add optional rules. 
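Why the extra early batch matters: `ConvertToLocalRelation` eagerly evaluates `Project` and `Filter` operators over a `LocalRelation` during optimization, so a map comparison inside them would be computed before `SortMapKeys` is in place if `NormalizeMaps` only ran at the end. A minimal sketch of the behavior this ordering guarantees (illustrative only, not part of the patch; assumes a post-patch Spark build and a local `SparkSession`):

```scala
import org.apache.spark.sql.SparkSession

object MapEqualityCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    // Two maps holding the same entries, inserted in different orders. The filter below is
    // folded over a LocalRelation by ConvertToLocalRelation, so SortMapKeys must already be
    // wrapped around both sides for the row to survive the comparison.
    val df = Seq((Map(1 -> "a", 2 -> "b"), Map(2 -> "b", 1 -> "a"))).toDF("m1", "m2")
    assert(df.where($"m1" === $"m2").count() == 1)
    spark.stop()
  }
}
```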
@@ -271,7 +276,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
     RewritePredicateSubquery.ruleName ::
       NormalizeFloatingNumbers.ruleName ::
       ReplaceUpdateFieldsExpression.ruleName ::
-      PullOutGroupingExpressions.ruleName :: Nil
+      PullOutGroupingExpressions.ruleName ::
+      NormalizeMaps.ruleName :: Nil
 
   /**
    * Optimize all the subqueries inside expression.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
index 3768f7a1824f1..b84f6d4234d64 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
@@ -32,6 +32,20 @@ class ArrayBasedMapData(val keyArray: ArrayData, val valueArray: ArrayData) exte
 
   override def copy(): MapData = new ArrayBasedMapData(keyArray.copy(), valueArray.copy())
 
+  override def equals(o: Any): Boolean = {
+    // `o.isInstanceOf` is false for null, so no separate null check is needed below.
+    if (!o.isInstanceOf[ArrayBasedMapData]) {
+      return false
+    }
+
+    val other = o.asInstanceOf[ArrayBasedMapData]
+    this.keyArray == other.keyArray && this.valueArray == other.valueArray
+  }
+
+  override def hashCode: Int = {
+    keyArray.hashCode() * 37 + valueArray.hashCode()
+  }
+
   override def toString: String = {
     s"keys: $keyArray, values: $valueArray"
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/MapData.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/MapData.scala
index 94e8824cd18cc..40db6067adf71 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/MapData.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/MapData.scala
@@ -19,11 +19,6 @@ package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.sql.types.DataType
 
-/**
- * This is an internal data representation for map type in Spark SQL. This should not implement
- * `equals` and `hashCode` because the type cannot be used as join keys, grouping keys, or
- * in equality tests. See SPARK-9415 and PR#13847 for the discussions.
- */ abstract class MapData extends Serializable { def numElements(): Int diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala index 015dca844a429..3c781800df83b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala @@ -82,6 +82,7 @@ object TypeUtils { t match { case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]] case a: ArrayType => a.interpretedOrdering.asInstanceOf[Ordering[Any]] + case m: MapType => m.interpretedOrdering.asInstanceOf[Ordering[Any]] case s: StructType => s.interpretedOrdering.asInstanceOf[Ordering[Any]] case udt: UserDefinedType[_] => getInterpretedOrdering(udt.sqlType) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala index a3a2ccf5ab12c..24e90303d41ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala @@ -108,6 +108,7 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT case dt: AtomicType => dt.ordering.asInstanceOf[Ordering[Any]] case a : ArrayType => a.interpretedOrdering.asInstanceOf[Ordering[Any]] case s: StructType => s.interpretedOrdering.asInstanceOf[Ordering[Any]] + case m: MapType => m.interpretedOrdering.asInstanceOf[Ordering[Any]] case other => throw new IllegalArgumentException( s"Type ${other.catalogString} does not support ordered operations") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala index 2e5c7f731dcc7..26d4424450967 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala @@ -17,10 +17,13 @@ package org.apache.spark.sql.types +import scala.math.Ordering + import org.json4s.JsonAST.JValue import org.json4s.JsonDSL._ import org.apache.spark.annotation.Stable +import org.apache.spark.sql.catalyst.util.MapData import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat /** @@ -79,6 +82,31 @@ case class MapType( override private[spark] def existsRecursively(f: (DataType) => Boolean): Boolean = { f(this) || keyType.existsRecursively(f) || valueType.existsRecursively(f) } + + @transient + private[sql] lazy val interpretedOrdering: Ordering[MapData] = new Ordering[MapData] { + + private val keyArrayOrdering = ArrayType(keyType).interpretedOrdering + private val valueArrayOrdering = ArrayType(valueType).interpretedOrdering + + def compare(left: MapData, right: MapData): Int = { + val leftKeys = left.keyArray() + val rightKeys = right.keyArray() + val keyComp = keyArrayOrdering.compare(leftKeys, rightKeys) + if (keyComp != 0) { + return keyComp + } + + val leftValues = left.valueArray() + val rightValues = right.valueArray() + val valueComp = valueArrayOrdering.compare(leftValues, rightValues) + if (valueComp != 0) { + valueComp + } else { + 0 + } + } + } } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 6cda05360aea3..dd7048e3a5067 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -52,34 +52,34 @@ private[sql] class GroupableUDT extends UserDefinedType[GroupableData] { private[spark] override def asNullable: GroupableUDT = this } -private[sql] case class UngroupableData(data: Map[Int, Int]) { +private[sql] case class GroupableData2(data: Map[Int, Int]) { def getData: Map[Int, Int] = data } -private[sql] class UngroupableUDT extends UserDefinedType[UngroupableData] { +private[sql] class GroupableUDT2 extends UserDefinedType[GroupableData2] { override def sqlType: DataType = MapType(IntegerType, IntegerType) - override def serialize(ungroupableData: UngroupableData): MapData = { - val keyArray = new GenericArrayData(ungroupableData.data.keys.toSeq) - val valueArray = new GenericArrayData(ungroupableData.data.values.toSeq) + override def serialize(groupableData: GroupableData2): MapData = { + val keyArray = new GenericArrayData(groupableData.data.keys.toSeq) + val valueArray = new GenericArrayData(groupableData.data.values.toSeq) new ArrayBasedMapData(keyArray, valueArray) } - override def deserialize(datum: Any): UngroupableData = { + override def deserialize(datum: Any): GroupableData2 = { datum match { case data: MapData => val keyArray = data.keyArray().array val valueArray = data.valueArray().array assert(keyArray.length == valueArray.length) val mapData = keyArray.zip(valueArray).toMap.asInstanceOf[Map[Int, Int]] - UngroupableData(mapData) + GroupableData2(mapData) } } - override def userClass: Class[UngroupableData] = classOf[UngroupableData] + override def userClass: Class[GroupableData2] = classOf[GroupableData2] - private[spark] override def asNullable: UngroupableUDT = this + private[spark] override def asNullable: GroupableUDT2 = this } case class TestFunction( @@ -299,11 +299,6 @@ class AnalysisErrorSuite extends AnalysisTest { testRelation2.groupBy($"a")(sum(UnresolvedStar(None))), "Invalid usage of '*'" :: "in expression 'sum'" :: Nil) - errorTest( - "sorting by unsupported column types", - mapRelation.orderBy($"map".asc), - "sort" :: "type" :: "map" :: Nil) - errorTest( "sorting by attributes are not from grouping expressions", testRelation2.groupBy($"a", $"c")($"a", $"c", count($"a").as("a3")).orderBy($"b".asc), @@ -594,19 +589,14 @@ class AnalysisErrorSuite extends AnalysisTest { new StructType() .add("f1", FloatType, nullable = true) .add("f2", ArrayType(BooleanType, containsNull = true), nullable = true), - new GroupableUDT()) - supportedDataTypes.foreach { dataType => - checkDataType(dataType, shouldSuccess = true) - } - - val unsupportedDataTypes = Seq( MapType(StringType, LongType), new StructType() .add("f1", FloatType, nullable = true) .add("f2", MapType(StringType, LongType), nullable = true), - new UngroupableUDT()) - unsupportedDataTypes.foreach { dataType => - checkDataType(dataType, shouldSuccess = false) + new GroupableUDT(), + new GroupableUDT2()) + supportedDataTypes.foreach { dataType => + checkDataType(dataType, shouldSuccess = true) } } @@ -625,7 +615,7 @@ class AnalysisErrorSuite extends AnalysisTest { "another aggregate function." 
:: Nil) } - test("Join can work on binary types but can't work on map types") { + test("Join can work on binary/map types") { val left = LocalRelation(Symbol("a").binary, Symbol("b").map(StringType, StringType)) val right = LocalRelation(Symbol("c").binary, Symbol("d").map(StringType, StringType)) @@ -640,7 +630,7 @@ class AnalysisErrorSuite extends AnalysisTest { right, joinType = Cross, condition = Some(Symbol("b") === Symbol("d"))) - assertAnalysisError(plan2, "EqualTo does not support ordering on type map" :: Nil) + assertAnalysisSuccess(plan2) } test("PredicateSubQuery is used outside of a filter") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala index 9ba03efed1839..135f986b720fe 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala @@ -113,19 +113,6 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite { assertErrorForDifferingTypes(GreaterThan(Symbol("intField"), Symbol("booleanField"))) assertErrorForDifferingTypes(GreaterThanOrEqual(Symbol("intField"), Symbol("booleanField"))) - assertError(EqualTo(Symbol("mapField"), Symbol("mapField")), - "EqualTo does not support ordering on type map") - assertError(EqualNullSafe(Symbol("mapField"), Symbol("mapField")), - "EqualNullSafe does not support ordering on type map") - assertError(LessThan(Symbol("mapField"), Symbol("mapField")), - "LessThan does not support ordering on type map") - assertError(LessThanOrEqual(Symbol("mapField"), Symbol("mapField")), - "LessThanOrEqual does not support ordering on type map") - assertError(GreaterThan(Symbol("mapField"), Symbol("mapField")), - "GreaterThan does not support ordering on type map") - assertError(GreaterThanOrEqual(Symbol("mapField"), Symbol("mapField")), - "GreaterThanOrEqual does not support ordering on type map") - assertError(If(Symbol("intField"), Symbol("stringField"), Symbol("stringField")), "type of predicate expression in If should be boolean") assertErrorForDifferingTypes( @@ -156,8 +143,8 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite { assertSuccess(new BoolAnd(Symbol("booleanField"))) assertSuccess(new BoolOr(Symbol("booleanField"))) - assertError(Min(Symbol("mapField")), "min does not support ordering on type") - assertError(Max(Symbol("mapField")), "max does not support ordering on type") + assertSuccess(Min(Symbol("mapField"))) + assertSuccess(Max(Symbol("mapField"))) assertError(Sum(Symbol("booleanField")), "function sum requires numeric or interval types") assertError(Average(Symbol("booleanField")), "function average requires numeric or interval types") @@ -228,8 +215,6 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite { assertError(operator(Seq(Symbol("booleanField"))), "requires at least two arguments") assertError(operator(Seq(Symbol("intField"), Symbol("stringField"))), "should all have the same type") - assertError(operator(Seq(Symbol("mapField"), Symbol("mapField"))), - "does not support ordering") } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala index 6f75623dc59ae..1b0850512a36c 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala @@ -233,14 +233,6 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper { StructField("a", colOneType) :: StructField("b", colTwoType) :: Nil) testWithRandomDataGeneration(structType, nullable) } - - // In doesn't support map type and will fail the analyzer. - val map = Literal.create(create_map(1 -> 1), MapType(IntegerType, IntegerType)) - In(map, Seq(map)).checkInputDataTypes() match { - case TypeCheckResult.TypeCheckFailure(msg) => - assert(msg.contains("function in does not support ordering on type map")) - case _ => fail("In should not work on map type") - } } test("switch statements in InSet for bytes, shorts, ints, dates") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingPointNumbersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingPointNumbersSuite.scala index bb9919f94eef2..e0111b2439640 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingPointNumbersSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingPointNumbersSuite.scala @@ -19,10 +19,11 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{CaseWhen, If, IsNull, KnownFloatingPointNormalized} +import org.apache.spark.sql.catalyst.expressions.{CaseWhen, Expression, If, IsNull, KnownFloatingPointNormalized, LambdaFunction, NamedLambdaVariable, TransformKeys, TransformValues} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType} class NormalizeFloatingPointNumbersSuite extends PlanTest { @@ -124,5 +125,66 @@ class NormalizeFloatingPointNumbersSuite extends PlanTest { comparePlans(doubleOptimized, correctAnswer) } + + def mapKeyNormalized(input: Expression, tpe1: DataType, tpe2: DataType): Expression = { + val lv1 = NamedLambdaVariable("arg1", tpe1, nullable = false) + val lv2 = NamedLambdaVariable("arg2", tpe2, nullable = true) + val f = KnownFloatingPointNormalized(NormalizeNaNAndZero(lv1)) + TransformKeys(input, LambdaFunction(f, Seq(lv1, lv2))) + } + + def mapValueNormalized(input: Expression, tpe1: DataType, tpe2: DataType): Expression = { + val lv1 = NamedLambdaVariable("arg1", tpe1, nullable = false) + val lv2 = NamedLambdaVariable("arg2", tpe2, nullable = true) + val f = KnownFloatingPointNormalized(NormalizeNaNAndZero(lv2)) + TransformValues(input, LambdaFunction(f, Seq(lv1, lv2))) + } + + test("SPARK-34819: normalize map keys and values - normalized keys only") { + val t1 = LocalRelation('a.map(DoubleType, IntegerType)) + val a = t1.output(0) + val t2 = LocalRelation('a.map(DoubleType, IntegerType)) + val b = t2.output(0) + val query = t1.join(t2, condition = Some(a === b)) + val optimized = Optimize.execute(query) + val doubleOptimized = Optimize.execute(optimized) + + val correctAnswer = t1.join(t2, condition = Some( + KnownFloatingPointNormalized(mapKeyNormalized(a, DoubleType, IntegerType)) === + KnownFloatingPointNormalized(mapKeyNormalized(b, DoubleType, IntegerType)))) + 
comparePlans(doubleOptimized, correctAnswer) + } + + test("SPARK-34819: normalize map keys and values - normalized values only") { + val t1 = LocalRelation('a.map(IntegerType, DoubleType)) + val a = t1.output(0) + val t2 = LocalRelation('a.map(IntegerType, DoubleType)) + val b = t2.output(0) + val query = t1.join(t2, condition = Some(a === b)) + val optimized = Optimize.execute(query) + val doubleOptimized = Optimize.execute(optimized) + + val correctAnswer = t1.join(t2, condition = Some( + KnownFloatingPointNormalized(mapValueNormalized(a, IntegerType, DoubleType)) === + KnownFloatingPointNormalized(mapValueNormalized(b, IntegerType, DoubleType)))) + comparePlans(doubleOptimized, correctAnswer) + } + + test("SPARK-34819: normalize map keys and values - normalized both keys/values") { + val t1 = LocalRelation('a.map(DoubleType, DoubleType)) + val a = t1.output(0) + val t2 = LocalRelation('a.map(DoubleType, DoubleType)) + val b = t2.output(0) + val query = t1.join(t2, condition = Some(a === b)) + val optimized = Optimize.execute(query) + val doubleOptimized = Optimize.execute(optimized) + + val correctAnswer = t1.join(t2, condition = Some( + KnownFloatingPointNormalized(mapValueNormalized(mapKeyNormalized( + a, DoubleType, DoubleType), DoubleType, DoubleType)) === + KnownFloatingPointNormalized(mapValueNormalized(mapKeyNormalized( + b, DoubleType, DoubleType), DoubleType, DoubleType)))) + comparePlans(doubleOptimized, correctAnswer) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeMapsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeMapsSuite.scala new file mode 100644 index 0000000000000..db02ea1bd2cc5 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeMapsSuite.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData} +import org.apache.spark.sql.types.{MapType, StringType} + +class NormalizeMapsSuite extends PlanTest with ExpressionEvalHelper { + + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = Batch("NormalizeMaps", Once, NormalizeMaps) :: Nil + } + + val testRelation = LocalRelation( + 'a.int, + 'm1.map(MapType(StringType, StringType, false)), + 'm2.map(MapType(StringType, StringType, false))) + val a = testRelation.output(0) + val m1 = testRelation.output(1) + val m2 = testRelation.output(2) + + test("SPARK-34819: normalize map types in window function expressions") { + val query = testRelation.window(Seq(sum(a).as("sum")), Seq(m1), Seq(m1.asc)) + val optimized = Optimize.execute(query) + // For idempotence checks + val doubleOptimized = Optimize.execute(optimized) + val correctAnswer = testRelation.window(Seq(sum(a).as("sum")), + Seq(SortMapKeys(m1)), Seq(SortMapKeys(m1).asc)) + comparePlans(doubleOptimized, correctAnswer) + } + + test("SPARK-34819: normalize map types in join keys") { + val testRelation2 = LocalRelation('a.int, 'm.map(MapType(StringType, StringType, false))) + val a2 = testRelation2.output(0) + val m21 = testRelation2.output(1) + val query = testRelation.join(testRelation2, condition = Some(m1 === m21)) + val optimized = Optimize.execute(query) + // For idempotence checks + val doubleOptimized = Optimize.execute(optimized) + val joinCond = Some(SortMapKeys(m1) === SortMapKeys(m21)) + val correctAnswer = testRelation.join(testRelation2, condition = joinCond) + comparePlans(doubleOptimized, correctAnswer) + } + + test("SPARK-34819: normalize map types in predicates") { + val query = testRelation.where(m1 === m2) + val optimized = Optimize.execute(query) + // For idempotence checks + val doubleOptimized = Optimize.execute(optimized) + val correctAnswer = testRelation.where(SortMapKeys(m1) === SortMapKeys(m2)) + comparePlans(doubleOptimized, correctAnswer) + } + + test("SPARK-34819: normalize map types in sort orders") { + val query = testRelation.orderBy(m1.asc, m2.desc) + val optimized = Optimize.execute(query) + // For idempotence checks + val doubleOptimized = Optimize.execute(optimized) + val correctAnswer = testRelation.orderBy(SortMapKeys(m1).asc, SortMapKeys(m2).desc) + comparePlans(doubleOptimized, correctAnswer) + } + + test("SPARK-34819: sort map keys") { + def createMap(keys: Seq[_], values: Seq[_]): ArrayBasedMapData = { + val keyArray = CatalystTypeConverters + .convertToCatalyst(keys) + .asInstanceOf[ArrayData] + val valueArray = CatalystTypeConverters + .convertToCatalyst(values) + .asInstanceOf[ArrayData] + new ArrayBasedMapData(keyArray, valueArray) + } + + checkEvaluation(SortMapKeys(CreateMap(Seq(2, "b", 1, "a"))), + createMap(Seq(1, 2), Seq("a", "b"))) + checkEvaluation(SortMapKeys(CreateArray(Seq(CreateMap(Seq(2, "b", 1, "a"))))), + Seq(createMap(Seq(1, 2), Seq("a", "b")))) + checkEvaluation(SortMapKeys(CreateNamedStruct(Seq("c", CreateMap(Seq(2, "b", 1, "a"))))), + InternalRow(createMap(Seq(1, 
2), Seq("a", "b")))) + checkEvaluation(SortMapKeys( + CreateMap(Seq(2, CreateMap(Seq(4, "d", 3, "c")), 1, CreateMap(Seq(2, "b", 1, "a"))))), + createMap(Seq(1, 2), Seq( + createMap(Seq(1, 2), Seq("a", "b")), + createMap(Seq(3, 4), Seq("c", "d"))))) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ComplexDataSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ComplexDataSuite.scala index f921f06537080..fa695133d107b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ComplexDataSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ComplexDataSuite.scala @@ -27,31 +27,37 @@ import org.apache.spark.unsafe.types.UTF8String class ComplexDataSuite extends SparkFunSuite { def utf8(str: String): UTF8String = UTF8String.fromString(str) - test("inequality tests for MapData") { - // test data - val testMap1 = Map(utf8("key1") -> 1) - val testMap2 = Map(utf8("key1") -> 1, utf8("key2") -> 2) - val testMap3 = Map(utf8("key1") -> 1) - val testMap4 = Map(utf8("key1") -> 1, utf8("key2") -> 2) + test("SPARK-34819: equality tests for MapData") { + val testArrayMap1 = ArrayBasedMapData(Array(), Array()) + val testArrayMap2 = ArrayBasedMapData(Array(utf8("k1")), Array(1)) + val testArrayMap3 = ArrayBasedMapData(Array(utf8("k1"), utf8("k2")), Array(1, 2)) + val testArrayMap4 = ArrayBasedMapData(Array(utf8("k2"), utf8("k1")), Array(2, 1)) // ArrayBasedMapData - val testArrayMap1 = ArrayBasedMapData(testMap1.toMap) - val testArrayMap2 = ArrayBasedMapData(testMap2.toMap) - val testArrayMap3 = ArrayBasedMapData(testMap3.toMap) - val testArrayMap4 = ArrayBasedMapData(testMap4.toMap) - assert(testArrayMap1 !== testArrayMap3) - assert(testArrayMap2 !== testArrayMap4) + assert(testArrayMap1 === testArrayMap1.copy()) + assert(testArrayMap2 === testArrayMap2.copy()) + assert(testArrayMap3 === testArrayMap3.copy()) + assert(testArrayMap4 === testArrayMap4.copy()) + assert(testArrayMap1 !== testArrayMap2) + assert(testArrayMap2 !== testArrayMap3) + assert(testArrayMap3 !== testArrayMap4) - // UnsafeMapData val unsafeConverter = UnsafeProjection.create(Array[DataType](MapType(StringType, IntegerType))) val row = new GenericInternalRow(1) - def toUnsafeMap(map: ArrayBasedMapData): UnsafeMapData = { + def toUnsafeMap(map: MapData): UnsafeMapData = { row.update(0, map) val unsafeRow = unsafeConverter.apply(row) unsafeRow.getMap(0).copy } - assert(toUnsafeMap(testArrayMap1) !== toUnsafeMap(testArrayMap3)) - assert(toUnsafeMap(testArrayMap2) !== toUnsafeMap(testArrayMap4)) + + // UnsafeMapData + assert(toUnsafeMap(testArrayMap1) === toUnsafeMap(testArrayMap1.copy())) + assert(toUnsafeMap(testArrayMap2) === toUnsafeMap(testArrayMap2.copy())) + assert(toUnsafeMap(testArrayMap3) === toUnsafeMap(testArrayMap3.copy())) + assert(toUnsafeMap(testArrayMap4) === toUnsafeMap(testArrayMap4.copy())) + assert(toUnsafeMap(testArrayMap1) !== toUnsafeMap(testArrayMap2)) + assert(toUnsafeMap(testArrayMap2) !== toUnsafeMap(testArrayMap3)) + assert(toUnsafeMap(testArrayMap3) !== toUnsafeMap(testArrayMap4)) } test("GenericInternalRow.copy return a new instance that is independent from the old one") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 65a592302c6af..c74e54b628ada 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression -import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, JoinSelectionHelper, NormalizeFloatingNumbers} +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, JoinSelectionHelper} import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -326,14 +326,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { val stateVersion = conf.getConf(SQLConf.STREAMING_AGGREGATION_STATE_FORMAT_VERSION) - // Ideally this should be done in `NormalizeFloatingNumbers`, but we do it here because - // `groupingExpressions` is not extracted during logical phase. - val normalizedGroupingExpressions = namedGroupingExpressions.map { e => - NormalizeFloatingNumbers.normalize(e) match { - case n: NamedExpression => n - case other => Alias(other, e.name)(exprId = e.exprId) - } - } + val normalizedGroupingExpressions = + AggUtils.normalizeGroupingExprs(namedGroupingExpressions) AggUtils.planStreamingAggregation( normalizedGroupingExpressions, @@ -434,15 +428,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { "Spark user mailing list.") } - // Ideally this should be done in `NormalizeFloatingNumbers`, but we do it here because - // `groupingExpressions` is not extracted during logical phase. - val normalizedGroupingExpressions = groupingExpressions.map { e => - NormalizeFloatingNumbers.normalize(e) match { - case n: NamedExpression => n - // Keep the name of the original expression. - case other => Alias(other, e.name)(exprId = e.exprId) - } - } + val normalizedGroupingExpressions = + AggUtils.normalizeGroupingExprs(groupingExpressions) val aggregateOperator = if (functionsWithDistinct.isEmpty) { @@ -460,20 +447,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // aggregates have different column expressions. val distinctExpressions = functionsWithDistinct.head.aggregateFunction.children.filterNot(_.foldable) - val normalizedNamedDistinctExpressions = distinctExpressions.map { e => - // Ideally this should be done in `NormalizeFloatingNumbers`, but we do it here - // because `distinctExpressions` is not extracted during logical phase. - NormalizeFloatingNumbers.normalize(e) match { - case ne: NamedExpression => ne - case other => - // Keep the name of the original expression. 
- val name = e match { - case ne: NamedExpression => ne.name - case _ => e.toString - } - Alias(other, name)() - } - } + val normalizedNamedDistinctExpressions = + AggUtils.normalizeDistinctGroupingExprs(distinctExpressions) AggUtils.planAggregateWithOneDistinct( normalizedGroupingExpressions, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala index 58d3411073476..287592e1634fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.optimizer.{NormalizeFloatingNumbers, NormalizeMaps} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.streaming.{StateStoreRestoreExec, StateStoreSaveExec} @@ -27,6 +28,38 @@ import org.apache.spark.sql.execution.streaming.{StateStoreRestoreExec, StateSto */ object AggUtils { + // Ideally, this should be done in `NormalizeFloatingNumbers` and `NormalizeMaps`, + // but we do it in the physical planning phase because `groupingExpressions` + // is not extracted during the logical phase. + def normalizeGroupingExpr(groupingExpr: Expression): Expression = { + NormalizeMaps.normalize(NormalizeFloatingNumbers.normalize(groupingExpr)) + } + + def normalizeGroupingExprs(groupingExprs: Seq[NamedExpression]): Seq[NamedExpression] = { + groupingExprs.map { e => + normalizeGroupingExpr(e) match { + case n: NamedExpression => n + // Keep the name of the original expression + case other => Alias(other, e.name)(exprId = e.exprId) + } + } + } + + def normalizeDistinctGroupingExprs(groupingExprs: Seq[Expression]): Seq[NamedExpression] = { + groupingExprs.map { e => + normalizeGroupingExpr(e) match { + case ne: NamedExpression => ne + case other => + // Keep the name of the original expression + val name = e match { + case ne: NamedExpression => ne.name + case _ => e.toString + } + Alias(other, name)() + } + } + } + private def mayRemoveAggFilters(exprs: Seq[AggregateExpression]): Seq[AggregateExpression] = { exprs.map { ae => if (ae.filter.isDefined) { diff --git a/sql/core/src/test/resources/sql-tests/inputs/comparable-map-explain.sql b/sql/core/src/test/resources/sql-tests/inputs/comparable-map-explain.sql new file mode 100644 index 0000000000000..562b078336f56 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/comparable-map-explain.sql @@ -0,0 +1,34 @@ +-- test cases for the explain results of comparable maps + +--SET spark.sql.adaptive.enabled = false +--SET spark.sql.mapKeyDedupPolicy=LAST_WIN; + +CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES + (map(1, 'a', 2, 'b'), map(2, 'b', 1, 'a')), + (map(2, 'b', 1, 'a'), map(2, 'b', 1, 'A')), + (map(3, 'c', 1, 'a'), map(4, 'd', 3, 'c')), + (map(3, 'c', 1, 'a'), map(3, 'c')), + (map(3, 'c'), map(4, 'd', 3, 'c')), + (map(1, 'a', 2, null), map(2, null, 1, 'a')), + (map(), map(1, 'a')), + (map(1, 'a'), map()) +AS t(v1, v2); + +-- Checks if `SortMapKeys` inserted correctly, the explain results +-- of following complicated query cases are shown here. 
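+-- For instance (illustrative), the first EXPLAIN below is expected to show a Sort whose
+-- ordering key is `(sortmapkeys(v1) = sortmapkeys(v2))`, i.e. both operands of the map
+-- comparison are key-sorted before being compared.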
+ +-- Combination tests of Sort/Filter/Join/Aggregate/Window + binary comparisons +EXPLAIN SELECT * FROM t1 ORDER BY v1 = v2; +EXPLAIN SELECT * FROM t1 WHERE v1 = v2 AND v1 = map_concat(v2, map(1, 'a')); +EXPLAIN SELECT * FROM t1 l, t1 r WHERE l.v1 = r.v2 AND l.v1 = map_concat(r.v2, map(1, 'a')); +EXPLAIN SELECT v1 = v2, count(1) FROM t1 GROUP BY v1 = v2; +EXPLAIN SELECT v1 = v2, count(1) OVER(PARTITION BY v1 = v2) FROM t1 ORDER BY v1; + +-- Combination tests of floating-point/map value normalization +CREATE TEMPORARY VIEW t9 AS SELECT * FROM VALUES + (map("a", 0.0D, "b", -0.0D)), (map("b", 0.0D, "a", -0.0D)) +AS t(v); + +EXPLAIN SELECT * FROM t9 l, t9 r WHERE l.v = r.v; +EXPLAIN SELECT v, count(1) FROM t9 GROUP BY v; +EXPLAIN SELECT v, count(1) OVER(PARTITION BY v) FROM t9 ORDER BY v; diff --git a/sql/core/src/test/resources/sql-tests/inputs/comparable-map.sql b/sql/core/src/test/resources/sql-tests/inputs/comparable-map.sql new file mode 100644 index 0000000000000..d260b4a499dae --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/comparable-map.sql @@ -0,0 +1,182 @@ +-- test cases for comparable maps + +--CONFIG_DIM1 spark.sql.codegen.wholeStage=true +--CONFIG_DIM1 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY +--CONFIG_DIM1 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=NO_CODEGEN + +--SET spark.sql.mapKeyDedupPolicy=LAST_WIN; + +CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES + (map(1, 'a', 2, 'b'), map(2, 'b', 1, 'a')), + (map(2, 'b', 1, 'a'), map(2, 'b', 1, 'A')), + (map(3, 'c', 1, 'a'), map(4, 'd', 3, 'c')), + (map(3, 'c', 1, 'a'), map(3, 'c')), + (map(3, 'c'), map(4, 'd', 3, 'c')), + (map(1, 'a', 2, null), map(2, null, 1, 'a')), + (map(), map(1, 'a')), + (map(1, 'a'), map()) +AS t(v1, v2); + +CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES + (array(map(1, 'a', 2, 'b')), array(map(2, 'b', 1, 'a'))), + (array(map(2, 'b', 1, 'a')), array(map(2, 'b', 1, 'A'))), + (array(map(3, 'c', 1, 'a')), array(map(4, 'd', 3, 'c'))), + (array(map(1, 'a', 2, null), null), array(map(2, null, 1, 'a'), null)) +AS t(v1, v2); + +CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES + (struct(map(1, 'a', 2, 'b'), 'x'), struct(map(2, 'b', 1, 'a'), 'x')), + (struct(map(2, 'b', 1, 'a'), 'x'), struct(map(2, 'b', 1, 'A'), 'x')), + (struct(map(3, 'c', 1, 'a'), 'x'), struct(map(4, 'd', 3, 'c'), 'x')), + (struct(map(1, 'a', 2, null), null), struct(map(2, null, 1, 'a'), null)) +AS t(v1, v2); + +CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES + (map(1, map(1, 'a', 2, 'b'), 2, map(3, 'c', 4, 'd')), map(2, map(4, 'd', 3, 'c'), 1, map(2, 'b', 1, 'a'))), + (map(2, map(4, 'd', 3, 'c'), 1, map(2, 'b', 1, 'a')), map(2, map(4, 'd', 3, 'c'), 1, map(2, 'b', 1, 'A'))), + (map(3, map(5, 'e', 6, 'f'), 1, map(1, 'a', 2, 'b')), map(4, map(7, 'g', 8, 'h'), 3, map(6, 'f', 5, 'e'))), + (map(1, map(1, 'a', 2, null), 2, map(3, null, 4, 'd')), map(2, map(4, 'd', 3, null), 1, map(2, null, 1, 'a'))) +AS t(v1, v2); + +CREATE TEMPORARY VIEW t5 AS SELECT * FROM VALUES + (map(array(1, 2), 'a', array(3, 4), 'b'), map(array(3, 4), 'b', array(1, 2), 'a')), + (map(array(3, 4), 'b', array(1, 2), 'a'), map(array(4, 3), 'b', array(1, 2), 'a')), + (map(array(5, 6), 'a', array(1, 2), 'a'), map(array(2, 1), 'a', array(5, 6), 'c')), + (map(array(1, null), 'a', array(3, null), 'b'), map(array(3, null), 'b', array(1, null), 'a')) +AS t(v1, v2); + +CREATE TEMPORARY VIEW t6 AS SELECT * FROM VALUES + (map(1, array(1, 2), 2, array(3, 4)), map(2, array(3, 4), 1, array(1, 2))), + (map(2, array(3, 4), 
1, array(1, 2)), map(2, array(4, 3), 1, array(1, 2))), + (map(3, array(5, 6), 1, array(1, 2)), map(1, array(2, 1), 3, array(5, 6))), + (map(1, array(1, 2), 2, null), map(2, null, 1, array(2, 1))) +AS t(v1, v2); + +CREATE TEMPORARY VIEW t7 AS SELECT * FROM VALUES + (map(struct(1), 'a', struct(2), 'b'), map(struct(2), 'b', struct(1), 'a')), + (map(struct(2), 'b', struct(1), 'a'), map(struct(1), 'b', struct(2), 'a')), + (map(struct(3), 'c', struct(1), 'a'), map(struct(4), 'd', struct(3), 'c')), + (map(struct(1), 'a', struct(null), 'b'), map(struct(null), 'b', struct(1), 'a')) +AS t(v1, v2); + +CREATE TEMPORARY VIEW t8 AS SELECT * FROM VALUES + (map(1, struct('a'), 2, struct('b')), map(2, struct('b'), 1, struct('a'))), + (map(2, struct('b'), 1, struct('a')), map(2, struct('a'), 1, struct('b'))), + (map(3, struct('c'), 1, struct('a')), map(4, struct('d'), 3, struct('c'))), + (map(1, struct('a'), 2, struct(null)), map(2, struct(null), 1, struct('a'))) +AS t(v1, v2); + +-- ORDER BY cases +SELECT * FROM t1 ORDER BY v1, v2; +SELECT * FROM t2 ORDER BY v1, v2; +SELECT * FROM t3 ORDER BY v1, v2; +SELECT * FROM t4 ORDER BY v1, v2; +SELECT * FROM t5 ORDER BY v1, v2; +SELECT * FROM t6 ORDER BY v1, v2; +SELECT * FROM t7 ORDER BY v1, v2; +SELECT * FROM t8 ORDER BY v1, v2; + +-- FILTER cases +SELECT * FROM t1 WHERE v1 = v2; +SELECT * FROM t2 WHERE v1 = v2; +SELECT * FROM t3 WHERE v1 = v2; +SELECT * FROM t4 WHERE v1 = v2; +SELECT * FROM t5 WHERE v1 = v2; +SELECT * FROM t6 WHERE v1 = v2; +SELECT * FROM t7 WHERE v1 = v2; +SELECT * FROM t8 WHERE v1 = v2; + +SELECT * FROM t1 WHERE v1 <=> v2; +SELECT * FROM t2 WHERE v1 <=> v2; +SELECT * FROM t3 WHERE v1 <=> v2; + +SELECT * FROM t1 WHERE v1 > v2; +SELECT * FROM t2 WHERE v1 > v2; +SELECT * FROM t3 WHERE v1 > v2; + +SELECT * FROM t1 WHERE v1 IN (v2); +SELECT * FROM t2 WHERE v1 IN (v2); +SELECT * FROM t3 WHERE v1 IN (v2); + +SELECT * FROM t1 WHERE v1 IN (map(2, 'b', 1, 'a'), map(1, 'a', 3, 'c')); +SELECT * FROM t2 WHERE v1 IN (array(map(2, 'b', 1, 'a'))); +SELECT * FROM t3 WHERE v1 IN (struct(map(2, 'b', 1, 'a'), 'x')); + +-- JOIN cases +SELECT * FROM t1 l, t1 r WHERE l.v1 = r.v1 AND l.v2 = r.v2; +SELECT * FROM t2 l, t2 r WHERE l.v1 = r.v1 AND l.v2 = r.v2; +SELECT * FROM t3 l, t3 r WHERE l.v1 = r.v1 AND l.v2 = r.v2; +SELECT * FROM t4 l, t4 r WHERE l.v1 = r.v1 AND l.v2 = r.v2; +SELECT * FROM t5 l, t5 r WHERE l.v1 = r.v1 AND l.v2 = r.v2; +SELECT * FROM t6 l, t6 r WHERE l.v1 = r.v1 AND l.v2 = r.v2; +SELECT * FROM t7 l, t7 r WHERE l.v1 = r.v1 AND l.v2 = r.v2; +SELECT * FROM t8 l, t8 r WHERE l.v1 = r.v1 AND l.v2 = r.v2; + +-- GROUP BY cases +SELECT v1, count(1) FROM t1 GROUP BY v1; +SELECT v1, count(1) FROM t2 GROUP BY v1; +SELECT v1, count(1) FROM t3 GROUP BY v1; +SELECT v1, count(1) FROM t4 GROUP BY v1; +SELECT v1, count(1) FROM t5 GROUP BY v1; +SELECT v1, count(1) FROM t6 GROUP BY v1; +SELECT v1, count(1) FROM t7 GROUP BY v1; +SELECT v1, count(1) FROM t8 GROUP BY v1; + +-- WINDOW cases +SELECT v1, count(1) OVER(PARTITION BY v1) FROM t1 ORDER BY v1; +SELECT v1, count(1) OVER(PARTITION BY v1) FROM t2 ORDER BY v1; +SELECT v1, count(1) OVER(PARTITION BY v1) FROM t3 ORDER BY v1; +SELECT v1, count(1) OVER(PARTITION BY v1) FROM t4 ORDER BY v1; +SELECT v1, count(1) OVER(PARTITION BY v1) FROM t5 ORDER BY v1; +SELECT v1, count(1) OVER(PARTITION BY v1) FROM t6 ORDER BY v1; +SELECT v1, count(1) OVER(PARTITION BY v1) FROM t7 ORDER BY v1; +SELECT v1, count(1) OVER(PARTITION BY v1) FROM t8 ORDER BY v1; + +-- INTERSECT cases +(SELECT v1 FROM t1) INTERSECT (SELECT v1 FROM t1); +(SELECT 
v1 FROM t2) INTERSECT (SELECT v1 FROM t2); +(SELECT v1 FROM t3) INTERSECT (SELECT v1 FROM t3); +(SELECT v1 FROM t4) INTERSECT (SELECT v1 FROM t4); +(SELECT v1 FROM t5) INTERSECT (SELECT v1 FROM t5); +(SELECT v1 FROM t6) INTERSECT (SELECT v1 FROM t6); +(SELECT v1 FROM t7) INTERSECT (SELECT v1 FROM t7); +(SELECT v1 FROM t8) INTERSECT (SELECT v1 FROM t8); + +-- EXCEPT cases +(SELECT v1 FROM t1) EXCEPT (SELECT v1 FROM t1 ORDER BY v1 LIMIT 1); +(SELECT v1 FROM t2) EXCEPT (SELECT v1 FROM t2 ORDER BY v1 LIMIT 1); +(SELECT v1 FROM t3) EXCEPT (SELECT v1 FROM t3 ORDER BY v1 LIMIT 1); +(SELECT v1 FROM t4) EXCEPT (SELECT v1 FROM t4 ORDER BY v1 LIMIT 1); +(SELECT v1 FROM t5) EXCEPT (SELECT v1 FROM t5 ORDER BY v1 LIMIT 1); +(SELECT v1 FROM t6) EXCEPT (SELECT v1 FROM t6 ORDER BY v1 LIMIT 1); +(SELECT v1 FROM t7) EXCEPT (SELECT v1 FROM t7 ORDER BY v1 LIMIT 1); +(SELECT v1 FROM t8) EXCEPT (SELECT v1 FROM t8 ORDER BY v1 LIMIT 1); + +-- DISTINCT cases +SELECT DISTINCT v1 FROM t1; +SELECT DISTINCT v1 FROM t2; +SELECT DISTINCT v1 FROM t3; +SELECT DISTINCT v1 FROM t4; +SELECT DISTINCT v1 FROM t5; +SELECT DISTINCT v1 FROM t6; +SELECT DISTINCT v1 FROM t7; +SELECT DISTINCT v1 FROM t8; + +-- To check whether `SortMapKeys` is inserted correctly, the explain results +-- of the following complicated query cases are shown in `comparable-map-explain.sql`. + +-- Combination tests of Sort/Filter/Join/Aggregate/Window + binary comparisons +SELECT * FROM t1 ORDER BY v1 = v2; +SELECT * FROM t1 WHERE v1 = v2 AND v1 = map_concat(v2, map(1, 'a')); +SELECT * FROM t1 l, t1 r WHERE l.v1 = r.v2 AND l.v1 = map_concat(r.v2, map(1, 'a')); +SELECT v1 = v2, count(1) FROM t1 GROUP BY v1 = v2; +SELECT v1 = v2, count(1) OVER(PARTITION BY v1 = v2) FROM t1 ORDER BY v1; + +-- Combination tests of floating-point/map value normalization +CREATE TEMPORARY VIEW t9 AS SELECT * FROM VALUES + (map("a", 0.0D, "b", -0.0D)), (map("b", 0.0D, "a", -0.0D)) +AS t(v); + +SELECT * FROM t9 l, t9 r WHERE l.v = r.v; +SELECT v, count(1) FROM t9 GROUP BY v; +SELECT v, count(1) OVER(PARTITION BY v) FROM t9 ORDER BY v; diff --git a/sql/core/src/test/resources/sql-tests/results/comparable-map-explain.sql.out b/sql/core/src/test/resources/sql-tests/results/comparable-map-explain.sql.out new file mode 100644 index 0000000000000..f8c4992998df4 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/comparable-map-explain.sql.out @@ -0,0 +1,130 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 10 + + +-- !query +CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES + (map(1, 'a', 2, 'b'), map(2, 'b', 1, 'a')), + (map(2, 'b', 1, 'a'), map(2, 'b', 1, 'A')), + (map(3, 'c', 1, 'a'), map(4, 'd', 3, 'c')), + (map(3, 'c', 1, 'a'), map(3, 'c')), + (map(3, 'c'), map(4, 'd', 3, 'c')), + (map(1, 'a', 2, null), map(2, null, 1, 'a')), + (map(), map(1, 'a')), + (map(1, 'a'), map()) +AS t(v1, v2) +-- !query schema +struct<> +-- !query output + + + +-- !query +EXPLAIN SELECT * FROM t1 ORDER BY v1 = v2 +-- !query schema +struct<plan:string> +-- !query output +== Physical Plan == +*Sort [(sortmapkeys(v1#x) = sortmapkeys(v2#x)) ASC NULLS FIRST], true, 0 ++- Exchange rangepartitioning((sortmapkeys(v1#x) = sortmapkeys(v2#x)) ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [id=#x] + +- LocalTableScan [v1#x, v2#x] + + +-- !query +EXPLAIN SELECT * FROM t1 WHERE v1 = v2 AND v1 = map_concat(v2, map(1, 'a')) +-- !query schema +struct<plan:string> +-- !query output +== Physical Plan == +*Filter ((sortmapkeys(v1#x) = sortmapkeys(v2#x)) AND (sortmapkeys(v1#x) = sortmapkeys(map_concat(v2#x, map(keys: [1], values:
[a]))))) ++- *LocalTableScan [v1#x, v2#x] + + +-- !query +EXPLAIN SELECT * FROM t1 l, t1 r WHERE l.v1 = r.v2 AND l.v1 = map_concat(r.v2, map(1, 'a')) +-- !query schema +struct<plan:string> +-- !query output +== Physical Plan == +*BroadcastHashJoin [sortmapkeys(v1#x), sortmapkeys(v1#x)], [sortmapkeys(v2#x), sortmapkeys(map_concat(v2#x, map(keys: [1], values: [a])))], Inner, BuildRight, false +:- *LocalTableScan [v1#x, v2#x] ++- BroadcastExchange HashedRelationBroadcastMode(List(sortmapkeys(input[1, map<int,string>, false]), sortmapkeys(map_concat(input[1, map<int,string>, false], map(keys: [1], values: [a])))),false), [id=#x] + +- LocalTableScan [v1#x, v2#x] + + +-- !query +EXPLAIN SELECT v1 = v2, count(1) FROM t1 GROUP BY v1 = v2 +-- !query schema +struct<plan:string> +-- !query output +== Physical Plan == +*HashAggregate(keys=[_groupingexpression#x], functions=[count(1)]) ++- Exchange hashpartitioning(_groupingexpression#x, 4), ENSURE_REQUIREMENTS, [id=#x] + +- *HashAggregate(keys=[_groupingexpression#x], functions=[partial_count(1)]) + +- *Project [(sortmapkeys(v1#x) = sortmapkeys(v2#x)) AS _groupingexpression#x] + +- *LocalTableScan [v1#x, v2#x] + + +-- !query +EXPLAIN SELECT v1 = v2, count(1) OVER(PARTITION BY v1 = v2) FROM t1 ORDER BY v1 +-- !query schema +struct<plan:string> +-- !query output +== Physical Plan == +*Project [(v1 = v2)#x, count(1) OVER (PARTITION BY (v1 = v2) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#xL] ++- *Sort [sortmapkeys(v1#x) ASC NULLS FIRST], true, 0 + +- Exchange rangepartitioning(sortmapkeys(v1#x) ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [id=#x] + +- *Project [(v1 = v2)#x, count(1) OVER (PARTITION BY (v1 = v2) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#xL, v1#x] + +- Window [count(1) windowspecdefinition(_w0#x, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS count(1) OVER (PARTITION BY (v1 = v2) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#xL], [_w0#x] + +- *Sort [_w0#x ASC NULLS FIRST], false, 0 + +- Exchange hashpartitioning(_w0#x, 4), ENSURE_REQUIREMENTS, [id=#x] + +- *Project [(sortmapkeys(v1#x) = sortmapkeys(v2#x)) AS (v1 = v2)#x, (sortmapkeys(v1#x) = sortmapkeys(v2#x)) AS _w0#x, v1#x] + +- *LocalTableScan [v1#x, v2#x] + + +-- !query +CREATE TEMPORARY VIEW t9 AS SELECT * FROM VALUES + (map("a", 0.0D, "b", -0.0D)), (map("b", 0.0D, "a", -0.0D)) +AS t(v) +-- !query schema +struct<> +-- !query output + + + +-- !query +EXPLAIN SELECT * FROM t9 l, t9 r WHERE l.v = r.v +-- !query schema +struct<plan:string> +-- !query output +== Physical Plan == +BroadcastHashJoin [sortmapkeys(knownfloatingpointnormalized(transform_values(sortmapkeys(v#x), lambdafunction(knownfloatingpointnormalized(normalizenanandzero(lambda arg2#x)), lambda arg1#x, lambda arg2#x, false))))], [sortmapkeys(knownfloatingpointnormalized(transform_values(sortmapkeys(v#x), lambdafunction(knownfloatingpointnormalized(normalizenanandzero(lambda arg2#x)), lambda arg1#x, lambda arg2#x, false))))], Inner, BuildRight, false +:- LocalTableScan [v#x] ++- BroadcastExchange HashedRelationBroadcastMode(List(sortmapkeys(knownfloatingpointnormalized(transform_values(sortmapkeys(input[0, map<string,double>, false]), lambdafunction(knownfloatingpointnormalized(normalizenanandzero(lambda arg2#x)), lambda arg1#x, lambda arg2#x, false))))),false), [id=#x] + +- LocalTableScan [v#x] + + +-- !query +EXPLAIN SELECT v, count(1) FROM t9 GROUP BY v +-- !query schema +struct<plan:string> +-- !query output +== Physical Plan == +*HashAggregate(keys=[v#x], functions=[count(1)]) ++- Exchange hashpartitioning(v#x, 4), ENSURE_REQUIREMENTS, [id=#x] + +-
HashAggregate(keys=[sortmapkeys(knownfloatingpointnormalized(transform_values(v#x, lambdafunction(knownfloatingpointnormalized(normalizenanandzero(lambda arg2#x)), lambda arg1#x, lambda arg2#x, false)))) AS v#x], functions=[partial_count(1)]) + +- LocalTableScan [v#x] + + +-- !query +EXPLAIN SELECT v, count(1) OVER(PARTITION BY v) FROM t9 ORDER BY v +-- !query schema +struct<plan:string> +-- !query output +== Physical Plan == +*Sort [sortmapkeys(v#x) ASC NULLS FIRST], true, 0 ++- Exchange rangepartitioning(sortmapkeys(v#x) ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [id=#x] + +- Window [count(1) windowspecdefinition(v#x, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS count(1) OVER (PARTITION BY v ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#xL], [sortmapkeys(knownfloatingpointnormalized(transform_values(sortmapkeys(v#x), lambdafunction(knownfloatingpointnormalized(normalizenanandzero(lambda arg2#x)), lambda arg1#x, lambda arg2#x, false))))] + +- Sort [sortmapkeys(knownfloatingpointnormalized(transform_values(sortmapkeys(v#x), lambdafunction(knownfloatingpointnormalized(normalizenanandzero(lambda arg2#x)), lambda arg1#x, lambda arg2#x, false)))) ASC NULLS FIRST], false, 0 + +- Exchange hashpartitioning(sortmapkeys(knownfloatingpointnormalized(transform_values(sortmapkeys(v#x), lambdafunction(knownfloatingpointnormalized(normalizenanandzero(lambda arg2#x)), lambda arg1#x, lambda arg2#x, false)))), 4), ENSURE_REQUIREMENTS, [id=#x] + +- LocalTableScan [v#x]
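The plans above show how map comparisons become well-defined: the analyzer wraps every map-typed sort key, partitioning key, and comparison operand in sortmapkeys(...), so two maps holding the same entries in a different physical order reduce to a single canonical form before they are compared or hashed. A minimal Scala sketch of that idea on ordinary collections (canonicalEntries is a hypothetical helper for illustration, not Spark's SortMapKeys expression; like Spark's unsafe map format, a map is modeled here as an order-sensitive sequence of entries):

    // A map stored as a sequence of entries compares order-sensitively,
    // so the same logical map can have many physical representations.
    def canonicalEntries[K: Ordering, V](entries: Seq[(K, V)]): Seq[(K, V)] =
      entries.sortBy(_._1) // canonical key order makes the representation unique

    val a = Seq(2 -> "b", 1 -> "a") // same entries, different physical order
    val b = Seq(1 -> "a", 2 -> "b")
    assert(a != b)                                     // raw layouts differ
    assert(canonicalEntries(a) == canonicalEntries(b)) // canonical forms agree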
diff --git a/sql/core/src/test/resources/sql-tests/results/comparable-map.sql.out b/sql/core/src/test/resources/sql-tests/results/comparable-map.sql.out new file mode 100644 index 0000000000000..70a8d1252d823 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/comparable-map.sql.out @@ -0,0 +1,986 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 93 + + +-- !query +CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES + (map(1, 'a', 2, 'b'), map(2, 'b', 1, 'a')), + (map(2, 'b', 1, 'a'), map(2, 'b', 1, 'A')), + (map(3, 'c', 1, 'a'), map(4, 'd', 3, 'c')), + (map(3, 'c', 1, 'a'), map(3, 'c')), + (map(3, 'c'), map(4, 'd', 3, 'c')), + (map(1, 'a', 2, null), map(2, null, 1, 'a')), + (map(), map(1, 'a')), + (map(1, 'a'), map()) +AS t(v1, v2) +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES + (array(map(1, 'a', 2, 'b')), array(map(2, 'b', 1, 'a'))), + (array(map(2, 'b', 1, 'a')), array(map(2, 'b', 1, 'A'))), + (array(map(3, 'c', 1, 'a')), array(map(4, 'd', 3, 'c'))), + (array(map(1, 'a', 2, null), null), array(map(2, null, 1, 'a'), null)) +AS t(v1, v2) +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES + (struct(map(1, 'a', 2, 'b'), 'x'), struct(map(2, 'b', 1, 'a'), 'x')), + (struct(map(2, 'b', 1, 'a'), 'x'), struct(map(2, 'b', 1, 'A'), 'x')), + (struct(map(3, 'c', 1, 'a'), 'x'), struct(map(4, 'd', 3, 'c'), 'x')), + (struct(map(1, 'a', 2, null), null), struct(map(2, null, 1, 'a'), null)) +AS t(v1, v2) +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES + (map(1, map(1, 'a', 2, 'b'), 2, map(3, 'c', 4, 'd')), map(2, map(4, 'd', 3, 'c'), 1, map(2, 'b', 1, 'a'))), + (map(2, map(4, 'd', 3, 'c'), 1, map(2, 'b', 1, 'a')), map(2, map(4, 'd', 3, 'c'), 1, map(2, 'b', 1, 'A'))), + (map(3, map(5, 'e', 6, 'f'), 1, map(1, 'a', 2, 'b')), map(4, map(7, 'g', 8, 'h'), 3, map(6, 'f', 5, 'e'))), + (map(1, map(1, 'a', 2, null), 2, map(3, null, 4, 'd')), map(2, map(4, 'd', 3, null), 1, map(2, null, 1, 'a'))) +AS t(v1, v2) +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE TEMPORARY VIEW t5 AS SELECT * FROM VALUES + (map(array(1, 2), 'a', array(3, 4), 'b'), map(array(3, 4), 'b', array(1, 2), 'a')), + (map(array(3, 4), 'b', array(1, 2), 'a'), map(array(4, 3), 'b', array(1, 2), 'a')), + (map(array(5, 6), 'a', array(1, 2), 'a'), map(array(2, 1), 'a', array(5, 6), 'c')), + (map(array(1, null), 'a', array(3, null), 'b'), map(array(3, null), 'b', array(1, null), 'a')) +AS t(v1, v2) +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE TEMPORARY VIEW t6 AS SELECT * FROM VALUES + (map(1, array(1, 2), 2, array(3, 4)), map(2, array(3, 4), 1, array(1, 2))), + (map(2, array(3, 4), 1, array(1, 2)), map(2, array(4, 3), 1, array(1, 2))), + (map(3, array(5, 6), 1, array(1, 2)), map(1, array(2, 1), 3, array(5, 6))), + (map(1, array(1, 2), 2, null), map(2, null, 1, array(2, 1))) +AS t(v1, v2) +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE TEMPORARY VIEW t7 AS SELECT * FROM VALUES + (map(struct(1), 'a', struct(2), 'b'), map(struct(2), 'b', struct(1), 'a')), + (map(struct(2), 'b', struct(1), 'a'), map(struct(1), 'b', struct(2), 'a')), + (map(struct(3), 'c', struct(1), 'a'), map(struct(4), 'd', struct(3), 'c')), + (map(struct(1), 'a', struct(null), 'b'), map(struct(null), 'b', struct(1), 'a')) +AS t(v1, v2) +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE TEMPORARY VIEW t8 AS SELECT * FROM VALUES + (map(1, struct('a'), 2, struct('b')), map(2, struct('b'), 1, struct('a'))), + (map(2, struct('b'), 1, struct('a')), map(2, struct('a'), 1, struct('b'))), + (map(3, struct('c'), 1, struct('a')), map(4, struct('d'), 3, struct('c'))), + (map(1, struct('a'), 2, struct(null)), map(2, struct(null), 1, struct('a'))) +AS t(v1, v2) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT * FROM t1 ORDER BY v1, v2 +-- !query schema +struct<v1:map<int,string>,v2:map<int,string>> +-- !query output +{} {1:"a"} +{1:"a"} {} +{1:"a",2:null} {1:"a",2:null} +{1:"a",2:"b"} {1:"A",2:"b"} +{1:"a",2:"b"} {1:"a",2:"b"} +{1:"a",3:"c"} {3:"c"} +{1:"a",3:"c"} {3:"c",4:"d"} +{3:"c"} {3:"c",4:"d"} + + +-- !query +SELECT * FROM t2 ORDER BY v1, v2 +-- !query schema +struct<v1:array<map<int,string>>,v2:array<map<int,string>>> +-- !query output +[{1:"a",2:null},null] [{1:"a",2:null},null] +[{1:"a",2:"b"}] [{1:"A",2:"b"}] +[{1:"a",2:"b"}] [{1:"a",2:"b"}] +[{1:"a",3:"c"}] [{3:"c",4:"d"}] + + +-- !query +SELECT * FROM t3 ORDER BY v1, v2 +-- !query schema +struct<v1:struct<col1:map<int,string>,col2:string>,v2:struct<col1:map<int,string>,col2:string>> +-- !query output +{"col1":{1:"a",2:null},"col2":null} {"col1":{1:"a",2:null},"col2":null} +{"col1":{1:"a",2:"b"},"col2":"x"} {"col1":{1:"A",2:"b"},"col2":"x"} +{"col1":{1:"a",2:"b"},"col2":"x"} {"col1":{1:"a",2:"b"},"col2":"x"} +{"col1":{1:"a",3:"c"},"col2":"x"} {"col1":{3:"c",4:"d"},"col2":"x"} + + +-- !query +SELECT * FROM t4 ORDER BY v1, v2 +-- !query schema +struct<v1:map<int,map<int,string>>,v2:map<int,map<int,string>>> +-- !query output +{1:{1:"a",2:null},2:{3:null,4:"d"}} {1:{1:"a",2:null},2:{3:null,4:"d"}} +{1:{1:"a",2:"b"},2:{3:"c",4:"d"}} {1:{1:"A",2:"b"},2:{3:"c",4:"d"}} +{1:{1:"a",2:"b"},2:{3:"c",4:"d"}} {1:{1:"a",2:"b"},2:{3:"c",4:"d"}} +{1:{1:"a",2:"b"},3:{5:"e",6:"f"}} {3:{5:"e",6:"f"},4:{7:"g",8:"h"}} + + +-- !query +SELECT * FROM t5 ORDER BY v1, v2 +-- !query schema +struct<v1:map<array<int>,string>,v2:map<array<int>,string>> +-- !query output +{[1,null]:"a",[3,null]:"b"} {[1,null]:"a",[3,null]:"b"} +{[1,2]:"a",[3,4]:"b"} {[1,2]:"a",[3,4]:"b"} +{[1,2]:"a",[3,4]:"b"} {[1,2]:"a",[4,3]:"b"}
+{[1,2]:"a",[5,6]:"a"} {[2,1]:"a",[5,6]:"c"} + + +-- !query +SELECT * FROM t6 ORDER BY v1, v2 +-- !query schema +struct>,v2:map>> +-- !query output +{1:[1,2],2:null} {1:[2,1],2:null} +{1:[1,2],2:[3,4]} {1:[1,2],2:[3,4]} +{1:[1,2],2:[3,4]} {1:[1,2],2:[4,3]} +{1:[1,2],3:[5,6]} {1:[2,1],3:[5,6]} + + +-- !query +SELECT * FROM t7 ORDER BY v1, v2 +-- !query schema +struct,string>,v2:map,string>> +-- !query output +{{"col1":1}:"a",{"col1":null}:"b"} {{"col1":1}:"a",{"col1":null}:"b"} +{{"col1":1}:"a",{"col1":2}:"b"} {{"col1":1}:"a",{"col1":2}:"b"} +{{"col1":1}:"a",{"col1":2}:"b"} {{"col1":1}:"b",{"col1":2}:"a"} +{{"col1":1}:"a",{"col1":3}:"c"} {{"col1":3}:"c",{"col1":4}:"d"} + + +-- !query +SELECT * FROM t8 ORDER BY v1, v2 +-- !query schema +struct>,v2:map>> +-- !query output +{1:{"col1":"a"},2:{"col1":null}} {1:{"col1":"a"},2:{"col1":null}} +{1:{"col1":"a"},2:{"col1":"b"}} {1:{"col1":"a"},2:{"col1":"b"}} +{1:{"col1":"a"},2:{"col1":"b"}} {1:{"col1":"b"},2:{"col1":"a"}} +{1:{"col1":"a"},3:{"col1":"c"}} {3:{"col1":"c"},4:{"col1":"d"}} + + +-- !query +SELECT * FROM t1 WHERE v1 = v2 +-- !query schema +struct,v2:map> +-- !query output +{1:"a",2:"b"} {1:"a",2:"b"} +{1:"a",2:null} {1:"a",2:null} + + +-- !query +SELECT * FROM t2 WHERE v1 = v2 +-- !query schema +struct>,v2:array>> +-- !query output +[{1:"a",2:"b"}] [{1:"a",2:"b"}] +[{1:"a",2:null},null] [{1:"a",2:null},null] + + +-- !query +SELECT * FROM t3 WHERE v1 = v2 +-- !query schema +struct,col2:string>,v2:struct,col2:string>> +-- !query output +{"col1":{1:"a",2:"b"},"col2":"x"} {"col1":{1:"a",2:"b"},"col2":"x"} +{"col1":{1:"a",2:null},"col2":null} {"col1":{1:"a",2:null},"col2":null} + + +-- !query +SELECT * FROM t4 WHERE v1 = v2 +-- !query schema +struct>,v2:map>> +-- !query output +{1:{1:"a",2:"b"},2:{3:"c",4:"d"}} {1:{1:"a",2:"b"},2:{3:"c",4:"d"}} +{1:{1:"a",2:null},2:{3:null,4:"d"}} {1:{1:"a",2:null},2:{3:null,4:"d"}} + + +-- !query +SELECT * FROM t5 WHERE v1 = v2 +-- !query schema +struct,string>,v2:map,string>> +-- !query output +{[1,2]:"a",[3,4]:"b"} {[1,2]:"a",[3,4]:"b"} +{[1,null]:"a",[3,null]:"b"} {[1,null]:"a",[3,null]:"b"} + + +-- !query +SELECT * FROM t6 WHERE v1 = v2 +-- !query schema +struct>,v2:map>> +-- !query output +{1:[1,2],2:[3,4]} {1:[1,2],2:[3,4]} + + +-- !query +SELECT * FROM t7 WHERE v1 = v2 +-- !query schema +struct,string>,v2:map,string>> +-- !query output +{{"col1":1}:"a",{"col1":2}:"b"} {{"col1":1}:"a",{"col1":2}:"b"} +{{"col1":1}:"a",{"col1":null}:"b"} {{"col1":1}:"a",{"col1":null}:"b"} + + +-- !query +SELECT * FROM t8 WHERE v1 = v2 +-- !query schema +struct>,v2:map>> +-- !query output +{1:{"col1":"a"},2:{"col1":"b"}} {1:{"col1":"a"},2:{"col1":"b"}} +{1:{"col1":"a"},2:{"col1":null}} {1:{"col1":"a"},2:{"col1":null}} + + +-- !query +SELECT * FROM t1 WHERE v1 <=> v2 +-- !query schema +struct,v2:map> +-- !query output +{1:"a",2:"b"} {1:"a",2:"b"} +{1:"a",2:null} {1:"a",2:null} + + +-- !query +SELECT * FROM t2 WHERE v1 <=> v2 +-- !query schema +struct>,v2:array>> +-- !query output +[{1:"a",2:"b"}] [{1:"a",2:"b"}] +[{1:"a",2:null},null] [{1:"a",2:null},null] + + +-- !query +SELECT * FROM t3 WHERE v1 <=> v2 +-- !query schema +struct,col2:string>,v2:struct,col2:string>> +-- !query output +{"col1":{1:"a",2:"b"},"col2":"x"} {"col1":{1:"a",2:"b"},"col2":"x"} +{"col1":{1:"a",2:null},"col2":null} {"col1":{1:"a",2:null},"col2":null} + + +-- !query +SELECT * FROM t1 WHERE v1 > v2 +-- !query schema +struct,v2:map> +-- !query output +{1:"a",2:"b"} {1:"A",2:"b"} +{1:"a"} {} + + +-- !query +SELECT * FROM t2 WHERE v1 > v2 +-- !query schema 
+struct<v1:array<map<int,string>>,v2:array<map<int,string>>> +-- !query output +[{1:"a",2:"b"}] [{1:"A",2:"b"}] + + +-- !query +SELECT * FROM t3 WHERE v1 > v2 +-- !query schema +struct<v1:struct<col1:map<int,string>,col2:string>,v2:struct<col1:map<int,string>,col2:string>> +-- !query output +{"col1":{1:"a",2:"b"},"col2":"x"} {"col1":{1:"A",2:"b"},"col2":"x"} + + +-- !query +SELECT * FROM t1 WHERE v1 IN (v2) +-- !query schema +struct<v1:map<int,string>,v2:map<int,string>> +-- !query output +{1:"a",2:"b"} {1:"a",2:"b"} +{1:"a",2:null} {1:"a",2:null} + + +-- !query +SELECT * FROM t2 WHERE v1 IN (v2) +-- !query schema +struct<v1:array<map<int,string>>,v2:array<map<int,string>>> +-- !query output +[{1:"a",2:"b"}] [{1:"a",2:"b"}] +[{1:"a",2:null},null] [{1:"a",2:null},null] + + +-- !query +SELECT * FROM t3 WHERE v1 IN (v2) +-- !query schema +struct<v1:struct<col1:map<int,string>,col2:string>,v2:struct<col1:map<int,string>,col2:string>> +-- !query output +{"col1":{1:"a",2:"b"},"col2":"x"} {"col1":{1:"a",2:"b"},"col2":"x"} +{"col1":{1:"a",2:null},"col2":null} {"col1":{1:"a",2:null},"col2":null} + + +-- !query +SELECT * FROM t1 WHERE v1 IN (map(2, 'b', 1, 'a'), map(1, 'a', 3, 'c')) +-- !query schema +struct<v1:map<int,string>,v2:map<int,string>> +-- !query output +{1:"a",2:"b"} {1:"A",2:"b"} +{1:"a",2:"b"} {1:"a",2:"b"} +{1:"a",3:"c"} {3:"c",4:"d"} +{1:"a",3:"c"} {3:"c"} + + +-- !query +SELECT * FROM t2 WHERE v1 IN (array(map(2, 'b', 1, 'a'))) +-- !query schema +struct<v1:array<map<int,string>>,v2:array<map<int,string>>> +-- !query output +[{1:"a",2:"b"}] [{1:"A",2:"b"}] +[{1:"a",2:"b"}] [{1:"a",2:"b"}] + + +-- !query +SELECT * FROM t3 WHERE v1 IN (struct(map(2, 'b', 1, 'a'), 'x')) +-- !query schema +struct<v1:struct<col1:map<int,string>,col2:string>,v2:struct<col1:map<int,string>,col2:string>> +-- !query output +{"col1":{1:"a",2:"b"},"col2":"x"} {"col1":{1:"A",2:"b"},"col2":"x"} +{"col1":{1:"a",2:"b"},"col2":"x"} {"col1":{1:"a",2:"b"},"col2":"x"} + + +-- !query +SELECT * FROM t1 l, t1 r WHERE l.v1 = r.v1 AND l.v2 = r.v2 +-- !query schema +struct<v1:map<int,string>,v2:map<int,string>,v1:map<int,string>,v2:map<int,string>> +-- !query output +{1:"a",2:"b"} {1:"A",2:"b"} {1:"a",2:"b"} {1:"A",2:"b"} +{1:"a",2:"b"} {1:"a",2:"b"} {1:"a",2:"b"} {1:"a",2:"b"} +{1:"a",2:null} {1:"a",2:null} {1:"a",2:null} {1:"a",2:null} +{1:"a",3:"c"} {3:"c",4:"d"} {1:"a",3:"c"} {3:"c",4:"d"} +{1:"a",3:"c"} {3:"c"} {1:"a",3:"c"} {3:"c"} +{1:"a"} {} {1:"a"} {} +{3:"c"} {3:"c",4:"d"} {3:"c"} {3:"c",4:"d"} +{} {1:"a"} {} {1:"a"} + + +-- !query +SELECT * FROM t2 l, t2 r WHERE l.v1 = r.v1 AND l.v2 = r.v2 +-- !query schema +struct<v1:array<map<int,string>>,v2:array<map<int,string>>,v1:array<map<int,string>>,v2:array<map<int,string>>> +-- !query output +[{1:"a",2:"b"}] [{1:"A",2:"b"}] [{1:"a",2:"b"}] [{1:"A",2:"b"}] +[{1:"a",2:"b"}] [{1:"a",2:"b"}] [{1:"a",2:"b"}] [{1:"a",2:"b"}] +[{1:"a",2:null},null] [{1:"a",2:null},null] [{1:"a",2:null},null] [{1:"a",2:null},null] +[{1:"a",3:"c"}] [{3:"c",4:"d"}] [{1:"a",3:"c"}] [{3:"c",4:"d"}] + + +-- !query +SELECT * FROM t3 l, t3 r WHERE l.v1 = r.v1 AND l.v2 = r.v2 +-- !query schema +struct<v1:struct<col1:map<int,string>,col2:string>,v2:struct<col1:map<int,string>,col2:string>,v1:struct<col1:map<int,string>,col2:string>,v2:struct<col1:map<int,string>,col2:string>> +-- !query output +{"col1":{1:"a",2:"b"},"col2":"x"} {"col1":{1:"A",2:"b"},"col2":"x"} {"col1":{1:"a",2:"b"},"col2":"x"} {"col1":{1:"A",2:"b"},"col2":"x"} +{"col1":{1:"a",2:"b"},"col2":"x"} {"col1":{1:"a",2:"b"},"col2":"x"} {"col1":{1:"a",2:"b"},"col2":"x"} {"col1":{1:"a",2:"b"},"col2":"x"} +{"col1":{1:"a",2:null},"col2":null} {"col1":{1:"a",2:null},"col2":null} {"col1":{1:"a",2:null},"col2":null} {"col1":{1:"a",2:null},"col2":null} +{"col1":{1:"a",3:"c"},"col2":"x"} {"col1":{3:"c",4:"d"},"col2":"x"} {"col1":{1:"a",3:"c"},"col2":"x"} {"col1":{3:"c",4:"d"},"col2":"x"} + + +-- !query +SELECT * FROM t4 l, t4 r WHERE l.v1 = r.v1 AND l.v2 = r.v2 +-- !query schema +struct<v1:map<int,map<int,string>>,v2:map<int,map<int,string>>,v1:map<int,map<int,string>>,v2:map<int,map<int,string>>> +-- !query output +{1:{1:"a",2:"b"},2:{3:"c",4:"d"}} {1:{1:"A",2:"b"},2:{3:"c",4:"d"}}
{1:{1:"a",2:"b"},2:{3:"c",4:"d"}} {1:{1:"A",2:"b"},2:{3:"c",4:"d"}} +{1:{1:"a",2:"b"},2:{3:"c",4:"d"}} {1:{1:"a",2:"b"},2:{3:"c",4:"d"}} {1:{1:"a",2:"b"},2:{3:"c",4:"d"}} {1:{1:"a",2:"b"},2:{3:"c",4:"d"}} +{1:{1:"a",2:"b"},3:{5:"e",6:"f"}} {3:{5:"e",6:"f"},4:{7:"g",8:"h"}} {1:{1:"a",2:"b"},3:{5:"e",6:"f"}} {3:{5:"e",6:"f"},4:{7:"g",8:"h"}} +{1:{1:"a",2:null},2:{3:null,4:"d"}} {1:{1:"a",2:null},2:{3:null,4:"d"}} {1:{1:"a",2:null},2:{3:null,4:"d"}} {1:{1:"a",2:null},2:{3:null,4:"d"}} + + +-- !query +SELECT * FROM t5 l, t5 r WHERE l.v1 = r.v1 AND l.v2 = r.v2 +-- !query schema +struct,string>,v2:map,string>,v1:map,string>,v2:map,string>> +-- !query output +{[1,2]:"a",[3,4]:"b"} {[1,2]:"a",[3,4]:"b"} {[1,2]:"a",[3,4]:"b"} {[1,2]:"a",[3,4]:"b"} +{[1,2]:"a",[3,4]:"b"} {[1,2]:"a",[4,3]:"b"} {[1,2]:"a",[3,4]:"b"} {[1,2]:"a",[4,3]:"b"} +{[1,2]:"a",[5,6]:"a"} {[2,1]:"a",[5,6]:"c"} {[1,2]:"a",[5,6]:"a"} {[2,1]:"a",[5,6]:"c"} +{[1,null]:"a",[3,null]:"b"} {[1,null]:"a",[3,null]:"b"} {[1,null]:"a",[3,null]:"b"} {[1,null]:"a",[3,null]:"b"} + + +-- !query +SELECT * FROM t6 l, t6 r WHERE l.v1 = r.v1 AND l.v2 = r.v2 +-- !query schema +struct>,v2:map>,v1:map>,v2:map>> +-- !query output +{1:[1,2],2:[3,4]} {1:[1,2],2:[3,4]} {1:[1,2],2:[3,4]} {1:[1,2],2:[3,4]} +{1:[1,2],2:[3,4]} {1:[1,2],2:[4,3]} {1:[1,2],2:[3,4]} {1:[1,2],2:[4,3]} +{1:[1,2],2:null} {1:[2,1],2:null} {1:[1,2],2:null} {1:[2,1],2:null} +{1:[1,2],3:[5,6]} {1:[2,1],3:[5,6]} {1:[1,2],3:[5,6]} {1:[2,1],3:[5,6]} + + +-- !query +SELECT * FROM t7 l, t7 r WHERE l.v1 = r.v1 AND l.v2 = r.v2 +-- !query schema +struct,string>,v2:map,string>,v1:map,string>,v2:map,string>> +-- !query output +{{"col1":1}:"a",{"col1":2}:"b"} {{"col1":1}:"a",{"col1":2}:"b"} {{"col1":1}:"a",{"col1":2}:"b"} {{"col1":1}:"a",{"col1":2}:"b"} +{{"col1":1}:"a",{"col1":2}:"b"} {{"col1":1}:"b",{"col1":2}:"a"} {{"col1":1}:"a",{"col1":2}:"b"} {{"col1":1}:"b",{"col1":2}:"a"} +{{"col1":1}:"a",{"col1":3}:"c"} {{"col1":3}:"c",{"col1":4}:"d"} {{"col1":1}:"a",{"col1":3}:"c"} {{"col1":3}:"c",{"col1":4}:"d"} +{{"col1":1}:"a",{"col1":null}:"b"} {{"col1":1}:"a",{"col1":null}:"b"} {{"col1":1}:"a",{"col1":null}:"b"} {{"col1":1}:"a",{"col1":null}:"b"} + + +-- !query +SELECT * FROM t8 l, t8 r WHERE l.v1 = r.v1 AND l.v2 = r.v2 +-- !query schema +struct>,v2:map>,v1:map>,v2:map>> +-- !query output +{1:{"col1":"a"},2:{"col1":"b"}} {1:{"col1":"a"},2:{"col1":"b"}} {1:{"col1":"a"},2:{"col1":"b"}} {1:{"col1":"a"},2:{"col1":"b"}} +{1:{"col1":"a"},2:{"col1":"b"}} {1:{"col1":"b"},2:{"col1":"a"}} {1:{"col1":"a"},2:{"col1":"b"}} {1:{"col1":"b"},2:{"col1":"a"}} +{1:{"col1":"a"},2:{"col1":null}} {1:{"col1":"a"},2:{"col1":null}} {1:{"col1":"a"},2:{"col1":null}} {1:{"col1":"a"},2:{"col1":null}} +{1:{"col1":"a"},3:{"col1":"c"}} {3:{"col1":"c"},4:{"col1":"d"}} {1:{"col1":"a"},3:{"col1":"c"}} {3:{"col1":"c"},4:{"col1":"d"}} + + +-- !query +SELECT v1, count(1) FROM t1 GROUP BY v1 +-- !query schema +struct,count(1):bigint> +-- !query output +{1:"a",2:"b"} 2 +{1:"a",2:null} 1 +{1:"a",3:"c"} 2 +{1:"a"} 1 +{3:"c"} 1 +{} 1 + + +-- !query +SELECT v1, count(1) FROM t2 GROUP BY v1 +-- !query schema +struct>,count(1):bigint> +-- !query output +[{1:"a",2:"b"}] 2 +[{1:"a",2:null},null] 1 +[{1:"a",3:"c"}] 1 + + +-- !query +SELECT v1, count(1) FROM t3 GROUP BY v1 +-- !query schema +struct,col2:string>,count(1):bigint> +-- !query output +{"col1":{1:"a",2:"b"},"col2":"x"} 2 +{"col1":{1:"a",2:null},"col2":null} 1 +{"col1":{1:"a",3:"c"},"col2":"x"} 1 + + +-- !query +SELECT v1, count(1) FROM t4 GROUP BY v1 +-- !query schema 
+struct<v1:map<int,map<int,string>>,count(1):bigint> +-- !query output +{1:{1:"a",2:"b"},2:{3:"c",4:"d"}} 2 +{1:{1:"a",2:"b"},3:{5:"e",6:"f"}} 1 +{1:{1:"a",2:null},2:{3:null,4:"d"}} 1 + + +-- !query +SELECT v1, count(1) FROM t5 GROUP BY v1 +-- !query schema +struct<v1:map<array<int>,string>,count(1):bigint> +-- !query output +{[1,2]:"a",[3,4]:"b"} 2 +{[1,2]:"a",[5,6]:"a"} 1 +{[1,null]:"a",[3,null]:"b"} 1 + + +-- !query +SELECT v1, count(1) FROM t6 GROUP BY v1 +-- !query schema +struct<v1:map<int,array<int>>,count(1):bigint> +-- !query output +{1:[1,2],2:[3,4]} 2 +{1:[1,2],2:null} 1 +{1:[1,2],3:[5,6]} 1 + + +-- !query +SELECT v1, count(1) FROM t7 GROUP BY v1 +-- !query schema +struct<v1:map<struct<col1:int>,string>,count(1):bigint> +-- !query output +{{"col1":1}:"a",{"col1":2}:"b"} 2 +{{"col1":1}:"a",{"col1":3}:"c"} 1 +{{"col1":1}:"a",{"col1":null}:"b"} 1 + + +-- !query +SELECT v1, count(1) FROM t8 GROUP BY v1 +-- !query schema +struct<v1:map<int,struct<col1:string>>,count(1):bigint> +-- !query output +{1:{"col1":"a"},2:{"col1":"b"}} 2 +{1:{"col1":"a"},2:{"col1":null}} 1 +{1:{"col1":"a"},3:{"col1":"c"}} 1 + + +-- !query +SELECT v1, count(1) OVER(PARTITION BY v1) FROM t1 ORDER BY v1 +-- !query schema +struct<v1:map<int,string>,count(1) OVER (PARTITION BY v1 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint> +-- !query output +{} 1 +{1:"a"} 1 +{1:"a",2:null} 1 +{1:"a",2:"b"} 2 +{1:"a",2:"b"} 2 +{1:"a",3:"c"} 2 +{1:"a",3:"c"} 2 +{3:"c"} 1 + + +-- !query +SELECT v1, count(1) OVER(PARTITION BY v1) FROM t2 ORDER BY v1 +-- !query schema +struct<v1:array<map<int,string>>,count(1) OVER (PARTITION BY v1 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint> +-- !query output +[{1:"a",2:null},null] 1 +[{1:"a",2:"b"}] 2 +[{1:"a",2:"b"}] 2 +[{1:"a",3:"c"}] 1 + + +-- !query +SELECT v1, count(1) OVER(PARTITION BY v1) FROM t3 ORDER BY v1 +-- !query schema +struct<v1:struct<col1:map<int,string>,col2:string>,count(1) OVER (PARTITION BY v1 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint> +-- !query output +{"col1":{1:"a",2:null},"col2":null} 1 +{"col1":{1:"a",2:"b"},"col2":"x"} 2 +{"col1":{1:"a",2:"b"},"col2":"x"} 2 +{"col1":{1:"a",3:"c"},"col2":"x"} 1 + + +-- !query +SELECT v1, count(1) OVER(PARTITION BY v1) FROM t4 ORDER BY v1 +-- !query schema +struct<v1:map<int,map<int,string>>,count(1) OVER (PARTITION BY v1 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint> +-- !query output +{1:{1:"a",2:null},2:{3:null,4:"d"}} 1 +{1:{1:"a",2:"b"},2:{3:"c",4:"d"}} 2 +{1:{1:"a",2:"b"},2:{3:"c",4:"d"}} 2 +{1:{1:"a",2:"b"},3:{5:"e",6:"f"}} 1 + + +-- !query +SELECT v1, count(1) OVER(PARTITION BY v1) FROM t5 ORDER BY v1 +-- !query schema +struct<v1:map<array<int>,string>,count(1) OVER (PARTITION BY v1 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint> +-- !query output +{[1,null]:"a",[3,null]:"b"} 1 +{[1,2]:"a",[3,4]:"b"} 2 +{[1,2]:"a",[3,4]:"b"} 2 +{[1,2]:"a",[5,6]:"a"} 1 + + +-- !query +SELECT v1, count(1) OVER(PARTITION BY v1) FROM t6 ORDER BY v1 +-- !query schema +struct<v1:map<int,array<int>>,count(1) OVER (PARTITION BY v1 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint> +-- !query output +{1:[1,2],2:null} 1 +{1:[1,2],2:[3,4]} 2 +{1:[1,2],2:[3,4]} 2 +{1:[1,2],3:[5,6]} 1 + + +-- !query +SELECT v1, count(1) OVER(PARTITION BY v1) FROM t7 ORDER BY v1 +-- !query schema +struct<v1:map<struct<col1:int>,string>,count(1) OVER (PARTITION BY v1 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint> +-- !query output +{{"col1":1}:"a",{"col1":null}:"b"} 1 +{{"col1":1}:"a",{"col1":2}:"b"} 2 +{{"col1":1}:"a",{"col1":2}:"b"} 2 +{{"col1":1}:"a",{"col1":3}:"c"} 1 + + +-- !query +SELECT v1, count(1) OVER(PARTITION BY v1) FROM t8 ORDER BY v1 +-- !query schema +struct<v1:map<int,struct<col1:string>>,count(1) OVER (PARTITION BY v1 ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING):bigint> +-- !query output +{1:{"col1":"a"},2:{"col1":null}} 1 +{1:{"col1":"a"},2:{"col1":"b"}} 2 +{1:{"col1":"a"},2:{"col1":"b"}} 2 +{1:{"col1":"a"},3:{"col1":"c"}} 1 + + +-- !query +(SELECT v1 FROM t1) INTERSECT (SELECT v1 FROM t1) +-- !query schema +struct<v1:map<int,string>> +-- !query output +{1:"a",2:"b"} +{1:"a",2:null} +{1:"a",3:"c"} +{1:"a"} +{3:"c"} +{} + + +-- !query +(SELECT v1 FROM t2) INTERSECT (SELECT v1 FROM t2) +-- !query schema +struct<v1:array<map<int,string>>> +-- !query output +[{1:"a",2:"b"}] +[{1:"a",2:null},null] +[{1:"a",3:"c"}] + + +-- !query +(SELECT v1 FROM t3) INTERSECT (SELECT v1 FROM t3) +-- !query schema +struct<v1:struct<col1:map<int,string>,col2:string>> +-- !query output +{"col1":{1:"a",2:"b"},"col2":"x"} +{"col1":{1:"a",2:null},"col2":null} +{"col1":{1:"a",3:"c"},"col2":"x"} + + +-- !query +(SELECT v1 FROM t4) INTERSECT (SELECT v1 FROM t4) +-- !query schema +struct<v1:map<int,map<int,string>>> +-- !query output +{1:{1:"a",2:"b"},2:{3:"c",4:"d"}} +{1:{1:"a",2:"b"},3:{5:"e",6:"f"}} +{1:{1:"a",2:null},2:{3:null,4:"d"}} + + +-- !query +(SELECT v1 FROM t5) INTERSECT (SELECT v1 FROM t5) +-- !query schema +struct<v1:map<array<int>,string>> +-- !query output +{[1,2]:"a",[3,4]:"b"} +{[1,2]:"a",[5,6]:"a"} +{[1,null]:"a",[3,null]:"b"} + + +-- !query +(SELECT v1 FROM t6) INTERSECT (SELECT v1 FROM t6) +-- !query schema +struct<v1:map<int,array<int>>> +-- !query output +{1:[1,2],2:[3,4]} +{1:[1,2],2:null} +{1:[1,2],3:[5,6]} + + +-- !query +(SELECT v1 FROM t7) INTERSECT (SELECT v1 FROM t7) +-- !query schema +struct<v1:map<struct<col1:int>,string>> +-- !query output +{{"col1":1}:"a",{"col1":2}:"b"} +{{"col1":1}:"a",{"col1":3}:"c"} +{{"col1":1}:"a",{"col1":null}:"b"} + + +-- !query +(SELECT v1 FROM t8) INTERSECT (SELECT v1 FROM t8) +-- !query schema +struct<v1:map<int,struct<col1:string>>> +-- !query output +{1:{"col1":"a"},2:{"col1":"b"}} +{1:{"col1":"a"},2:{"col1":null}} +{1:{"col1":"a"},3:{"col1":"c"}} + + +-- !query +(SELECT v1 FROM t1) EXCEPT (SELECT v1 FROM t1 ORDER BY v1 LIMIT 1) +-- !query schema +struct<v1:map<int,string>> +-- !query output +{1:"a",2:"b"} +{1:"a",3:"c"} +{1:"a",2:null} +{1:"a"} +{3:"c"} + + +-- !query +(SELECT v1 FROM t2) EXCEPT (SELECT v1 FROM t2 ORDER BY v1 LIMIT 1) +-- !query schema +struct<v1:array<map<int,string>>> +-- !query output +[{1:"a",2:"b"}] +[{1:"a",3:"c"}] + + +-- !query +(SELECT v1 FROM t3) EXCEPT (SELECT v1 FROM t3 ORDER BY v1 LIMIT 1) +-- !query schema +struct<v1:struct<col1:map<int,string>,col2:string>> +-- !query output +{"col1":{1:"a",2:"b"},"col2":"x"} +{"col1":{1:"a",3:"c"},"col2":"x"} + + +-- !query +(SELECT v1 FROM t4) EXCEPT (SELECT v1 FROM t4 ORDER BY v1 LIMIT 1) +-- !query schema +struct<v1:map<int,map<int,string>>> +-- !query output +{1:{1:"a",2:"b"},2:{3:"c",4:"d"}} +{1:{1:"a",2:"b"},3:{5:"e",6:"f"}} + + +-- !query +(SELECT v1 FROM t5) EXCEPT (SELECT v1 FROM t5 ORDER BY v1 LIMIT 1) +-- !query schema +struct<v1:map<array<int>,string>> +-- !query output +{[1,2]:"a",[3,4]:"b"} +{[1,2]:"a",[5,6]:"a"} + + +-- !query +(SELECT v1 FROM t6) EXCEPT (SELECT v1 FROM t6 ORDER BY v1 LIMIT 1) +-- !query schema +struct<v1:map<int,array<int>>> +-- !query output +{1:[1,2],2:[3,4]} +{1:[1,2],3:[5,6]} + + +-- !query +(SELECT v1 FROM t7) EXCEPT (SELECT v1 FROM t7 ORDER BY v1 LIMIT 1) +-- !query schema +struct<v1:map<struct<col1:int>,string>> +-- !query output +{{"col1":1}:"a",{"col1":2}:"b"} +{{"col1":1}:"a",{"col1":3}:"c"} + + +-- !query +(SELECT v1 FROM t8) EXCEPT (SELECT v1 FROM t8 ORDER BY v1 LIMIT 1) +-- !query schema +struct<v1:map<int,struct<col1:string>>> +-- !query output +{1:{"col1":"a"},2:{"col1":"b"}} +{1:{"col1":"a"},3:{"col1":"c"}} + + +-- !query +SELECT DISTINCT v1 FROM t1 +-- !query schema +struct<v1:map<int,string>> +-- !query output +{1:"a",2:"b"} +{1:"a",2:null} +{1:"a",3:"c"} +{1:"a"} +{3:"c"} +{} + + +-- !query +SELECT DISTINCT v1 FROM t2 +-- !query schema +struct<v1:array<map<int,string>>> +-- !query output
+[{1:"a",2:"b"}] +[{1:"a",2:null},null] +[{1:"a",3:"c"}] + + +-- !query +SELECT DISTINCT v1 FROM t3 +-- !query schema +struct,col2:string>> +-- !query output +{"col1":{1:"a",2:"b"},"col2":"x"} +{"col1":{1:"a",2:null},"col2":null} +{"col1":{1:"a",3:"c"},"col2":"x"} + + +-- !query +SELECT DISTINCT v1 FROM t4 +-- !query schema +struct>> +-- !query output +{1:{1:"a",2:"b"},2:{3:"c",4:"d"}} +{1:{1:"a",2:"b"},3:{5:"e",6:"f"}} +{1:{1:"a",2:null},2:{3:null,4:"d"}} + + +-- !query +SELECT DISTINCT v1 FROM t5 +-- !query schema +struct,string>> +-- !query output +{[1,2]:"a",[3,4]:"b"} +{[1,2]:"a",[5,6]:"a"} +{[1,null]:"a",[3,null]:"b"} + + +-- !query +SELECT DISTINCT v1 FROM t6 +-- !query schema +struct>> +-- !query output +{1:[1,2],2:[3,4]} +{1:[1,2],2:null} +{1:[1,2],3:[5,6]} + + +-- !query +SELECT DISTINCT v1 FROM t7 +-- !query schema +struct,string>> +-- !query output +{{"col1":1}:"a",{"col1":2}:"b"} +{{"col1":1}:"a",{"col1":3}:"c"} +{{"col1":1}:"a",{"col1":null}:"b"} + + +-- !query +SELECT DISTINCT v1 FROM t8 +-- !query schema +struct>> +-- !query output +{1:{"col1":"a"},2:{"col1":"b"}} +{1:{"col1":"a"},2:{"col1":null}} +{1:{"col1":"a"},3:{"col1":"c"}} + + +-- !query +SELECT * FROM t1 ORDER BY v1 = v2 +-- !query schema +struct,v2:map> +-- !query output +{1:"a",2:"b"} {1:"A",2:"b"} +{1:"a",3:"c"} {3:"c",4:"d"} +{1:"a",3:"c"} {3:"c"} +{3:"c"} {3:"c",4:"d"} +{} {1:"a"} +{1:"a"} {} +{1:"a",2:"b"} {1:"a",2:"b"} +{1:"a",2:null} {1:"a",2:null} + + +-- !query +SELECT * FROM t1 WHERE v1 = v2 AND v1 = map_concat(v2, map(1, 'a')) +-- !query schema +struct,v2:map> +-- !query output +{1:"a",2:"b"} {1:"a",2:"b"} +{1:"a",2:null} {1:"a",2:null} + + +-- !query +SELECT * FROM t1 l, t1 r WHERE l.v1 = r.v2 AND l.v1 = map_concat(r.v2, map(1, 'a')) +-- !query schema +struct,v2:map,v1:map,v2:map> +-- !query output +{1:"a",2:"b"} {1:"A",2:"b"} {1:"a",2:"b"} {1:"a",2:"b"} +{1:"a",2:"b"} {1:"a",2:"b"} {1:"a",2:"b"} {1:"a",2:"b"} +{1:"a",2:null} {1:"a",2:null} {1:"a",2:null} {1:"a",2:null} +{1:"a"} {} {} {1:"a"} + + +-- !query +SELECT v1 = v2, count(1) FROM t1 GROUP BY v1 = v2 +-- !query schema +struct<(v1 = v2):boolean,count(1):bigint> +-- !query output +false 6 +true 2 + + +-- !query +SELECT v1 = v2, count(1) OVER(PARTITION BY v1 = v2) FROM t1 ORDER BY v1 +-- !query schema +struct<(v1 = v2):boolean,count(1) OVER (PARTITION BY (v1 = v2) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint> +-- !query output +false 6 +false 6 +true 2 +false 6 +true 2 +false 6 +false 6 +false 6 + + +-- !query +CREATE TEMPORARY VIEW t9 AS SELECT * FROM VALUES + (map("a", 0.0D, "b", -0.0D)), (map("b", 0.0D, "a", -0.0D)) +AS t(v) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT * FROM t9 l, t9 r WHERE l.v = r.v +-- !query schema +struct,v:map> +-- !query output +{"a":-0.0,"b":0.0} {"a":-0.0,"b":0.0} +{"a":-0.0,"b":0.0} {"a":0.0,"b":-0.0} +{"a":0.0,"b":-0.0} {"a":-0.0,"b":0.0} +{"a":0.0,"b":-0.0} {"a":0.0,"b":-0.0} + + +-- !query +SELECT v, count(1) FROM t9 GROUP BY v +-- !query schema +struct,count(1):bigint> +-- !query output +{"a":0.0,"b":0.0} 2 + + +-- !query +SELECT v, count(1) OVER(PARTITION BY v) FROM t9 ORDER BY v +-- !query schema +struct,count(1) OVER (PARTITION BY v ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint> +-- !query output +{"a":0.0,"b":-0.0} 2 +{"a":-0.0,"b":0.0} 2 diff --git a/sql/core/src/test/resources/sql-tests/results/pivot.sql.out b/sql/core/src/test/resources/sql-tests/results/pivot.sql.out index 69679f8be5fe4..52ca01ff04085 100644 --- 
diff --git a/sql/core/src/test/resources/sql-tests/results/pivot.sql.out b/sql/core/src/test/resources/sql-tests/results/pivot.sql.out index 69679f8be5fe4..52ca01ff04085 --- a/sql/core/src/test/resources/sql-tests/results/pivot.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/pivot.sql.out @@ -455,10 +455,10 @@ PIVOT ( FOR m IN (map('1', 1), map('2', 2)) ) -- !query schema -struct<> +struct<year:int,{1 -> 1}:bigint,{2 -> 2}:bigint> -- !query output -org.apache.spark.sql.AnalysisException -Invalid pivot column 'm#x'. Pivot columns must be comparable. +2012 35000 NULL +2013 NULL 78000 -- !query @@ -472,10 +472,10 @@ PIVOT ( FOR (course, m) IN (('dotNET', map('1', 1)), ('Java', map('2', 2))) ) -- !query schema -struct<> +struct<year:int,{dotNET, {1 -> 1}}:bigint,{Java, {2 -> 2}}:bigint> -- !query output -org.apache.spark.sql.AnalysisException -Invalid pivot column 'named_struct(course, course#x, m, m#x)'. Pivot columns must be comparable. +2012 15000 NULL +2013 NULL 30000 -- !query diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-pivot.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-pivot.sql.out index dc5cc29762657..eb2af4ec5d4eb 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/udf-pivot.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-pivot.sql.out @@ -421,10 +421,10 @@ PIVOT ( FOR m IN (map('1', 1), map('2', 2)) ) -- !query schema -struct<> +struct<year:int,{1 -> 1}:bigint,{2 -> 2}:bigint> -- !query output -org.apache.spark.sql.AnalysisException -Invalid pivot column 'm#x'. Pivot columns must be comparable. +2012 35000 NULL +2013 NULL 78000 -- !query @@ -438,10 +438,10 @@ PIVOT ( FOR (course, m) IN (('dotNET', map('1', 1)), ('Java', map('2', 2))) ) -- !query schema -struct<> +struct<year:int,{dotNET, {1 -> 1}}:bigint,{Java, {2 -> 2}}:bigint> -- !query output -org.apache.spark.sql.AnalysisException -Invalid pivot column 'named_struct(course, course#x, m, m#x)'. Pivot columns must be comparable. +2012 15000 NULL +2013 NULL 30000 -- !query diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index d0a122e0fe094..73d8b3b18c797 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -918,11 +918,10 @@ class DataFrameAggregateSuite extends QueryTest .toDF("x", "y") .select($"x", map($"x", $"y").as("y")) .createOrReplaceTempView("tempView") - val error = intercept[AnalysisException] { - sql("SELECT max_by(x, y) FROM tempView").show - } - assert( - error.message.contains("function max_by does not support ordering on type map")) + checkAnswer( + sql("SELECT max_by(x, y) FROM tempView"), + Row(2) :: Nil + ) } } @@ -974,11 +973,10 @@ class DataFrameAggregateSuite extends QueryTest .toDF("x", "y") .select($"x", map($"x", $"y").as("y")) .createOrReplaceTempView("tempView") - val error = intercept[AnalysisException] { - sql("SELECT min_by(x, y) FROM tempView").show - } - assert( - error.message.contains("function min_by does not support ordering on type map")) + checkAnswer( + sql("SELECT min_by(x, y) FROM tempView"), + Row(0) :: Nil + ) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index 38b9a75dfbf11..f4f5903f8d4b7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -3108,11 +3108,6 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { df.select(map_zip_with(col("mis"), col("i"), (x, y,
z) => concat(x, y, z))) } assert(ex4a.getMessage.contains("type mismatch: argument 2 requires map type")) - - val ex5 = intercept[AnalysisException] { - df.selectExpr("map_zip_with(mmi, mmi, (x, y, z) -> x)") - } - assert(ex5.getMessage.contains("function map_zip_with does not support ordering on type map")) } test("transform keys function - primitive data types") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala index e3259a24601f7..fa125743b25be 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala @@ -347,25 +347,6 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession { dates.intersect(widenTypedRows).collect() } - test("SPARK-19893: cannot run set operations with map type") { - val df = spark.range(1).select(map(lit("key"), $"id").as("m")) - val e = intercept[AnalysisException](df.intersect(df)) - assert(e.message.contains( - "Cannot have map type columns in DataFrame which calls set operations")) - val e2 = intercept[AnalysisException](df.except(df)) - assert(e2.message.contains( - "Cannot have map type columns in DataFrame which calls set operations")) - val e3 = intercept[AnalysisException](df.distinct()) - assert(e3.message.contains( - "Cannot have map type columns in DataFrame which calls set operations")) - withTempView("v") { - df.createOrReplaceTempView("v") - val e4 = intercept[AnalysisException](sql("SELECT DISTINCT m FROM v")) - assert(e4.message.contains( - "Cannot have map type columns in DataFrame which calls set operations")) - } - } - test("union all") { val unionDF = testData.union(testData).union(testData) .union(testData).union(testData) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 1e3d219220fa6..a7422a2bb6c2d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2937,6 +2937,11 @@ class DataFrameSuite extends QueryTest checkAnswer(sql("SELECT sum(c1 * c3) + sum(c2 * c3) FROM tbl"), Row(2.00000000000) :: Nil) } } + + test("SPARK-34819: dropDuplicates supports MapType") { + val df = Seq(Map("k1" -> 1, "k2" -> 2), Map("k2" -> 2, "k1" -> 1)).toDF("v") + assert(df.dropDuplicates().count() === 1) + } } case class GroupByKey(a: Int, b: Int) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index 0a5feda1bd533..b5e8b2e69e48c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -91,11 +91,6 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils { assert(e.getMessage == "sortBy must be used together with bucketBy") } - test("sorting by non-orderable column") { - val df = Seq("a" -> Map(1 -> 1), "b" -> Map(2 -> 2)).toDF("i", "j") - intercept[AnalysisException](df.write.bucketBy(2, "i").sortBy("j").saveAsTable("tt")) - } - test("write bucketed data using save()") { val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")