Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 475e346

Browse files
committed
Latest round of reviews.
1 parent e9fb45e commit 475e346

File tree

3 files changed

+58
-45
lines changed

3 files changed

+58
-45
lines changed

streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
4545
* Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
4646
*/
4747
override protected[streaming] val rateController: Option[RateController] =
48-
RateEstimator.create(ssc.conf).map { new ReceiverRateController(id, _) }
48+
if (RateController.isBackPressureEnabled(ssc.conf))
49+
RateEstimator.create(ssc.conf).map { new ReceiverRateController(id, _) }
50+
else
51+
None
4952

5053
/**
5154
* Gets the receiver object that will be sent to the worker nodes

streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
2222

2323
import scala.concurrent.{ExecutionContext, Future}
2424

25-
import org.apache.spark.annotation.DeveloperApi
25+
import org.apache.spark.SparkConf
2626
import org.apache.spark.streaming.scheduler.rate.RateEstimator
2727
import org.apache.spark.util.{ThreadUtils, Utils}
2828

@@ -83,3 +83,8 @@ private[streaming] abstract class RateController(val streamUID: Int, rateEstimat
8383
} computeAndPublish(processingEnd, elems, workDelay, waitDelay)
8484
}
8585
}
86+
87+
object RateController {
88+
def isBackPressureEnabled(conf: SparkConf): Boolean =
89+
conf.getBoolean("spark.streaming.backpressure.enable", false)
90+
}

streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala

Lines changed: 48 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -34,65 +34,70 @@ class RateControllerSuite extends TestSuiteBase {
3434

3535
test("rate controller publishes updates") {
3636
val ssc = new StreamingContext(conf, batchDuration)
37-
val dstream = new MockRateLimitDStream(ssc, Seq(Seq(1)), 1)
38-
val output = new TestOutputStreamWithPartitions(dstream)
39-
output.register()
40-
runStreams(ssc, 1, 1)
41-
42-
eventually(timeout(2.seconds)) {
43-
assert(dstream.publishCalls === 1)
37+
withStreamingContext(ssc) { ssc =>
38+
val dstream = new MockRateLimitDStream(ssc, Seq(Seq(1)), 1)
39+
val output = new TestOutputStreamWithPartitions(dstream)
40+
output.register()
41+
runStreams(ssc, 1, 1)
42+
43+
eventually(timeout(2.seconds)) {
44+
assert(dstream.publishCalls === 1)
45+
}
4446
}
4547
}
4648

4749
test("receiver rate controller updates reach receivers") {
4850
val ssc = new StreamingContext(conf, batchDuration)
51+
withStreamingContext(ssc) { ssc =>
52+
val dstream = new RateLimitInputDStream(ssc) {
53+
override val rateController =
54+
Some(new ReceiverRateController(id, new ConstantEstimator(200.0)))
55+
}
56+
SingletonDummyReceiver.reset()
4957

50-
val dstream = new RateLimitInputDStream(ssc) {
51-
override val rateController =
52-
Some(new ReceiverRateController(id, new ConstantEstimator(200.0)))
53-
}
54-
SingletonDummyReceiver.reset()
55-
56-
val output = new TestOutputStreamWithPartitions(dstream)
57-
output.register()
58-
runStreams(ssc, 2, 2)
58+
val output = new TestOutputStreamWithPartitions(dstream)
59+
output.register()
60+
runStreams(ssc, 2, 2)
5961

60-
eventually(timeout(5.seconds)) {
61-
assert(dstream.getCurrentRateLimit === Some(200))
62+
eventually(timeout(5.seconds)) {
63+
assert(dstream.getCurrentRateLimit === Some(200))
64+
}
6265
}
6366
}
6467

6568
test("multiple rate controller updates reach receivers") {
6669
val ssc = new StreamingContext(conf, batchDuration)
67-
val rates = Seq(100L, 200L, 300L)
70+
withStreamingContext(ssc) { ssc =>
71+
val rates = Seq(100L, 200L, 300L)
6872

69-
val dstream = new RateLimitInputDStream(ssc) {
70-
override val rateController =
71-
Some(new ReceiverRateController(id, new ConstantEstimator(rates.map(_.toDouble): _*)))
72-
}
73-
SingletonDummyReceiver.reset()
74-
75-
val output = new TestOutputStreamWithPartitions(dstream)
76-
output.register()
77-
78-
val observedRates = mutable.HashSet.empty[Long]
79-
80-
@volatile var done = false
81-
runInBackground {
82-
while (!done) {
83-
try {
84-
dstream.getCurrentRateLimit.foreach(observedRates += _)
85-
} catch {
86-
case NonFatal(_) => () // don't stop if the executor wasn't installed yet
73+
val dstream = new RateLimitInputDStream(ssc) {
74+
override val rateController =
75+
Some(new ReceiverRateController(id, new ConstantEstimator(rates.map(_.toDouble): _*)))
76+
}
77+
SingletonDummyReceiver.reset()
78+
79+
val output = new TestOutputStreamWithPartitions(dstream)
80+
output.register()
81+
82+
val observedRates = mutable.HashSet.empty[Long]
83+
84+
@volatile var done = false
85+
runInBackground {
86+
while (!done) {
87+
try {
88+
dstream.getCurrentRateLimit.foreach(observedRates += _)
89+
} catch {
90+
case NonFatal(_) => () // don't stop if the executor wasn't installed yet
91+
}
92+
Thread.sleep(20)
8793
}
88-
Thread.sleep(20)
8994
}
90-
}
91-
runStreams(ssc, 4, 4)
92-
done = true
95+
runStreams(ssc, 4, 4)
96+
done = true
9397

94-
// Long.MaxValue (essentially, no rate limit) is the initial rate limit for any Receiver
95-
observedRates should contain theSameElementsAs (rates :+ Long.MaxValue)
98+
// Long.MaxValue (essentially, no rate limit) is the initial rate limit for any Receiver
99+
observedRates should contain theSameElementsAs (rates :+ Long.MaxValue)
100+
}
96101
}
97102

98103
private def runInBackground(f: => Unit): Unit = {

0 commit comments

Comments
 (0)