Skip to content

Commit cf80011

Browse files
rednaxelafxkai-chi
authored andcommitted
[SPARK-26352][SQL][FOLLOWUP-2.4] Fix missing sameOutput in branch-2.4
## What changes were proposed in this pull request? After apache#23303 was merged to branch-2.3/2.4, the builds on those branches were broken due to missing a `LogicalPlan.sameOutput` function which came from apache#22713 only available on master. This PR is to follow-up with the broken 2.3/2.4 branches and make a copy of the new `LogicalPlan.sameOutput` into `ReorderJoin` to make it locally available. ## How was this patch tested? Fix the build of 2.3/2.4. Closes apache#23330 from rednaxelafx/clean-build-2.4. Authored-by: Kris Mok <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent a6e6828 commit cf80011

File tree

2 files changed

+33
-3
lines changed

2 files changed

+33
-3
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,29 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
100100
createOrderedJoin(input, conditions)
101101
}
102102

103-
if (p.sameOutput(reordered)) {
103+
if (sameOutput(p, reordered)) {
104104
reordered
105105
} else {
106106
// Reordering the joins have changed the order of the columns.
107107
// Inject a projection to make sure we restore to the expected ordering.
108108
Project(p.output, reordered)
109109
}
110110
}
111+
112+
/**
113+
* Returns true iff output of both plans are semantically the same, ie.:
114+
* - they contain the same number of `Attribute`s;
115+
* - references are the same;
116+
* - the order is equal too.
117+
* NOTE: this is copied over from SPARK-25691 from master.
118+
*/
119+
def sameOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = {
120+
val output1 = plan1.output
121+
val output2 = plan2.output
122+
output1.length == output2.length && output1.zip(output2).forall {
123+
case (a1, a2) => a1.semanticEquals(a2)
124+
}
125+
}
111126
}
112127

113128
/**

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,13 +291,28 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
291291
val optimized = Optimize.execute(analyzed)
292292
val expected = groundTruthBestPlan.analyze
293293

294-
assert(analyzed.sameOutput(expected)) // if this fails, the expected plan itself is incorrect
295-
assert(analyzed.sameOutput(optimized))
294+
assert(sameOutput(analyzed, expected)) // if this fails, the expected plan itself is incorrect
295+
assert(sameOutput(analyzed, optimized))
296296

297297
compareJoinOrder(optimized, expected)
298298
}
299299

300300
private def outputsOf(plans: LogicalPlan*): Seq[Attribute] = {
301301
plans.map(_.output).reduce(_ ++ _)
302302
}
303+
304+
/**
305+
* Returns true iff output of both plans are semantically the same, ie.:
306+
* - they contain the same number of `Attribute`s;
307+
* - references are the same;
308+
* - the order is equal too.
309+
* NOTE: this is copied over from SPARK-25691 from master.
310+
*/
311+
def sameOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = {
312+
val output1 = plan1.output
313+
val output2 = plan2.output
314+
output1.length == output2.length && output1.zip(output2).forall {
315+
case (a1, a2) => a1.semanticEquals(a2)
316+
}
317+
}
303318
}

0 commit comments

Comments
 (0)