Skip to content

Commit e5b078f

Browse files
committed
address PR comments
1 parent d91bcdd commit e5b078f

File tree

2 files changed

+146
-82
lines changed

2 files changed

+146
-82
lines changed

sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 0 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -994,88 +994,6 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
994994
}
995995
}
996996
}
997-
998-
test("EnsureRequirements.reorder should fallback to the right side HashPartitioning") {
999-
val plan1 = DummySparkPlan(
1000-
outputPartitioning = HashPartitioning(exprA :: exprB :: exprC :: Nil, 5))
1001-
val plan2 = DummySparkPlan(
1002-
outputPartitioning = HashPartitioning(exprB :: exprC :: Nil, 5))
1003-
// The left keys cannot be reordered to match the left partitioning, and it should
1004-
// fall back to reorder the right side.
1005-
val smjExec = SortMergeJoinExec(
1006-
exprA :: exprB :: Nil, exprC :: exprB :: Nil, Inner, None, plan1, plan2)
1007-
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(smjExec)
1008-
outputPlan match {
1009-
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
1010-
SortExec(_, _,
1011-
ShuffleExchangeExec(HashPartitioning(leftPartitioningExpressions, _), _, _), _),
1012-
SortExec(_, _,
1013-
DummySparkPlan(_, _, HashPartitioning(rightPartitioningExpressions, _), _, _), _), _) =>
1014-
assert(leftKeys !== smjExec.leftKeys)
1015-
assert(rightKeys !== smjExec.rightKeys)
1016-
assert(leftKeys === leftPartitioningExpressions)
1017-
assert(rightKeys === rightPartitioningExpressions)
1018-
case _ => fail(outputPlan.toString)
1019-
}
1020-
}
1021-
1022-
test("EnsureRequirements.reorder should handle PartitioningCollection") {
1023-
// PartitioningCollection on the left side of join.
1024-
val plan1 = DummySparkPlan(
1025-
outputPartitioning = PartitioningCollection(Seq(
1026-
HashPartitioning(exprA :: exprB :: Nil, 5),
1027-
HashPartitioning(exprA :: Nil, 5))))
1028-
val plan2 = DummySparkPlan()
1029-
val smjExec1 = SortMergeJoinExec(
1030-
exprB :: exprA :: Nil, exprA :: exprB :: Nil, Inner, None, plan1, plan2)
1031-
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(smjExec1)
1032-
outputPlan match {
1033-
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
1034-
SortExec(_, _,
1035-
DummySparkPlan(_, _, PartitioningCollection(leftPartitionings), _, _), _),
1036-
SortExec(_, _,
1037-
ShuffleExchangeExec(HashPartitioning(rightPartitioningExpressions, _), _, _), _), _) =>
1038-
assert(leftKeys !== smjExec1.leftKeys)
1039-
assert(rightKeys !== smjExec1.rightKeys)
1040-
assert(leftKeys === leftPartitionings(0).asInstanceOf[HashPartitioning].expressions)
1041-
assert(rightKeys === rightPartitioningExpressions)
1042-
case _ => fail(outputPlan.toString)
1043-
}
1044-
1045-
// PartitioningCollection on the right side of join.
1046-
val smjExec2 = SortMergeJoinExec(
1047-
exprA :: exprB :: Nil, exprB :: exprA :: Nil, Inner, None, plan2, plan1)
1048-
val outputPlan2 = EnsureRequirements(spark.sessionState.conf).apply(smjExec2)
1049-
outputPlan2 match {
1050-
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
1051-
SortExec(_, _,
1052-
ShuffleExchangeExec(HashPartitioning(leftPartitioningExpressions, _), _, _), _),
1053-
SortExec(_, _,
1054-
DummySparkPlan(_, _, PartitioningCollection(rightPartitionings), _, _), _), _) =>
1055-
assert(leftKeys !== smjExec2.leftKeys)
1056-
assert(rightKeys !== smjExec2.rightKeys)
1057-
assert(leftKeys === leftPartitioningExpressions)
1058-
assert(rightKeys === rightPartitionings(0).asInstanceOf[HashPartitioning].expressions)
1059-
case _ => fail(outputPlan2.toString)
1060-
}
1061-
1062-
// Both sides are PartitioningCollection and falls back to the right side.
1063-
val smjExec3 = SortMergeJoinExec(
1064-
exprA :: exprC :: Nil, exprB :: exprA :: Nil, Inner, None, plan1, plan1)
1065-
val outputPlan3 = EnsureRequirements(spark.sessionState.conf).apply(smjExec2)
1066-
outputPlan3 match {
1067-
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
1068-
SortExec(_, _,
1069-
ShuffleExchangeExec(HashPartitioning(leftPartitioningExpressions, _), _, _), _),
1070-
SortExec(_, _,
1071-
DummySparkPlan(_, _, PartitioningCollection(rightPartitionings), _, _), _), _) =>
1072-
assert(leftKeys !== smjExec2.leftKeys)
1073-
assert(rightKeys !== smjExec2.rightKeys)
1074-
assert(leftKeys === leftPartitioningExpressions)
1075-
assert(rightKeys === rightPartitionings(0).asInstanceOf[HashPartitioning].expressions)
1076-
case _ => fail(outputPlan3.toString)
1077-
}
1078-
}
1079997
}
1080998

