
Commit be4cc38

Merge pull request #5 from vnivargi/master-csd
adding full outer join support
2 parents: 73691b1 + 13c1e5f

10 files changed: +212 −0 lines changed

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 31 additions & 0 deletions
@@ -315,6 +315,19 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
     fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. Output will have
+   * each row from both RDDs or `None` where missing, i.e. one of
+   * (k, (Some(v), Some(w))), (k, (Some(v), None)) or (k, (None, Some(w))),
+   * depending on the presence of (k, v) and/or (k, w) in `this` and `other`.
+   * Uses the given Partitioner to partition the output RDD.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
+  : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other, partitioner)
+    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))})
+  }
+
   /**
    * Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
    * partitioner/parallelism level.
@@ -404,6 +417,24 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
     fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. Hash-partitions the resulting
+   * RDD using the existing partitioner/parallelism level.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other)
+    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))})
+  }
+
+  /**
+   * Perform a full outer join of `this` and `other`. Hash-partitions the resulting
+   * RDD into the given number of partitions.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other, numPartitions)
+    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))})
+  }
+
   /**
    * Return the key-value pairs in this RDD to the master as a Map.
    */
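
The Java API cannot hand Scala `Option`s to Java callers, so every overload maps the Scala result through `JavaUtils.optionToOptional` into Guava's `Optional`. A minimal sketch of that bridging, reimplemented here purely for illustration (the real helper lives in Spark's `JavaUtils`):

import com.google.common.base.Optional

object OptionBridgeSketch {
  // Illustrative stand-in for JavaUtils.optionToOptional: Scala's Option
  // becomes Guava's Optional, which Java callers can consume directly.
  def optionToOptional[T](option: Option[T]): Optional[T] = option match {
    case Some(value) => Optional.of(value)
    case None        => Optional.absent()
  }

  def main(args: Array[String]): Unit = {
    println(optionToOptional(Some(1)))            // Optional.of(1)
    println(optionToOptional(None: Option[Int]))  // Optional.absent()
  }
}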

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 36 additions & 0 deletions
@@ -292,6 +292,26 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
     }
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. Output will have
+   * each row from both RDDs or `None` where missing, i.e. one of
+   * (k, (Some(v), Some(w))), (k, (Some(v), None)) or (k, (None, Some(w))),
+   * depending on the presence of (k, v) and/or (k, w) in `this` and `other`.
+   * Uses the given Partitioner to partition the output RDD.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+  : RDD[(K, (Option[V], Option[W]))] = {
+    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
+      if (vs.isEmpty && !ws.isEmpty) {
+        ws.iterator.map(w => (None, Some(w)))
+      } else if (ws.isEmpty && !vs.isEmpty) {
+        vs.iterator.map(v => (Some(v), None))
+      } else {
+        for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
+      }
+    }
+  }
+
   /**
    * Simplified version of combineByKey that hash-partitions the resulting RDD using the
    * existing partitioner/parallelism level.
@@ -377,6 +397,22 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
     rightOuterJoin(other, new HashPartitioner(numPartitions))
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. Hash-partitions the resulting
+   * RDD using the existing partitioner/parallelism level.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin(other, defaultPartitioner(self, other))
+  }
+
+  /**
+   * Perform a full outer join of `this` and `other`. Hash-partitions the resulting
+   * RDD into the given number of partitions.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin(other, new HashPartitioner(numPartitions))
+  }
+
   /**
    * Return the key-value pairs in this RDD to the master as a Map.
    */
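
The core method above is just `cogroup` followed by a per-key expansion: an empty side is represented as `None`, and when both sides are present the value lists are cross-multiplied. A Spark-free sketch of that expansion logic (the `expand` helper and object name are illustrative, not part of this commit):

object FullOuterExpandSketch {
  // Mirrors the flatMapValues body above on plain collections. cogroup
  // never emits a key with both sides empty, so that case cannot occur.
  def expand[V, W](vs: Seq[V], ws: Seq[W]): Seq[(Option[V], Option[W])] = {
    if (vs.isEmpty && !ws.isEmpty) {
      ws.map(w => (None, Some(w)))
    } else if (ws.isEmpty && !vs.isEmpty) {
      vs.map(v => (Some(v), None))
    } else {
      for (v <- vs; w <- ws) yield (Some(v), Some(w))
    }
  }

  def main(args: Array[String]): Unit = {
    println(expand(Seq(1, 2), Seq('x')))       // both present: cross product
    println(expand(Seq(1), Seq.empty[Char]))   // right missing: (Some(1), None)
    println(expand(Seq.empty[Int], Seq('w')))  // left missing: (None, Some(w))
  }
}

Note the cross product in the last branch: a key with m left values and n right values yields m×n output rows, which is why keys 1 and 2 in the PairRDDFunctionsSuite test below each produce two rows.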

core/src/test/scala/org/apache/spark/PartitioningSuite.scala

Lines changed: 3 additions & 0 deletions
@@ -102,11 +102,13 @@ class PartitioningSuite extends FunSuite with SharedSparkContext {
     assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner)
+    assert(grouped2.fullOuterJoin(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)
 
     assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
+    assert(grouped2.fullOuterJoin(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
 
     assert(grouped2.map(_ => 1).partitioner === None)
@@ -127,6 +129,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext {
     assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array"))
+    assert(intercept[SparkException]{ arrPairs.fullOuterJoin(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array"))

core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala

Lines changed: 15 additions & 0 deletions
@@ -165,6 +165,21 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     ))
   }
 
+  test("fullOuterJoin") {
+    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val joined = rdd1.fullOuterJoin(rdd2).collect()
+    assert(joined.size === 6)
+    assert(joined.toSet === Set(
+      (1, (Some(1), Some('x'))),
+      (1, (Some(2), Some('x'))),
+      (2, (Some(1), Some('y'))),
+      (2, (Some(1), Some('z'))),
+      (3, (Some(1), None)),
+      (4, (None, Some('w')))
+    ))
+  }
+
   test("join with no matches") {
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -135,6 +135,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(rdd.join(emptyKv).collect().size === 0)
     assert(rdd.rightOuterJoin(emptyKv).collect().size === 0)
     assert(rdd.leftOuterJoin(emptyKv).collect().size === 2)
+    assert(rdd.fullOuterJoin(emptyKv).collect().size === 2)
     assert(rdd.cogroup(emptyKv).collect().size === 2)
     assert(rdd.union(emptyKv).collect().size === 2)
   }

python/pyspark/join.py

Lines changed: 16 additions & 0 deletions
@@ -78,6 +78,22 @@ def dispatch(seq):
     return _do_python_join(rdd, other, numPartitions, dispatch)
 
 
+def python_full_outer_join(rdd, other, numPartitions):
+    def dispatch(seq):
+        vbuf, wbuf = [], []
+        for (n, v) in seq:
+            if n == 1:
+                vbuf.append(v)
+            elif n == 2:
+                wbuf.append(v)
+        if not vbuf:
+            vbuf.append(None)
+        if not wbuf:
+            wbuf.append(None)
+        return [(v, w) for v in vbuf for w in wbuf]
+    return _do_python_join(rdd, other, numPartitions, dispatch)
+
+
 def python_cogroup(rdd, other, numPartitions):
     vs = rdd.map(lambda (k, v): (k, (1, v)))
     ws = other.map(lambda (k, v): (k, (2, v)))
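
On the Python side, `python_full_outer_join` reuses the generic `_do_python_join` machinery: each value is tagged 1 (from `rdd`) or 2 (from `other`), and `dispatch` pads an empty buffer with a single `None` before taking the cross product. The same padding trick, sketched in Scala to match the other examples in this review (`dispatch` here is illustrative only, not Spark API):

object PaddedDispatchSketch {
  // Tagged values: 1 marks the left RDD's values, 2 the right's, matching
  // the convention used by _do_python_join.
  def dispatch[V](seq: Seq[(Int, V)]): Seq[(Option[V], Option[V])] = {
    var vbuf = seq.collect { case (1, v) => Some(v): Option[V] }
    var wbuf = seq.collect { case (2, w) => Some(w): Option[V] }
    if (vbuf.isEmpty) vbuf = Seq(None)  // pad missing left side
    if (wbuf.isEmpty) wbuf = Seq(None)  // pad missing right side
    for (v <- vbuf; w <- wbuf) yield (v, w)
  }

  def main(args: Array[String]): Unit = {
    println(dispatch(Seq((1, "a"), (2, "b"))))  // List((Some(a),Some(b)))
    println(dispatch(Seq((1, "a"))))            // List((Some(a),None))
    println(dispatch(Seq((2, "w"))))            // List((None,Some(w)))
  }
}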

python/pyspark/rdd.py

Lines changed: 17 additions & 0 deletions
@@ -728,6 +728,23 @@ def rightOuterJoin(self, other, numPartitions=None):
         """
         return python_right_outer_join(self, other, numPartitions)
 
+    def fullOuterJoin(self, other, numPartitions=None):
+        """
+        Perform a full outer join of C{self} and C{other}.
+
+        Output will have each row from both RDDs or None where missing, i.e.
+        one of (k, (v, w)), (k, (v, None)), or (k, (None, w)), depending on
+        the presence of (k, v) and/or (k, w) in C{self} and C{other}.
+
+        Hash-partitions the resulting RDD into the given number of partitions.
+
+        >>> x = sc.parallelize([("a", 1), ("b", 4)])
+        >>> y = sc.parallelize([("a", 2), ("c", 3)])
+        >>> sorted(y.fullOuterJoin(x).collect())
+        [('a', (2, 1)), ('b', (None, 4)), ('c', (3, None))]
+        """
+        return python_full_outer_join(self, other, numPartitions)
+
     # TODO: add option to control map-side combining
     def partitionBy(self, numPartitions, partitionFunc=hash):
         """

streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala

Lines changed: 36 additions & 0 deletions
@@ -549,6 +549,42 @@ extends Serializable {
     )
   }
 
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+   * number of partitions.
+   */
+  def fullOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin[W](other, defaultPartitioner())
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+   * partitions.
+   */
+  def fullOuterJoin[W: ClassManifest](
+      other: DStream[(K, W)],
+      numPartitions: Int
+    ): DStream[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin[W](other, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+   * the partitioning of each RDD.
+   */
+  def fullOuterJoin[W: ClassManifest](
+      other: DStream[(K, W)],
+      partitioner: Partitioner
+    ): DStream[(K, (Option[V], Option[W]))] = {
+    self.transformWith(
+      other,
+      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.fullOuterJoin(rdd2, partitioner)
+    )
+  }
+
   /**
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
    * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
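
As with the other DStream joins in this file, all the work is delegated per batch to the RDD-level `fullOuterJoin` via `transformWith`. A hypothetical usage sketch, assuming two keyed input streams `requests` and `responses` already exist in scope (neither is part of this commit):

import org.apache.spark.streaming.DStream
import org.apache.spark.streaming.StreamingContext._  // implicit pair-DStream operations

// Hypothetical: per batch, pair each request with its response by id,
// keeping unmatched ids from either stream as (Some, None) or (None, Some).
def matchByRequestId(
    requests: DStream[(String, Long)],
    responses: DStream[(String, Long)]
  ): DStream[(String, (Option[Long], Option[Long]))] = {
  requests.fullOuterJoin(responses)
}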

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala

Lines changed: 42 additions & 0 deletions
@@ -647,6 +647,48 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
   }
 
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+   * number of partitions.
+   */
+  def fullOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+    val joinResult = dstream.fullOuterJoin(other.dstream)
+    joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))}
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+   * partitions.
+   */
+  def fullOuterJoin[W](
+      other: JavaPairDStream[K, W],
+      numPartitions: Int
+    ): JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+    val joinResult = dstream.fullOuterJoin(other.dstream, numPartitions)
+    joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))}
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+   * the partitioning of each RDD.
+   */
+  def fullOuterJoin[W](
+      other: JavaPairDStream[K, W],
+      partitioner: Partitioner
+    ): JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+    val joinResult = dstream.fullOuterJoin(other.dstream, partitioner)
+    joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))}
+  }
+
   /**
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
    * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".

streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala

Lines changed: 15 additions & 0 deletions
@@ -310,6 +310,21 @@ class BasicOperationsSuite extends TestSuiteBase {
     testOperation(inputData1, inputData2, operation, outputData, true)
   }
 
+  test("fullOuterJoin") {
+    val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
+    val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
+    val outputData = Seq(
+      Seq( ("a", (Some(1), Some("x"))), ("b", (Some(1), Some("x"))) ),
+      Seq( ("", (Some(1), Some("x"))), ("b", (None, Some("x"))), ("a", (Some(1), None)) ),
+      Seq( ("", (Some(1), None)) ),
+      Seq( ("", (None, Some("x"))) )
+    )
+    val operation = (s1: DStream[String], s2: DStream[String]) => {
+      s1.map(x => (x, 1)).fullOuterJoin(s2.map(x => (x, "x")))
+    }
+    testOperation(inputData1, inputData2, operation, outputData, true)
+  }
+
   test("updateStateByKey") {
     val inputData =
       Seq(
