Skip to content

Commit a9ed4f9

Browse files
author
Andrew Or
committed
Add a few missing scopes to certain RDD methods
1 parent 6b3403b commit a9ed4f9

File tree

1 file changed

+14
-6
lines changed
  • core/src/main/scala/org/apache/spark/rdd

1 file changed

+14
-6
lines changed

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,9 @@ abstract class RDD[T: ClassTag](
317317
/**
318318
* Return a new RDD containing the distinct elements in this RDD.
319319
*/
320-
def distinct(): RDD[T] = distinct(partitions.length)
320+
def distinct(): RDD[T] = withScope {
321+
distinct(partitions.length)
322+
}
321323

322324
/**
323325
* Return a new RDD that has exactly numPartitions partitions.
@@ -429,7 +431,7 @@ abstract class RDD[T: ClassTag](
429431
def takeSample(
430432
withReplacement: Boolean,
431433
num: Int,
432-
seed: Long = Utils.random.nextLong): Array[T] = {
434+
seed: Long = Utils.random.nextLong): Array[T] = withScope {
433435
val numStDev = 10.0
434436

435437
if (num < 0) {
@@ -487,7 +489,9 @@ abstract class RDD[T: ClassTag](
487489
* Return the union of this RDD and another one. Any identical elements will appear multiple
488490
* times (use `.distinct()` to eliminate them).
489491
*/
490-
def ++(other: RDD[T]): RDD[T] = this.union(other)
492+
def ++(other: RDD[T]): RDD[T] = withScope {
493+
this.union(other)
494+
}
491495

492496
/**
493497
* Return this RDD sorted by the given key function.
@@ -567,8 +571,9 @@ abstract class RDD[T: ClassTag](
567571
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
568572
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
569573
*/
570-
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
574+
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
571575
groupBy[K](f, defaultPartitioner(this))
576+
}
572577

573578
/**
574579
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
@@ -579,8 +584,11 @@ abstract class RDD[T: ClassTag](
579584
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
580585
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
581586
*/
582-
def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
587+
def groupBy[K](
588+
f: T => K,
589+
numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
583590
groupBy(f, new HashPartitioner(numPartitions))
591+
}
584592

585593
/**
586594
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
@@ -739,7 +747,7 @@ abstract class RDD[T: ClassTag](
739747
mapPartitionsWithIndex { (index, iter) =>
740748
val a = constructA(index)
741749
iter.map(t => {f(t, a); t})
742-
}.foreach(_ => {})
750+
}
743751
}
744752

745753
/**

0 commit comments

Comments
 (0)