Skip to content

Commit 692f4e6

Browse files
committed
cap numPartsToTry
1 parent c4483dc commit 692f4e6

File tree

3 files changed

+9
-4
lines changed

3 files changed

+9
-4
lines changed

core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,15 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
7878
// greater than totalParts because we actually cap it at totalParts in runJob.
7979
var numPartsToTry = 1
8080
if (partsScanned > 0) {
81-
// If we didn't find any rows after the first iteration, just try all partitions next.
81+
// If we didn't find any rows after the previous iteration, quadruple and retry.
8282
// Otherwise, interpolate the number of partitions we need to try, but overestimate it
83-
// by 50%.
83+
// by 50%. We also cap the estimation in the end.
8484
if (results.size == 0) {
85-
numPartsToTry = totalParts - 1
85+
numPartsToTry = totalParts * 4
8686
} else {
8787
// the left side of max is >=1 whenever partsScanned >= 2
8888
numPartsToTry = ((1.5 * num * partsScanned / results.size).toInt - partsScanned) max 1
89+
numPartsToTry = numPartsToTry min (totalParts * 4)
8990
}
9091
}
9192

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1079,13 +1079,15 @@ abstract class RDD[T: ClassTag](
10791079
// greater than totalParts because we actually cap it at totalParts in runJob.
10801080
var numPartsToTry = 1
10811081
if (partsScanned > 0) {
1082-
// If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
1082+
// If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
10831083
// interpolate the number of partitions we need to try, but overestimate it by 50%.
1084+
// We also cap the estimation in the end.
10841085
if (buf.size == 0) {
10851086
numPartsToTry = partsScanned * 4
10861087
} else {
10871088
// the left side of max is >=1 whenever partsScanned >= 2
10881089
numPartsToTry = ((1.5 * num * partsScanned / buf.size).toInt - partsScanned) max 1
1090+
numPartsToTry = numPartsToTry min (partsScanned * 4)
10891091
}
10901092
}
10911093

python/pyspark/rdd.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1070,11 +1070,13 @@ def take(self, num):
10701070
# If we didn't find any rows after the previous iteration,
10711071
# quadruple and retry. Otherwise, interpolate the number of
10721072
# partitions we need to try, but overestimate it by 50%.
1073+
# We also cap the estimation in the end.
10731074
if len(items) == 0:
10741075
numPartsToTry = partsScanned * 4
10751076
else:
10761077
#the first parameter of max is >=1 whenever partsScanned >= 2
10771078
numPartsToTry = max(int(1.5 * num * partsScanned / len(items)) - partsScanned, 1)
1079+
numPartsToTry = min(numPartsToTry, partsScanned * 4)
10781080

10791081
left = num - len(items)
10801082

0 commit comments

Comments
 (0)