Commit c399baa

marmbrus authored and rxin committed
SPARK-1456 Remove view bounds on Ordered in favor of a context bound on Ordering.
This doesn't require creating new Ordering objects per row. Additionally, [view bounds are going to be deprecated](https://issues.scala-lang.org/browse/SI-7629), so we should get rid of them while APIs are still flexible.

Author: Michael Armbrust <[email protected]>

Closes apache#410 from marmbrus/viewBounds and squashes the following commits:

c574221 [Michael Armbrust] fix example.
812008e [Michael Armbrust] Update Java API.
1b9b85c [Michael Armbrust] Update scala doc.
35798a8 [Michael Armbrust] Remove view bounds on Ordered in favor of a context bound on Ordering.
1 parent 81a152c commit c399baa
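For readers less familiar with the two forms: a view bound `K <% Ordered[K]` asks the compiler for an implicit conversion `K => Ordered[K]`, so every comparison can allocate a fresh Ordered wrapper around the key, while a context bound `K : Ordering` resolves one shared `Ordering[K]` instance up front. A minimal standalone sketch of the contrast (illustrative only, not Spark code):

    // View-bound style (deprecated): each comparison goes through the
    // implicit K => Ordered[K] conversion, allocating a wrapper per call.
    def maxViewBound[K <% Ordered[K]](a: K, b: K): K =
      if (a < b) b else a

    // Context-bound style: a single Ordering[K] is resolved once and
    // reused for every comparison, with no per-row wrapper.
    def maxContextBound[K : Ordering](a: K, b: K): K = {
      val ord = implicitly[Ordering[K]]
      if (ord.lt(a, b)) b else a
    }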

File tree

5 files changed (+30, -18 lines)

core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 5 additions & 3 deletions

@@ -89,12 +89,14 @@ class HashPartitioner(partitions: Int) extends Partitioner {
  * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
  * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
  */
-class RangePartitioner[K <% Ordered[K]: ClassTag, V](
+class RangePartitioner[K : Ordering : ClassTag, V](
     partitions: Int,
     @transient rdd: RDD[_ <: Product2[K,V]],
     private val ascending: Boolean = true)
   extends Partitioner {
 
+  private val ordering = implicitly[Ordering[K]]
+
   // An array of upper bounds for the first (partitions - 1) partitions
   private val rangeBounds: Array[K] = {
     if (partitions == 1) {
@@ -103,7 +105,7 @@ class RangePartitioner[K <% Ordered[K]: ClassTag, V](
       val rddSize = rdd.count()
       val maxSampleSize = partitions * 20.0
       val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0)
-      val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sortWith(_ < _)
+      val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sorted
       if (rddSample.length == 0) {
         Array()
       } else {
@@ -126,7 +128,7 @@ class RangePartitioner[K <% Ordered[K]: ClassTag, V](
     var partition = 0
     if (rangeBounds.length < 1000) {
       // If we have less than 100 partitions naive search
-      while (partition < rangeBounds.length && k > rangeBounds(partition)) {
+      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
         partition += 1
       }
     } else {
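The pattern above, materializing the context bound once with `private val ordering = implicitly[Ordering[K]]` and then calling `ordering.gt` per element, is what removes the per-row allocation. A standalone sketch of the same idiom (RangeLookup is a hypothetical name, not Spark's class):

    // The Ordering[K] is resolved once at construction and reused per key.
    class RangeLookup[K : Ordering](rangeBounds: Array[K]) {
      private val ordering = implicitly[Ordering[K]]

      // Linear scan over the bounds, as in the naive-search branch above.
      def getPartition(k: K): Int = {
        var partition = 0
        while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
          partition += 1
        }
        partition
      }
    }

    // Usage: three bounds define four partitions.
    // new RangeLookup(Array(10, 20, 30)).getPartition(15)  // returns 1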

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion

@@ -1273,7 +1273,7 @@ object SparkContext extends Logging {
       rdd: RDD[(K, V)]) =
     new SequenceFileRDDFunctions(rdd)
 
-  implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
+  implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
       rdd: RDD[(K, V)]) =
     new OrderedRDDFunctions[K, V, (K, V)](rdd)
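`rddToOrderedRDDFunctions` is the usual enrich-my-library conversion; the change simply forwards the new context bounds to the wrapper class. A toy sketch of the pattern under the same bounds (MiniRDD and friends are illustrative stand-ins, not Spark types):

    import scala.reflect.ClassTag

    class MiniRDD[T](val data: Seq[T])

    class MiniOrderedFunctions[K : Ordering : ClassTag, V : ClassTag](rdd: MiniRDD[(K, V)]) {
      // sortBy uses the Ordering[K] supplied by the context bound.
      def sortByKey(): Seq[(K, V)] = rdd.data.sortBy(_._1)
    }

    object MiniImplicits {
      // The Ordering[K] demanded here travels with the conversion, so
      // rdd.sortByKey() compiles only when an Ordering[K] is in scope.
      implicit def toOrderedFunctions[K : Ordering : ClassTag, V : ClassTag](
          rdd: MiniRDD[(K, V)]): MiniOrderedFunctions[K, V] =
        new MiniOrderedFunctions(rdd)
    }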

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 2 additions & 8 deletions

@@ -626,10 +626,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * order of the keys).
    */
   def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = {
-    class KeyOrdering(val a: K) extends Ordered[K] {
-      override def compare(b: K) = comp.compare(a, b)
-    }
-    implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x)
+    implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
     fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending))
   }
 
@@ -640,10 +637,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * order of the keys).
    */
   def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V] = {
-    class KeyOrdering(val a: K) extends Ordered[K] {
-      override def compare(b: K) = comp.compare(a, b)
-    }
-    implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x)
+    implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
     fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions))
   }
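The one-line replacement works because scala.math.Ordering's companion provides `comparatorToOrdering`, an implicit conversion from `java.util.Comparator[T]` to `Ordering[T]`; placing the comparator in implicit scope lets the compiler derive the `Ordering[K]` that `OrderedRDDFunctions` now demands. A self-contained sketch of the same mechanism (Key is an illustrative type with no predefined Ordering):

    import java.util.Comparator

    object ComparatorBridge extends App {
      case class Key(name: String)

      def sortKeys[K : Ordering](keys: Seq[K]): Seq[K] = keys.sorted

      // A plain java.util.Comparator, as a Java caller would supply.
      val byName: Comparator[Key] = new Comparator[Key] {
        override def compare(a: Key, b: Key): Int = a.name.compareTo(b.name)
      }

      // With the Comparator in implicit scope, Ordering.comparatorToOrdering
      // derives the Ordering[Key] demanded by the context bound on sortKeys.
      implicit val ordering: Comparator[Key] = byName

      println(sortKeys(Seq(Key("b"), Key("a"))))  // List(Key(a), Key(b))
    }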

core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala

Lines changed: 21 additions & 5 deletions

@@ -24,15 +24,31 @@ import org.apache.spark.{Logging, RangePartitioner}
 /**
  * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
  * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
- * use these functions. They will work with any key type that has a `scala.math.Ordered`
- * implementation.
+ * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
+ * scope. Ordering objects already exist for all of the standard primitive types. Users can also
+ * define their own orderings for custom types, or to override the default ordering. The implicit
+ * ordering that is in the closest scope will be used.
+ *
+ * {{{
+ *   import org.apache.spark.SparkContext._
+ *
+ *   val rdd: RDD[(String, Int)] = ...
+ *   implicit val caseInsensitiveOrdering = new Ordering[String] {
+ *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
+ *   }
+ *
+ *   // Sort by key, using the above case insensitive ordering.
+ *   rdd.sortByKey()
+ * }}}
  */
-class OrderedRDDFunctions[K <% Ordered[K]: ClassTag,
+class OrderedRDDFunctions[K : Ordering : ClassTag,
                           V: ClassTag,
                           P <: Product2[K, V] : ClassTag](
     self: RDD[P])
   extends Logging with Serializable {
 
+  private val ordering = implicitly[Ordering[K]]
+
   /**
    * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    * `collect` or `save` on the resulting RDD will return or output an ordered list of records
@@ -45,9 +61,9 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassTag,
     shuffled.mapPartitions(iter => {
       val buf = iter.toArray
       if (ascending) {
-        buf.sortWith((x, y) => x._1 < y._1).iterator
+        buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
       } else {
-        buf.sortWith((x, y) => x._1 > y._1).iterator
+        buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
       }
     }, preservesPartitioning = true)
   }
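The new scaladoc example relies on ordinary implicit precedence: a locally defined Ordering wins over the default one from the Ordering companion. The behavior can be checked with plain Scala collections, no Spark required:

    object CaseInsensitiveSort extends App {
      // A local implicit takes precedence over the default Ordering.String
      // found in the Ordering companion object.
      implicit val caseInsensitiveOrdering: Ordering[String] = new Ordering[String] {
        override def compare(a: String, b: String) = a.toLowerCase.compareTo(b.toLowerCase)
      }

      val keys = Seq("Banana", "apple", "Cherry")
      println(keys.sorted)                   // List(apple, Banana, Cherry)
      println(keys.sorted(Ordering.String))  // List(Banana, Cherry, apple)
    }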

core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala

Lines changed: 1 addition & 1 deletion

@@ -23,7 +23,7 @@ import scala.Array
 import scala.reflect._
 
 private[spark] object CollectionsUtils {
-  def makeBinarySearch[K <% Ordered[K] : ClassTag] : (Array[K], K) => Int = {
+  def makeBinarySearch[K : Ordering : ClassTag] : (Array[K], K) => Int = {
     classTag[K] match {
       case ClassTag.Float =>
         (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Float]], x.asInstanceOf[Float])
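The diff shows only the Float specialization, which dispatches to java.util.Arrays; for key types with no primitive representation, a generic fallback driven by the Ordering would be needed. A sketch of what such a fallback could look like (hypothetical code, not Spark's actual branch):

    // Binary search under a context bound: the Ordering drives comparisons,
    // with no implicit K => Ordered[K] conversion required.
    def binarySearch[K : Ordering](a: Array[K], key: K): Int = {
      val ord = implicitly[Ordering[K]]
      var lo = 0
      var hi = a.length - 1
      while (lo <= hi) {
        val mid = (lo + hi) >>> 1
        val c = ord.compare(a(mid), key)
        if (c < 0) lo = mid + 1
        else if (c > 0) hi = mid - 1
        else return mid
      }
      -(lo + 1)  // java.util.Arrays convention: -(insertion point) - 1
    }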
