Skip to content

Commit 95d59d6

Browse files
committed
Add 'iterator' to reduce memory consumed by join
1 parent 652b781 commit 95d59d6

File tree

1 file changed

+8
-8
lines changed

1 file changed

+8
-8
lines changed

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
480480
*/
481481
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
482482
this.cogroup(other, partitioner).flatMapValues( pair =>
483-
for (v <- pair._1; w <- pair._2) yield (v, w)
483+
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
484484
)
485485
}
486486

@@ -493,9 +493,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
493493
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
494494
this.cogroup(other, partitioner).flatMapValues { pair =>
495495
if (pair._2.isEmpty) {
496-
pair._1.map(v => (v, None))
496+
pair._1.iterator.map(v => (v, None): (V, Option[W]))
497497
} else {
498-
for (v <- pair._1; w <- pair._2) yield (v, Some(w))
498+
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
499499
}
500500
}
501501
}
@@ -510,9 +510,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
510510
: RDD[(K, (Option[V], W))] = {
511511
this.cogroup(other, partitioner).flatMapValues { pair =>
512512
if (pair._1.isEmpty) {
513-
pair._2.map(w => (None, w))
513+
pair._2.iterator.map(w => (None, w): (Option[V], W))
514514
} else {
515-
for (v <- pair._1; w <- pair._2) yield (Some(v), w)
515+
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
516516
}
517517
}
518518
}
@@ -528,9 +528,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
528528
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
529529
: RDD[(K, (Option[V], Option[W]))] = {
530530
this.cogroup(other, partitioner).flatMapValues {
531-
case (vs, Seq()) => vs.map(v => (Some(v), None))
532-
case (Seq(), ws) => ws.map(w => (None, Some(w)))
533-
case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w))
531+
case (vs, Seq()) => vs.iterator.map(v => (Some(v), None): (Option[V], Option[W]))
532+
case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)): (Option[V], Option[W]))
533+
case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
534534
}
535535
}
536536

0 commit comments

Comments
 (0)