Skip to content

Commit 59bc16f

Browse files
committed
make coalesce test deterministic in RDDSuite
1 parent 7b4203a commit 59bc16f

File tree

1 file changed

+33
-28
lines changed

1 file changed

+33
-28
lines changed

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -274,37 +274,42 @@ class RDDSuite extends FunSuite with SharedSparkContext {
274274
test("coalesced RDDs with locality, large scale (10K partitions)") {
275275
// large scale experiment
276276
import collection.mutable
277-
val rnd = scala.util.Random
278277
val partitions = 10000
279278
val numMachines = 50
280279
val machines = mutable.ListBuffer[String]()
281-
(1 to numMachines).foreach(machines += "m"+_)
282-
283-
val blocks = (1 to partitions).map(i =>
284-
{ (i, Array.fill(3)(machines(rnd.nextInt(machines.size))).toList) } )
285-
286-
val data2 = sc.makeRDD(blocks)
287-
val coalesced2 = data2.coalesce(numMachines*2)
288-
289-
// test that you get over 90% locality in each group
290-
val minLocality = coalesced2.partitions
291-
.map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
292-
.foldLeft(1.0)((perc, loc) => math.min(perc,loc))
293-
assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.0).toInt + "%")
294-
295-
// test that the groups are load balanced with 100 +/- 20 elements in each
296-
val maxImbalance = coalesced2.partitions
297-
.map(part => part.asInstanceOf[CoalescedRDDPartition].parents.size)
298-
.foldLeft(0)((dev, curr) => math.max(math.abs(100-curr),dev))
299-
assert(maxImbalance <= 20, "Expected 100 +/- 20 per partition, but got " + maxImbalance)
300-
301-
val data3 = sc.makeRDD(blocks).map(i => i*2) // derived RDD to test *current* pref locs
302-
val coalesced3 = data3.coalesce(numMachines*2)
303-
val minLocality2 = coalesced3.partitions
304-
.map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
305-
.foldLeft(1.0)((perc, loc) => math.min(perc,loc))
306-
assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
307-
(minLocality2*100.0).toInt + "%")
280+
(1 to numMachines).foreach(machines += "m" + _)
281+
val rnd = scala.util.Random
282+
for (seed <- 1 to 5) {
283+
rnd.setSeed(seed)
284+
285+
val blocks = (1 to partitions).map { i =>
286+
(i, Array.fill(3)(machines(rnd.nextInt(machines.size))).toList)
287+
}
288+
289+
val data2 = sc.makeRDD(blocks)
290+
val coalesced2 = data2.coalesce(numMachines * 2)
291+
292+
// test that you get over 90% locality in each group
293+
val minLocality = coalesced2.partitions
294+
.map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
295+
.foldLeft(1.0)((perc, loc) => math.min(perc, loc))
296+
assert(minLocality >= 0.90, "Expected 90% locality but got " +
297+
(minLocality * 100.0).toInt + "%")
298+
299+
// test that the groups are load balanced with 100 +/- 20 elements in each
300+
val maxImbalance = coalesced2.partitions
301+
.map(part => part.asInstanceOf[CoalescedRDDPartition].parents.size)
302+
.foldLeft(0)((dev, curr) => math.max(math.abs(100 - curr), dev))
303+
assert(maxImbalance <= 20, "Expected 100 +/- 20 per partition, but got " + maxImbalance)
304+
305+
val data3 = sc.makeRDD(blocks).map(i => i * 2) // derived RDD to test *current* pref locs
306+
val coalesced3 = data3.coalesce(numMachines * 2)
307+
val minLocality2 = coalesced3.partitions
308+
.map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
309+
.foldLeft(1.0)((perc, loc) => math.min(perc, loc))
310+
assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
311+
(minLocality2 * 100.0).toInt + "%")
312+
}
308313
}
309314

310315
test("zipped RDDs") {

0 commit comments

Comments
 (0)