Skip to content

Commit c4a8a51

Browse files
committed
Added java cogroup 4
1 parent e94963c commit c4a8a51

File tree

1 file changed

+51
-0
lines changed

1 file changed

+51
-0
lines changed

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,18 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
543543
partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
544544
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))
545545

546+
/**
547+
* For each key k in `this` or `other1` or `other2` or `other3`,
548+
* return a resulting RDD that contains a tuple with the list of values
549+
* for that key in `this`, `other1`, `other2` and `other3`.
550+
*/
551+
def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
552+
other2: JavaPairRDD[K, W2],
553+
other3: JavaPairRDD[K, W3],
554+
partitioner: Partitioner)
555+
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
556+
fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3, partitioner)))
557+
546558
/**
547559
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
548560
* list of values for that key in `this` as well as `other`.
@@ -558,6 +570,17 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
558570
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
559571
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))
560572

573+
/**
574+
* For each key k in `this` or `other1` or `other2` or `other3`,
575+
* return a resulting RDD that contains a tuple with the list of values
576+
* for that key in `this`, `other1`, `other2` and `other3`.
577+
*/
578+
def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
579+
other2: JavaPairRDD[K, W2],
580+
other3: JavaPairRDD[K, W3])
581+
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
582+
fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3)))
583+
561584
/**
562585
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
563586
* list of values for that key in `this` as well as `other`.
@@ -574,6 +597,18 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
574597
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
575598
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))
576599

600+
/**
601+
* For each key k in `this` or `other1` or `other2` or `other3`,
602+
* return a resulting RDD that contains a tuple with the list of values
603+
* for that key in `this`, `other1`, `other2` and `other3`.
604+
*/
605+
def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
606+
other2: JavaPairRDD[K, W2],
607+
other3: JavaPairRDD[K, W3],
608+
numPartitions: Int)
609+
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
610+
fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3, numPartitions)))
611+
577612
/** Alias for cogroup. */
578613
def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
579614
fromRDD(cogroupResultToJava(rdd.groupWith(other)))
@@ -583,6 +618,13 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
583618
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
584619
fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))
585620

621+
/** Alias for cogroup. */
622+
def groupWith[W1, W2, W3](other1: JavaPairRDD[K, W1],
623+
other2: JavaPairRDD[K, W2],
624+
other3: JavaPairRDD[K, W3])
625+
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
626+
fromRDD(cogroupResult3ToJava(rdd.groupWith(other1, other2, other3)))
627+
586628
/**
587629
* Return the list of values in the RDD for key `key`. This operation is done efficiently if the
588630
* RDD has a known partitioner by only searching the partition that the key maps to.
@@ -786,6 +828,15 @@ object JavaPairRDD {
786828
.mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3)))
787829
}
788830

831+
private[spark]
832+
def cogroupResult3ToJava[K: ClassTag, V, W1, W2, W3](
833+
rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))])
834+
: RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3]))] = {
835+
rddToPairRDDFunctions(rdd)
836+
.mapValues(x =>
837+
(asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3), asJavaIterable(x._4)))
838+
}
839+
789840
def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = {
790841
new JavaPairRDD[K, V](rdd)
791842
}

0 commit comments

Comments
 (0)