Commit 6cd28cc

huitseeker authored and srowen committed
[SPARK-9236] [CORE] Make defaultPartitioner not reuse a parent RDD's partitioner if it has 0 partitions
See also comments on https://issues.apache.org/jira/browse/SPARK-9236

Author: François Garillot <[email protected]>

Closes apache#7616 from huitseeker/issue/SPARK-9236 and squashes the following commits:

217f902 [François Garillot] [SPARK-9236] Make defaultPartitioner not reuse a parent RDD's partitioner if it has 0 partitions
1 parent 6a7e537 commit 6cd28cc
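
For context, a minimal sketch of the failure mode this commit addresses, mirroring the new tests added below (illustrative only; assumes a live SparkContext named sc):

// rdd2 is derived from an empty RDD, so its partitioner covers 0 partitions.
val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.emptyRDD[Int].groupBy((x: Int) => 5)

// Before this patch, defaultPartitioner could reuse rdd2's 0-partition partitioner,
// leaving the cogroup result empty; with the patch, rdd1's keys are preserved.
val joined = rdd1.cogroup(rdd2).collect()
assert(joined.nonEmpty)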

File tree

2 files changed: +24 −1 lines changed

core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ object Partitioner {
    */
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
     val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
-    for (r <- bySize if r.partitioner.isDefined) {
+    for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
       return r.partitioner.get
     }
     if (rdd.context.conf.contains("spark.default.parallelism")) {
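
As a quick illustration of the amended guard (not part of the patch; assumes a live SparkContext named sc and the patched defaultPartitioner): a parent whose partitioner spans 0 partitions is now skipped in favour of a usable one.

import org.apache.spark.Partitioner.defaultPartitioner

val nonEmpty = sc.parallelize(Seq((1, 1), (2, 2)), 4)
// groupBy on an empty RDD can yield a partitioner over 0 partitions.
val empty = sc.emptyRDD[Int].groupBy((x: Int) => x)

// With the guard above, the 0-partition partitioner is no longer reused.
val chosen = defaultPartitioner(nonEmpty, empty)
assert(chosen.numPartitions > 0)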

core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala

Lines changed: 23 additions & 0 deletions
@@ -282,6 +282,29 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     ))
   }
 
+  // See SPARK-9326
+  test("cogroup with empty RDD") {
+    import scala.reflect.classTag
+    val intPairCT = classTag[(Int, Int)]
+
+    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.emptyRDD[(Int, Int)](intPairCT)
+
+    val joined = rdd1.cogroup(rdd2).collect()
+    assert(joined.size > 0)
+  }
+
+  // See SPARK-9326
+  test("cogroup with groupByed RDD having 0 partitions") {
+    import scala.reflect.classTag
+    val intCT = classTag[Int]
+
+    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.emptyRDD[Int](intCT).groupBy((x) => 5)
+    val joined = rdd1.cogroup(rdd2).collect()
+    assert(joined.size > 0)
+  }
+
   test("rightOuterJoin") {
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
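
As a follow-on illustration of what the first new test asserts (not part of the patch; assumes a live SparkContext named sc): with the fix, every key of the non-empty side survives the cogroup, and the empty side contributes only empty groups.

val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.emptyRDD[(Int, Int)]
val joined = rdd1.cogroup(rdd2).collect()

// All of rdd1's keys are present, each paired with an empty Iterable from rdd2's side.
assert(joined.map(_._1).toSet == Set(1, 2, 3))
assert(joined.forall { case (_, (_, fromEmpty)) => fromEmpty.isEmpty })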
