Commit 269cf86

Back out SMJ operator change; isolate changes to selection of sort op.
I'll consider fusing the sort and merge steps in a follow-up PR.
1 parent: 1b841ca

File tree

4 files changed: +104 −254 lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/execution/joins/SortMergeJoinIterator.java

Lines changed: 0 additions & 153 deletions
This file was deleted.

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 1 addition & 4 deletions
@@ -102,11 +102,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       // for now let's support inner join first, then add outer join
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
         if sqlContext.conf.sortMergeJoinEnabled =>
-        val mergeJoin = if (sqlContext.conf.unsafeEnabled) {
-          joins.UnsafeSortMergeJoin(leftKeys, rightKeys, planLater(left), planLater(right))
-        } else {
+        val mergeJoin =
           joins.SortMergeJoin(leftKeys, rightKeys, planLater(left), planLater(right))
-        }
         condition.map(Filter(_, mergeJoin)).getOrElse(mergeJoin) :: Nil

       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
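
With this change the strategy always plans the row-based SortMergeJoin when the sort-merge flag is enabled, regardless of the unsafe setting. The following is a minimal usage sketch, not part of this commit: it assumes a Spark 1.x SQLContext and assumes the sortMergeJoinEnabled check above is backed by the "spark.sql.planner.sortMergeJoin" key; the object and app names are invented for illustration.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SortMergeJoinSelectionDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("smj-demo").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    // Assumed config key behind the sortMergeJoinEnabled flag checked by the strategy.
    sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")

    val left = sqlContext.createDataFrame(Seq((1, "a"), (2, "b"), (2, "c"))).toDF("id", "l")
    val right = sqlContext.createDataFrame(Seq((2, "x"), (3, "y"))).toDF("id", "r")

    // With the flag on, ExtractEquiJoinKeys matches this equi-join and the
    // physical plan printed by explain() should contain a SortMergeJoin node.
    left.join(right, left("id") === right("id")).explain()

    sc.stop()
  }
}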

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala

Lines changed: 103 additions & 6 deletions
@@ -17,11 +17,15 @@

 package org.apache.spark.sql.execution.joins

+import java.util.NoSuchElementException
+
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.util.collection.CompactBuffer

 /**
  * :: DeveloperApi ::
@@ -60,12 +64,105 @@ case class SortMergeJoin(
     val rightResults = right.execute().map(_.copy())

     leftResults.zipPartitions(rightResults) { (leftIter, rightIter) =>
-      new SortMergeJoinIterator(
-        leftIter,
-        rightIter,
-        leftKeyGenerator,
-        rightKeyGenerator,
-        keyOrdering);
+      new Iterator[Row] {
+        // Mutable per row objects.
+        private[this] val joinRow = new JoinedRow5
+        private[this] var leftElement: Row = _
+        private[this] var rightElement: Row = _
+        private[this] var leftKey: Row = _
+        private[this] var rightKey: Row = _
+        private[this] var rightMatches: CompactBuffer[Row] = _
+        private[this] var rightPosition: Int = -1
+        private[this] var stop: Boolean = false
+        private[this] var matchKey: Row = _
+
+        // initialize iterator
+        initialize()
+
+        override final def hasNext: Boolean = nextMatchingPair()
+
+        override final def next(): Row = {
+          if (hasNext) {
+            // we are using the buffered right rows and run down left iterator
+            val joinedRow = joinRow(leftElement, rightMatches(rightPosition))
+            rightPosition += 1
+            if (rightPosition >= rightMatches.size) {
+              rightPosition = 0
+              fetchLeft()
+              if (leftElement == null || keyOrdering.compare(leftKey, matchKey) != 0) {
+                stop = false
+                rightMatches = null
+              }
+            }
+            joinedRow
+          } else {
+            // no more result
+            throw new NoSuchElementException
+          }
+        }
+
+        private def fetchLeft() = {
+          if (leftIter.hasNext) {
+            leftElement = leftIter.next()
+            leftKey = leftKeyGenerator(leftElement)
+          } else {
+            leftElement = null
+          }
+        }
+
+        private def fetchRight() = {
+          if (rightIter.hasNext) {
+            rightElement = rightIter.next()
+            rightKey = rightKeyGenerator(rightElement)
+          } else {
+            rightElement = null
+          }
+        }
+
+        private def initialize() = {
+          fetchLeft()
+          fetchRight()
+        }
+
+        /**
+         * Searches the right iterator for the next rows that have matches in left side, and store
+         * them in a buffer.
+         *
+         * @return true if the search is successful, and false if the right iterator runs out of
+         *         tuples.
+         */
+        private def nextMatchingPair(): Boolean = {
+          if (!stop && rightElement != null) {
+            // run both side to get the first match pair
+            while (!stop && leftElement != null && rightElement != null) {
+              val comparing = keyOrdering.compare(leftKey, rightKey)
+              // for inner join, we need to filter those null keys
+              stop = comparing == 0 && !leftKey.anyNull
+              if (comparing > 0 || rightKey.anyNull) {
+                fetchRight()
+              } else if (comparing < 0 || leftKey.anyNull) {
+                fetchLeft()
+              }
+            }
+            rightMatches = new CompactBuffer[Row]()
+            if (stop) {
+              stop = false
+              // iterate the right side to buffer all rows that matches
+              // as the records should be ordered, exit when we meet the first that not match
+              while (!stop && rightElement != null) {
+                rightMatches += rightElement
+                fetchRight()
+                stop = keyOrdering.compare(leftKey, rightKey) != 0
+              }
+              if (rightMatches.size > 0) {
+                rightPosition = 0
+                matchKey = leftKey
+              }
+            }
+          }
+          rightMatches != null && rightMatches.size > 0
+        }
+      }
     }
   }
 }
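
For readers tracing the buffering logic above, the sketch below restates the same inner merge-join idea on plain Scala collections, with no Catalyst types. It is a minimal illustration under the assumption that both inputs arrive sorted by key (which the physical operator guarantees through its required child ordering); the name MergeJoinSketch and its helper are invented for the example.

object MergeJoinSketch {

  // Inner-joins two key-sorted sequences and returns (key, leftValue, rightValue)
  // triples. The inputs must already be sorted by key, mirroring the ordering
  // the SortMergeJoin operator requires from its children.
  def sortMergeJoin[K, L, R](left: Seq[(K, L)], right: Seq[(K, R)])
                            (implicit ord: Ordering[K]): Seq[(K, L, R)] = {
    val out = Seq.newBuilder[(K, L, R)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val cmp = ord.compare(left(i)._1, right(j)._1)
      if (cmp < 0) {
        i += 1 // left key is smaller: advance the left side
      } else if (cmp > 0) {
        j += 1 // right key is smaller: advance the right side
      } else {
        // Buffer every right row with the matching key, then replay that buffer
        // for each left row with the same key, just as the rightMatches
        // CompactBuffer is replayed in the operator above.
        val key = left(i)._1
        val start = j
        while (j < right.length && ord.equiv(right(j)._1, key)) j += 1
        val matches = right.slice(start, j)
        while (i < left.length && ord.equiv(left(i)._1, key)) {
          matches.foreach { case (_, r) => out += ((key, left(i)._2, r)) }
          i += 1
        }
      }
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val left = Seq(1 -> "a", 2 -> "b", 2 -> "c", 4 -> "d")
    val right = Seq(2 -> "x", 2 -> "y", 3 -> "z", 4 -> "w")
    // Prints (2,b,x), (2,b,y), (2,c,x), (2,c,y), (4,d,w)
    sortMergeJoin(left, right).foreach(println)
  }
}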

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/UnsafeSortMergeJoin.scala

Lines changed: 0 additions & 91 deletions
This file was deleted.