1081999
// Used for unit-testing EnsureRequirements
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.exchange
19+
20+
import org.apache.spark.sql.catalyst.expressions.Literal
21+
import org.apache.spark.sql.catalyst.plans.Inner
22+
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, PartitioningCollection}
23+
import org.apache.spark.sql.execution.{DummySparkPlan, SortExec}
24+
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
25+
import org.apache.spark.sql.test.SharedSparkSession
26+
27+
class EnsureRequirementsSuite extends SharedSparkSession {
28+
private val exprA = Literal(1)
29+
private val exprB = Literal(2)
30+
private val exprC = Literal(3)
31+
32+
test("EnsureRequirements.reorder should handle PartitioningCollection") {
33+
val plan1 = DummySparkPlan(
34+
outputPartitioning = PartitioningCollection(Seq(
35+
HashPartitioning(exprA :: exprB :: Nil, 5),
36+
HashPartitioning(exprA :: Nil, 5))))
37+
val plan2 = DummySparkPlan()
38+
39+
// Test PartitioningCollection on the left side of join.
40+
val smjExec1 = SortMergeJoinExec(
41+
exprB :: exprA :: Nil, exprA :: exprB :: Nil, Inner, None, plan1, plan2)
42+
EnsureRequirements(spark.sessionState.conf).apply(smjExec1) match {
43+
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
44+
SortExec(_, _,
45+
DummySparkPlan(_, _, PartitioningCollection(leftPartitionings), _, _), _),
46+
SortExec(_, _,
47+
ShuffleExchangeExec(HashPartitioning(rightPartitioningExpressions, _), _, _), _), _) =>
48+
assert(leftKeys !== smjExec1.leftKeys)
49+
assert(rightKeys !== smjExec1.rightKeys)
50+
assert(leftKeys === leftPartitionings.head.asInstanceOf[HashPartitioning].expressions)
51+
assert(rightKeys === rightPartitioningExpressions)
52+
case other => fail(other.toString)
53+
}
54+
55+
// Test PartitioningCollection on the right side of join.
56+
val smjExec2 = SortMergeJoinExec(
57+
exprA :: exprB :: Nil, exprB :: exprA :: Nil, Inner, None, plan2, plan1)
58+
EnsureRequirements(spark.sessionState.conf).apply(smjExec2) match {
59+
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
60+
SortExec(_, _,
61+
ShuffleExchangeExec(HashPartitioning(leftPartitioningExpressions, _), _, _), _),
62+
SortExec(_, _,
63+
DummySparkPlan(_, _, PartitioningCollection(rightPartitionings), _, _), _), _) =>
64+
assert(leftKeys !== smjExec2.leftKeys)
65+
assert(rightKeys !== smjExec2.rightKeys)
66+
assert(leftKeys === leftPartitioningExpressions)
67+
assert(rightKeys === rightPartitionings.head.asInstanceOf[HashPartitioning].expressions)
68+
case other => fail(other.toString)
69+
}
70+
71+
// Both sides are PartitioningCollection, but the left side cannot be reordered to match
72+
// and it should fall back to the right side.
73+
val smjExec3 = SortMergeJoinExec(
74+
exprA :: exprC :: Nil, exprB :: exprA :: Nil, Inner, None, plan1, plan1)
75+
EnsureRequirements(spark.sessionState.conf).apply(smjExec3) match {
76+
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
77+
SortExec(_, _,
78+
ShuffleExchangeExec(HashPartitioning(leftPartitioningExpressions, _), _, _), _),
79+
SortExec(_, _,
80+
DummySparkPlan(_, _, PartitioningCollection(rightPartitionings), _, _), _), _) =>
81+
assert(leftKeys !== smjExec3.leftKeys)
82+
assert(rightKeys !== smjExec3.rightKeys)
83+
assert(leftKeys === leftPartitioningExpressions)
84+
assert(rightKeys === rightPartitionings.head.asInstanceOf[HashPartitioning].expressions)
85+
case other => fail(other.toString)
86+
}
87+
}
88+
89+
test("EnsureRequirements.reorder should fallback to the other side partitioning") {
90+
val plan1 = DummySparkPlan(
91+
outputPartitioning = HashPartitioning(exprA :: exprB :: exprC :: Nil, 5))
92+
val plan2 = DummySparkPlan(
93+
outputPartitioning = HashPartitioning(exprB :: exprC :: Nil, 5))
94+
95+
// Test fallback to the right side, which has HashPartitioning.
96+
val smjExec1 = SortMergeJoinExec(
97+
exprA :: exprB :: Nil, exprC :: exprB :: Nil, Inner, None, plan1, plan2)
98+
EnsureRequirements(spark.sessionState.conf).apply(smjExec1) match {
99+
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
100+
SortExec(_, _,
101+
ShuffleExchangeExec(HashPartitioning(leftPartitioningExpressions, _), _, _), _),
102+
SortExec(_, _,
103+
DummySparkPlan(_, _, HashPartitioning(rightPartitioningExpressions, _), _, _), _), _) =>
104+
assert(leftKeys !== smjExec1.leftKeys)
105+
assert(rightKeys !== smjExec1.rightKeys)
106+
assert(leftKeys === leftPartitioningExpressions)
107+
assert(rightKeys === rightPartitioningExpressions)
108+
case other => fail(other.toString)
109+
}
110+
111+
// Test fallback to the right side, which has PartitioningCollection.
112+
val plan3 = DummySparkPlan(
113+
outputPartitioning = PartitioningCollection(Seq(HashPartitioning(exprB :: exprC :: Nil, 5))))
114+
val smjExec2 = SortMergeJoinExec(
115+
exprA :: exprB :: Nil, exprC :: exprB :: Nil, Inner, None, plan1, plan3)
116+
EnsureRequirements(spark.sessionState.conf).apply(smjExec2) match {
117+
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
118+
SortExec(_, _,
119+
ShuffleExchangeExec(HashPartitioning(leftPartitioningExpressions, _), _, _), _),
120+
SortExec(_, _,
121+
DummySparkPlan(_, _, PartitioningCollection(rightPartitionings), _, _), _), _) =>
122+
assert(leftKeys !== smjExec2.leftKeys)
123+
assert(rightKeys !== smjExec2.rightKeys)
124+
assert(leftKeys === leftPartitioningExpressions)
125+
assert(rightKeys === rightPartitionings.head.asInstanceOf[HashPartitioning].expressions)
126+
case other => fail(other.toString)
127+
}
128+
129+
// The right side has HashPartitioning, so it is matched first, but no reordering match is
130+
// found, and it should fall back to the left side, which has a PartitioningCollection.
131+
val smjExec3 = SortMergeJoinExec(
132+
exprC :: exprB :: Nil, exprA :: exprB :: Nil, Inner, None, plan3, plan1)
133+
EnsureRequirements(spark.sessionState.conf).apply(smjExec3) match {
134+
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
135+
SortExec(_, _,
136+
DummySparkPlan(_, _, PartitioningCollection(leftPartitionings), _, _), _),
137+
SortExec(_, _,
138+
ShuffleExchangeExec(HashPartitioning(rightPartitioningExpressions, _), _, _), _), _) =>
139+
assert(leftKeys !== smjExec3.leftKeys)
140+
assert(rightKeys !== smjExec3.rightKeys)
141+
assert(leftKeys === leftPartitionings.head.asInstanceOf[HashPartitioning].expressions)
142+
assert(rightKeys === rightPartitioningExpressions)
143+
case other => fail(other.toString)
144+
}
145+
}
146+
}

0 commit comments

Comments
 (0)