Commit eb1a89b
[SPARK-4719][API] Consolidate various narrow dep RDD classes with MapPartitionsRDD.
MappedRDD, MappedValuesRDD, FlatMappedValuesRDD, FilteredRDD, GlommedRDD, FlatMappedRDD are not necessary. They can be implemented trivially using MapPartitionsRDD.
1 parent: 77be8b9

10 files changed: +52 −245 lines
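
Everything below funnels into a single existing class, MapPartitionsRDD. For context, this is roughly its shape, reconstructed from the call sites in the diffs below; the class itself is not modified by this commit, so treat the body as a sketch rather than the committed source:

package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}

// Sketch of the consolidation target. f receives the TaskContext, the
// partition index, and the partition's input iterator, and returns the
// output iterator for that partition.
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  // Keep the parent's partitioner only when the caller promises that the
  // function leaves keys in place (see the mapValues diff below).
  override val partitioner =
    if (preservesPartitioning) firstParent[T].partitioner else None

  // A narrow dependency: each output partition reads exactly one parent partition.
  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
}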

core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala

Lines changed: 6 additions & 6 deletions (a whitespace-only re-indent of the constructor parameters)

@@ -24,12 +24,12 @@ import org.apache.spark.input.StreamFileInputFormat
 import org.apache.spark.{ Partition, SparkContext }
 
 private[spark] class BinaryFileRDD[T](
-    sc: SparkContext,
-    inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
-    keyClass: Class[String],
-    valueClass: Class[T],
-    @transient conf: Configuration,
-    minPartitions: Int)
+    sc: SparkContext,
+    inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
+    keyClass: Class[String],
+    valueClass: Class[T],
+    @transient conf: Configuration,
+    minPartitions: Int)
   extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {
 
   override def getPartitions: Array[Partition] = {

core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala

Lines changed: 0 additions & 35 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala

Lines changed: 0 additions & 34 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala

Lines changed: 0 additions & 35 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala

Lines changed: 0 additions & 31 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala

Lines changed: 0 additions & 32 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala

Lines changed: 0 additions & 33 deletions
This file was deleted.
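
Each deleted file held a complete RDD subclass whose only job was to apply one Iterator method per partition. As an illustration of the boilerplate being removed, the deleted MappedRDD looked roughly like this (a reconstruction, not the verbatim deleted source):

package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}

// Roughly the shape of the deleted MappedRDD: an entire subclass just to
// call iter.map(f) on each partition. FilteredRDD, FlatMappedRDD, GlommedRDD,
// MappedValuesRDD, and FlatMappedValuesRDD followed the same pattern.
private[spark] class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
  extends RDD[U](prev) {

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    firstParent[T].iterator(split, context).map(f)
}

The two diffs below replace each such class with a short MapPartitionsRDD call at the operator's definition site.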

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 8 additions & 2 deletions
@@ -660,7 +660,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    */
   def mapValues[U](f: V => U): RDD[(K, U)] = {
     val cleanF = self.context.clean(f)
-    new MappedValuesRDD(self, cleanF)
+    new MapPartitionsRDD[(K, U), (K, V)](self,
+      (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
+      preservesPartitioning = true)
   }
 
   /**
@@ -669,7 +671,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    */
   def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
     val cleanF = self.context.clean(f)
-    new FlatMappedValuesRDD(self, cleanF)
+    new MapPartitionsRDD[(K, U), (K, V)](self,
+      (context, pid, iter) => iter.flatMap { case (k, v) =>
+        cleanF(v).map(x => (k, x))
+      },
+      preservesPartitioning = true)
   }
 
   /**
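
Because mapValues and flatMapValues leave keys untouched, the new call sites pass preservesPartitioning = true, so the result keeps the parent's partitioner. A small illustration of why that matters (assumes an active SparkContext named sc; the data is hypothetical):

// Hash-partition some pairs, then transform only the values.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
  .partitionBy(new org.apache.spark.HashPartitioner(4))

// Keys are untouched, so the partitioner survives; a later reduceByKey
// with the same partitioner can then avoid a shuffle.
val scaled = pairs.mapValues(_ * 10)
assert(scaled.partitioner == pairs.partitioner)

// A plain map over the same pairs drops the partitioner, because Spark
// cannot prove the keys were left unchanged.
val remapped = pairs.map { case (k, v) => (k, v * 10) }
assert(remapped.partitioner.isEmpty)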

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 17 additions & 8 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import java.util.{Properties, Random}
+import java.util.Random
 
 import scala.collection.{mutable, Map}
 import scala.collection.mutable.ArrayBuffer
@@ -36,13 +36,12 @@ import org.apache.spark._
 import org.apache.spark.Partitioner._
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.partial.BoundedDouble
 import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite}
+import org.apache.spark.util.{BoundedPriorityQueue, Utils}
 import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, BernoulliCellSampler,
   SamplingUtils}
@@ -270,19 +269,27 @@ abstract class RDD[T: ClassTag](
   /**
    * Return a new RDD by applying a function to all elements of this RDD.
    */
-  def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
+  def map[U: ClassTag](f: T => U): RDD[U] = {
+    val cleanF = sc.clean(f)
+    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
+  }
 
   /**
    * Return a new RDD by first applying a function to all elements of this
    * RDD, and then flattening the results.
    */
-  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
-    new FlatMappedRDD(this, sc.clean(f))
+  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = {
+    val cleanF = sc.clean(f)
+    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
+  }
 
   /**
    * Return a new RDD containing only the elements that satisfy a predicate.
    */
-  def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
+  def filter(f: T => Boolean): RDD[T] = {
+    val cleanF = sc.clean(f)
+    new MapPartitionsRDD[T, T](this, (context, pid, iter) => iter.filter(cleanF))
+  }
 
   /**
    * Return a new RDD containing the distinct elements in this RDD.
@@ -503,7 +510,9 @@ abstract class RDD[T: ClassTag](
   /**
    * Return an RDD created by coalescing all elements within each partition into an array.
    */
-  def glom(): RDD[Array[T]] = new GlommedRDD(this)
+  def glom(): RDD[Array[T]] = {
+    new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
+  }
 
   /**
    * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
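
The public behavior of map, flatMap, filter, and glom is unchanged; they simply construct MapPartitionsRDD directly now. A quick usage sketch (again assuming an active SparkContext named sc):

// Each operator below now builds a MapPartitionsRDD under the hood.
val nums = sc.parallelize(1 to 10, numSlices = 2)

val doubled = nums.map(_ * 2)          // iter.map(cleanF)
val evens   = nums.filter(_ % 2 == 0)  // iter.filter(cleanF)
val chunks  = nums.glom()              // Iterator(iter.toArray)

// glom coalesces each of the two partitions into one array.
assert(chunks.collect().map(_.toSeq).toSeq == Seq(1 to 5, 6 to 10))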
