Skip to content

Commit c9d37e1

Browse files
committed
[SPARK-2726] and [SPARK-2727] Remove SortOrder and do in-place sort.
1 parent 16ef4d1 commit c9d37e1

File tree

4 files changed

+18
-31
lines changed

4 files changed

+18
-31
lines changed

core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package org.apache.spark
1919

2020
import org.apache.spark.annotation.DeveloperApi
2121
import org.apache.spark.rdd.RDD
22-
import org.apache.spark.rdd.SortOrder.SortOrder
2322
import org.apache.spark.serializer.Serializer
2423
import org.apache.spark.shuffle.ShuffleHandle
2524

@@ -63,8 +62,7 @@ class ShuffleDependency[K, V, C](
6362
val serializer: Option[Serializer] = None,
6463
val keyOrdering: Option[Ordering[K]] = None,
6564
val aggregator: Option[Aggregator[K, V, C]] = None,
66-
val mapSideCombine: Boolean = false,
67-
val sortOrder: Option[SortOrder] = None)
65+
val mapSideCombine: Boolean = false)
6866
extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
6967

7068
val shuffleId: Int = rdd.context.newShuffleId()

core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,6 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
5858
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
5959
val part = new RangePartitioner(numPartitions, self, ascending)
6060
new ShuffledRDD[K, V, V, P](self, part)
61-
.setKeyOrdering(ordering)
62-
.setSortOrder(if (ascending) SortOrder.ASCENDING else SortOrder.DESCENDING)
61+
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
6362
}
6463
}
65-
66-
private[spark] object SortOrder extends Enumeration {
67-
type SortOrder = Value
68-
val ASCENDING, DESCENDING = Value
69-
}

core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import scala.reflect.ClassTag
2121

2222
import org.apache.spark._
2323
import org.apache.spark.annotation.DeveloperApi
24-
import org.apache.spark.rdd.SortOrder.SortOrder
2524
import org.apache.spark.serializer.Serializer
2625

2726
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
@@ -52,8 +51,6 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
5251

5352
private var mapSideCombine: Boolean = false
5453

55-
private var sortOrder: Option[SortOrder] = None
56-
5754
/** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
5855
def setSerializer(serializer: Serializer): ShuffledRDD[K, V, C, P] = {
5956
this.serializer = Option(serializer)
@@ -78,15 +75,8 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
7875
this
7976
}
8077

81-
/** Set sort order for RDD's sorting. */
82-
def setSortOrder(sortOrder: SortOrder): ShuffledRDD[K, V, C, P] = {
83-
this.sortOrder = Option(sortOrder)
84-
this
85-
}
86-
8778
override def getDependencies: Seq[Dependency[_]] = {
88-
List(new ShuffleDependency(prev, part, serializer,
89-
keyOrdering, aggregator, mapSideCombine, sortOrder))
79+
List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
9080
}
9181

9282
override val partitioner = Some(part)

core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.spark.shuffle.hash
1919

2020
import org.apache.spark.{InterruptibleIterator, TaskContext}
21-
import org.apache.spark.rdd.SortOrder
2221
import org.apache.spark.serializer.Serializer
2322
import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader}
2423

@@ -51,16 +50,22 @@ class HashShuffleReader[K, C](
5150
iter
5251
}
5352

54-
val sortedIter = for (sortOrder <- dep.sortOrder; ordering <- dep.keyOrdering) yield {
55-
val buf = aggregatedIter.toArray
56-
if (sortOrder == SortOrder.ASCENDING) {
57-
buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
58-
} else {
59-
buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
60-
}
53+
// Sort the output if there is a sort ordering defined.
54+
dep.keyOrdering match {
55+
case Some(keyOrd: Ordering[K]) =>
56+
// Define a Comparator for the whole record based on the key Ordering.
57+
val cmp = new Ordering[Product2[K, C]] {
58+
override def compare(o1: Product2[K, C], o2: Product2[K, C]): Int = {
59+
keyOrd.compare(o1._1, o2._1)
60+
}
61+
}
62+
val sortBuffer: Array[Product2[K, C]] = aggregatedIter.toArray
63+
// TODO: do external sort.
64+
scala.util.Sorting.quickSort(sortBuffer)(cmp)
65+
sortBuffer.iterator
66+
case None =>
67+
aggregatedIter
6168
}
62-
63-
sortedIter.getOrElse(aggregatedIter)
6469
}
6570

6671
/** Close this reader */

0 commit comments

Comments (0)