Skip to content

Commit 98ab411

Browse files
sryzarxin
authored and committed
SPARK-2519 part 2. Remove pattern matching on Tuple2 in critical section...
...s of CoGroupedRDD and PairRDDFunctions This also removes an unnecessary tuple creation in cogroup. Author: Sandy Ryza <[email protected]> Closes apache#1447 from sryza/sandy-spark-2519-2 and squashes the following commits: b6d9699 [Sandy Ryza] Remove missed Tuple2 match in CoGroupedRDD a109828 [Sandy Ryza] Remove another pattern matching in MappedValuesRDD and revert some changes in PairRDDFunctions be10f8a [Sandy Ryza] SPARK-2519 part 2. Remove pattern matching on Tuple2 in critical sections of CoGroupedRDD and PairRDDFunctions
1 parent 4da01e3 commit 98ab411

File tree

3 files changed

+33
-33
lines changed

3 files changed

+33
-33
lines changed

core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
170170

171171
val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
172172
val newCombiner = Array.fill(numRdds)(new CoGroup)
173-
value match { case (v, depNum) => newCombiner(depNum) += v }
173+
newCombiner(value._2) += value._1
174174
newCombiner
175175
}
176176
val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
177177
(combiner, value) => {
178-
value match { case (v, depNum) => combiner(depNum) += v }
178+
combiner(value._2) += value._1
179179
combiner
180180
}
181181
val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =

core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,6 @@ class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
2828
override val partitioner = firstParent[Product2[K, U]].partitioner
2929

3030
override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = {
31-
firstParent[Product2[K, V]].iterator(split, context).map { case Product2(k ,v) => (k, f(v)) }
31+
firstParent[Product2[K, V]].iterator(split, context).map { pair => (pair._1, f(pair._2)) }
3232
}
3333
}

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -216,17 +216,17 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
216216

217217
val reducePartition = (iter: Iterator[(K, V)]) => {
218218
val map = new JHashMap[K, V]
219-
iter.foreach { case (k, v) =>
220-
val old = map.get(k)
221-
map.put(k, if (old == null) v else func(old, v))
219+
iter.foreach { pair =>
220+
val old = map.get(pair._1)
221+
map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
222222
}
223223
Iterator(map)
224224
} : Iterator[JHashMap[K, V]]
225225

226226
val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
227-
m2.foreach { case (k, v) =>
228-
val old = m1.get(k)
229-
m1.put(k, if (old == null) v else func(old, v))
227+
m2.foreach { pair =>
228+
val old = m1.get(pair._1)
229+
m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
230230
}
231231
m1
232232
} : JHashMap[K, V]
@@ -401,9 +401,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
401401
* (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
402402
*/
403403
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
404-
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
405-
for (v <- vs; w <- ws) yield (v, w)
406-
}
404+
this.cogroup(other, partitioner).flatMapValues( pair =>
405+
for (v <- pair._1; w <- pair._2) yield (v, w)
406+
)
407407
}
408408

409409
/**
@@ -413,11 +413,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
413413
* partition the output RDD.
414414
*/
415415
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
416-
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
417-
if (ws.isEmpty) {
418-
vs.map(v => (v, None))
416+
this.cogroup(other, partitioner).flatMapValues { pair =>
417+
if (pair._2.isEmpty) {
418+
pair._1.map(v => (v, None))
419419
} else {
420-
for (v <- vs; w <- ws) yield (v, Some(w))
420+
for (v <- pair._1; w <- pair._2) yield (v, Some(w))
421421
}
422422
}
423423
}
@@ -430,11 +430,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
430430
*/
431431
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
432432
: RDD[(K, (Option[V], W))] = {
433-
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
434-
if (vs.isEmpty) {
435-
ws.map(w => (None, w))
433+
this.cogroup(other, partitioner).flatMapValues { pair =>
434+
if (pair._1.isEmpty) {
435+
pair._2.map(w => (None, w))
436436
} else {
437-
for (v <- vs; w <- ws) yield (Some(v), w)
437+
for (v <- pair._1; w <- pair._2) yield (Some(v), w)
438438
}
439439
}
440440
}
@@ -535,7 +535,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
535535
val data = self.collect()
536536
val map = new mutable.HashMap[K, V]
537537
map.sizeHint(data.length)
538-
data.foreach { case (k, v) => map.put(k, v) }
538+
data.foreach { pair => map.put(pair._1, pair._2) }
539539
map
540540
}
541541

@@ -572,10 +572,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
572572
}
573573
val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
574574
cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
575-
(vs.asInstanceOf[Seq[V]],
576-
w1s.asInstanceOf[Seq[W1]],
577-
w2s.asInstanceOf[Seq[W2]],
578-
w3s.asInstanceOf[Seq[W3]])
575+
(vs.asInstanceOf[Seq[V]],
576+
w1s.asInstanceOf[Seq[W1]],
577+
w2s.asInstanceOf[Seq[W2]],
578+
w3s.asInstanceOf[Seq[W3]])
579579
}
580580
}
581581

@@ -589,8 +589,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
589589
throw new SparkException("Default partitioner cannot partition array keys.")
590590
}
591591
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
592-
cg.mapValues { case Seq(vs, ws) =>
593-
(vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
592+
cg.mapValues { case Seq(vs, w1s) =>
593+
(vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W]])
594594
}
595595
}
596596

@@ -606,8 +606,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
606606
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
607607
cg.mapValues { case Seq(vs, w1s, w2s) =>
608608
(vs.asInstanceOf[Seq[V]],
609-
w1s.asInstanceOf[Seq[W1]],
610-
w2s.asInstanceOf[Seq[W2]])
609+
w1s.asInstanceOf[Seq[W1]],
610+
w2s.asInstanceOf[Seq[W2]])
611611
}
612612
}
613613

@@ -712,8 +712,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
712712
val index = p.getPartition(key)
713713
val process = (it: Iterator[(K, V)]) => {
714714
val buf = new ArrayBuffer[V]
715-
for ((k, v) <- it if k == key) {
716-
buf += v
715+
for (pair <- it if pair._1 == key) {
716+
buf += pair._2
717717
}
718718
buf
719719
} : Seq[V]
@@ -858,8 +858,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
858858
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
859859
try {
860860
while (iter.hasNext) {
861-
val (k, v) = iter.next()
862-
writer.write(k, v)
861+
val pair = iter.next()
862+
writer.write(pair._1, pair._2)
863863
}
864864
} finally {
865865
writer.close(hadoopContext)

0 commit comments

Comments (0)