Skip to content

Commit 1b841ca

Browse files
committed
WIP towards copying
1 parent b420a71 commit 1b841ca

File tree

3 files changed

+10
-9
lines changed

3 files changed

+10
-9
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/execution/joins/SortMergeJoinIterator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ private boolean nextMatchingPair() {
112112
// Iterate the right side to buffer all rows that match.
113113
// As the records should be ordered, exit when we meet the first record that does not match.
114114
while (!stop && rightElement != null) {
115-
rightMatches.$plus$eq(rightElement);
115+
rightMatches.$plus$eq(rightElement.copy());
116116
fetchRight();
117117
stop = keyOrdering.compare(leftKey, rightKey) != 0;
118118
}

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,9 @@ case class ExternalSort(
250250

251251
/**
252252
* :: DeveloperApi ::
253-
* TODO(josh): document
254-
* Performs a sort, spilling to disk as needed.
253+
* Optimized version of [[ExternalSort]] that operates on binary data (implemented as part of
254+
* Project Tungsten).
255+
*
255256
* @param global when true performs a global sort of all partitions by shuffling the data first
256257
* if necessary.
257258
*/
@@ -262,7 +263,6 @@ case class UnsafeExternalSort(
262263
child: SparkPlan)
263264
extends UnaryNode {
264265

265-
private[this] val numFields: Int = child.schema.size
266266
private[this] val schema: StructType = child.schema
267267

268268
override def requiredChildDistribution: Seq[Distribution] =
@@ -275,6 +275,7 @@ case class UnsafeExternalSort(
275275
val prefixComparator = new PrefixComparator {
276276
override def compare(prefix1: Long, prefix2: Long): Int = 0
277277
}
278+
// TODO: do real prefix comparison. For dev/testing purposes, this is a dummy implementation.
278279
def prefixComputer(row: Row): Long = 0
279280
new UnsafeExternalRowSorter(schema, ordering, prefixComparator, prefixComputer).sort(iterator)
280281
}

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/UnsafeSortMergeJoin.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717

1818
package org.apache.spark.sql.execution.joins
1919

20-
2120
import org.apache.spark.annotation.DeveloperApi
2221
import org.apache.spark.rdd.RDD
2322
import org.apache.spark.sql.Row
23+
import org.apache.spark.sql.catalyst.expressions
2424
import org.apache.spark.sql.catalyst.expressions._
2525
import org.apache.spark.sql.catalyst.plans.physical._
2626
import org.apache.spark.sql.execution.{UnsafeExternalSort, BinaryNode, SparkPlan}
@@ -63,17 +63,17 @@ case class UnsafeSortMergeJoin(
6363

6464
// Only sort if necessary:
6565
val leftOrder = requiredOrders(leftKeys)
66-
val leftResults = {
66+
val leftResults: RDD[Row] = {
6767
if (left.outputOrdering == leftOrder) {
68-
left.execute().map(_.copy())
68+
left.execute()
6969
} else {
7070
new UnsafeExternalSort(leftOrder, global = false, left).execute()
7171
}
7272
}
7373
val rightOrder = requiredOrders(rightKeys)
74-
val rightResults = {
74+
val rightResults: RDD[Row] = {
7575
if (right.outputOrdering == rightOrder) {
76-
right.execute().map(_.copy())
76+
right.execute()
7777
} else {
7878
new UnsafeExternalSort(rightOrder, global = false, right).execute()
7979
}

0 commit comments

Comments
 (0)