From 4721c7d9394b7917b2be8fb7df5e4eb1c31d68df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Garillot?= Date: Mon, 13 Jul 2015 13:26:57 +0200 Subject: [PATCH 1/8] [SPARK-8975][Streaming] Add a mechanism to send a new rate from the driver to the block generator --- .../streaming/receiver/RateLimiter.scala | 24 ++++++++++++++++--- .../streaming/receiver/ReceiverMessage.scala | 3 ++- .../receiver/ReceiverSupervisorImpl.scala | 2 ++ .../streaming/scheduler/ReceiverTracker.scala | 8 ++++++- 4 files changed, 32 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index 8df542b367d27..356ae340387df 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.receiver +import java.util.concurrent.atomic.AtomicInteger + import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter} import org.apache.spark.{Logging, SparkConf} @@ -34,12 +36,28 @@ import org.apache.spark.{Logging, SparkConf} */ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { - private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0) - private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate) + // treated as an upper limit + private val maxRateLimit = conf.getInt("spark.streaming.receiver.maxRate", 0) + private[receiver] var currentRateLimit = new AtomicInteger(maxRateLimit) + private lazy val rateLimiter = GuavaRateLimiter.create(currentRateLimit.get()) def waitToPush() { - if (desiredRate > 0) { + if (currentRateLimit.get() > 0) { rateLimiter.acquire() } } + + private[receiver] def updateRate(newRate: Int): Unit = + if (newRate > 0) { + try { + if (maxRateLimit > 0) { + currentRateLimit.set(newRate.min(maxRateLimit)) + } + else { + currentRateLimit.set(newRate) + } + } finally { + rateLimiter.setRate(currentRateLimit.get()) + } + } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala index 7bf3c33319491..1eb55affaa9d0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala @@ -23,4 +23,5 @@ import org.apache.spark.streaming.Time private[streaming] sealed trait ReceiverMessage extends Serializable private[streaming] object StopReceiver extends ReceiverMessage private[streaming] case class CleanupOldBlocks(threshTime: Time) extends ReceiverMessage - +private[streaming] case class UpdateRateLimit(elementsPerSecond: Long) + extends ReceiverMessage diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 6078cdf8f8790..6e819460b1b23 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -77,6 +77,8 @@ private[streaming] class ReceiverSupervisorImpl( case CleanupOldBlocks(threshTime) => logDebug("Received delete old batch signal") cleanupOldBlocks(threshTime) + case UpdateRateLimit(eps) => + 
blockGenerator.updateRate(eps.toInt) } }) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 644e581cd8279..604d1a0dae289 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SparkEnv, SparkException} import org.apache.spark.rpc._ import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl, - StopReceiver} + StopReceiver, UpdateRateLimit} import org.apache.spark.util.SerializableConfiguration /** @@ -180,6 +180,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false logError(s"Deregistered receiver for stream $streamId: $messageWithError") } + /** Update a receiver's maximum rate from an estimator's update */ + def sendRateUpdate(streamUID: Int, newRate: Long): Unit = { + for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint)) + eP.send(UpdateRateLimit(newRate)) + } + /** Add new blocks for the given stream */ private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { receivedBlockTracker.addBlock(receivedBlockInfo) From d15de422b973a020d5aa9035016c1274262631fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Garillot?= Date: Wed, 15 Jul 2015 13:14:31 +0200 Subject: [PATCH 2/8] [SPARK-8975][Streaming] Adds Ratelimiter unit tests w.r.t. spark.streaming.receiver.maxRate --- .../streaming/receiver/RateLimiterSuite.scala | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala new file mode 100644 index 0000000000000..904c7773c5f2c --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.receiver + +import org.apache.spark.SparkConf +import org.apache.spark.SparkFunSuite + +/** Testsuite for testing the network receiver behavior */ +class RateLimiterSuite extends SparkFunSuite { + + test("rate limiter initializes even without a maxRate set") { + val conf = new SparkConf() + val rateLimiter = new RateLimiter(conf){} + rateLimiter.updateRate(105) + assert(rateLimiter.currentRateLimit.get == 105) + } + + test("rate limiter updates when below maxRate") { + val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "110") + val rateLimiter = new RateLimiter(conf){} + rateLimiter.updateRate(105) + assert(rateLimiter.currentRateLimit.get == 105) + } + + test("rate limiter stays below maxRate despite large updates") { + val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "100") + val rateLimiter = new RateLimiter(conf){} + rateLimiter.updateRate(105) + assert(rateLimiter.currentRateLimit.get == 100) + } + +} From cd1397d141eda98ded62491c0f2d90a2b47e56c5 Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Fri, 17 Jul 2015 16:38:10 +0200 Subject: [PATCH 3/8] Add a test for the propagation of a new rate limit from driver to receivers. --- .../spark/streaming/receiver/Receiver.scala | 2 +- .../receiver/ReceiverSupervisor.scala | 3 ++ .../receiver/ReceiverSupervisorImpl.scala | 3 ++ .../spark/streaming/TestSuiteBase.scala | 15 ++++++++ .../scheduler/ReceiverTrackerSuite.scala | 34 +++++++++++++++++++ 5 files changed, 56 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala index 5b5a3fe648602..c3078cd4ad35f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala @@ -271,7 +271,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable } /** Get the attached executor. */ - private def executor = { + private[streaming] def executor = { assert(executor_ != null, "Executor has not been attached to this receiver") executor_ } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index eeb14ca3a49e9..944d893b9bbf7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -58,6 +58,9 @@ private[streaming] abstract class ReceiverSupervisor( /** Time between a receiver is stopped and started again */ private val defaultRestartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000) + /** The current maximum rate limit for this receiver. 
*/ + private[streaming] def getCurrentRateLimit: Option[Int] = None + /** Exception associated with the stopping of the receiver */ @volatile protected var stoppingError: Throwable = null diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 6e819460b1b23..edb0fc3718fc7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -100,6 +100,9 @@ private[streaming] class ReceiverSupervisorImpl( } }, streamId, env.conf) + override private[streaming] def getCurrentRateLimit: Option[Int] = + Some(blockGenerator.currentRateLimit.get) + /** Push a single record of received data into block generator. */ def pushSingle(data: Any) { blockGenerator.addData(data) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 0d58a7b54412f..d0ac371db9aad 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -537,4 +537,19 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging { verifyOutput[W](output, expectedOutput, useSet) } } + + /** + * Wait until `cond` becomes true, or timeout ms have passed. This method checks the condition + * every 100ms, so it won't wait more than 100ms more than necessary. + * + * @param cond A boolean that should become `true` + * @param timemout How many millis to wait before giving up + */ + def waitUntil(cond: => Boolean, timeout: Int): Unit = { + val start = System.currentTimeMillis() + val end = start + timeout + while ((System.currentTimeMillis() < end) && !cond) { + Thread.sleep(100) + } + } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index a6e783861dbe6..9da851b5e6c1e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -22,6 +22,9 @@ import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver._ import org.apache.spark.util.Utils +import org.apache.spark.streaming.dstream.InputDStream +import scala.reflect.ClassTag +import org.apache.spark.streaming.dstream.ReceiverInputDStream /** Testsuite for receiver scheduling */ class ReceiverTrackerSuite extends TestSuiteBase { @@ -72,15 +75,46 @@ class ReceiverTrackerSuite extends TestSuiteBase { assert(locations(0).length === 1) assert(locations(3).length === 1) } + + test("Receiver tracker - propagates rate limit") { + val newRateLimit = 100 + val ids = new TestReceiverInputDStream(ssc) + val tracker = new ReceiverTracker(ssc) + tracker.start() + waitUntil(TestDummyReceiver.started, 5000) + tracker.sendRateUpdate(ids.id, newRateLimit) + // this is an async message, we need to wait a bit for it to be processed + waitUntil(ids.getRateLimit.get == newRateLimit, 1000) + assert(ids.getRateLimit.get === newRateLimit) + } +} + +/** An input DStream with a hard-coded receiver that gives access to internals for testing. 
*/ +private class TestReceiverInputDStream(@transient ssc_ : StreamingContext) + extends ReceiverInputDStream[Int](ssc_) { + + override def getReceiver(): DummyReceiver = TestDummyReceiver + + def getRateLimit: Option[Int] = + TestDummyReceiver.executor.getCurrentRateLimit } +/** + * We need the receiver to be an object, otherwise serialization will create another one + * and we won't be able to read its rate limit. + */ +private object TestDummyReceiver extends DummyReceiver + /** * Dummy receiver implementation */ private class DummyReceiver(host: Option[String] = None) extends Receiver[Int](StorageLevel.MEMORY_ONLY) { + var started = false + def onStart() { + started = true } def onStop() { From 261a05128ec1e1e055c62a6afd44fef39fb711c1 Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Mon, 20 Jul 2015 14:28:46 +0200 Subject: [PATCH 4/8] - removed field to hold the current rate limit in rate limiter - made rate limit a Long and default to Long.MaxValue (consequence of the above) - removed custom `waitUntil` and replaced it by `eventually` --- .../streaming/receiver/RateLimiter.scala | 30 +++++++++---------- .../spark/streaming/receiver/Receiver.scala | 2 +- .../receiver/ReceiverSupervisor.scala | 2 +- .../receiver/ReceiverSupervisorImpl.scala | 7 +++-- .../streaming/scheduler/ReceiverTracker.scala | 2 +- .../spark/streaming/TestSuiteBase.scala | 15 ---------- .../streaming/receiver/RateLimiterSuite.scala | 7 ++--- .../scheduler/ReceiverTrackerSuite.scala | 17 +++++++---- 8 files changed, 36 insertions(+), 46 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index 356ae340387df..0f15ddc7288ab 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -37,27 +37,25 @@ import org.apache.spark.{Logging, SparkConf} private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { // treated as an upper limit - private val maxRateLimit = conf.getInt("spark.streaming.receiver.maxRate", 0) - private[receiver] var currentRateLimit = new AtomicInteger(maxRateLimit) - private lazy val rateLimiter = GuavaRateLimiter.create(currentRateLimit.get()) + private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue) + private lazy val rateLimiter = GuavaRateLimiter.create(maxRateLimit.toDouble) def waitToPush() { - if (currentRateLimit.get() > 0) { - rateLimiter.acquire() - } + rateLimiter.acquire() } - private[receiver] def updateRate(newRate: Int): Unit = + /** + * Return the current rate limit. If no limit has been set so far, it returns {{{Long.MaxValue}}}. 
+ */ + def getCurrentLimit: Long = + rateLimiter.getRate.toLong + + private[receiver] def updateRate(newRate: Long): Unit = if (newRate > 0) { - try { - if (maxRateLimit > 0) { - currentRateLimit.set(newRate.min(maxRateLimit)) - } - else { - currentRateLimit.set(newRate) - } - } finally { - rateLimiter.setRate(currentRateLimit.get()) + if (maxRateLimit > 0) { + rateLimiter.setRate(newRate.min(maxRateLimit)) + } else { + rateLimiter.setRate(newRate) } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala index c3078cd4ad35f..c8ccfce3902c0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala @@ -271,7 +271,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable } /** Get the attached executor. */ - private[streaming] def executor = { + private[streaming] def executor: ReceiverSupervisor = { assert(executor_ != null, "Executor has not been attached to this receiver") executor_ } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index 944d893b9bbf7..18a5bd7519fef 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -59,7 +59,7 @@ private[streaming] abstract class ReceiverSupervisor( private val defaultRestartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000) /** The current maximum rate limit for this receiver. */ - private[streaming] def getCurrentRateLimit: Option[Int] = None + private[streaming] def getCurrentRateLimit: Option[Long] = None /** Exception associated with the stopping of the receiver */ @volatile protected var stoppingError: Throwable = null diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index edb0fc3718fc7..0ada69b6e1aa1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -78,7 +78,8 @@ private[streaming] class ReceiverSupervisorImpl( logDebug("Received delete old batch signal") cleanupOldBlocks(threshTime) case UpdateRateLimit(eps) => - blockGenerator.updateRate(eps.toInt) + logInfo(s"Received a new rate limit: $eps.") + blockGenerator.updateRate(eps) } }) @@ -100,8 +101,8 @@ private[streaming] class ReceiverSupervisorImpl( } }, streamId, env.conf) - override private[streaming] def getCurrentRateLimit: Option[Int] = - Some(blockGenerator.currentRateLimit.get) + override private[streaming] def getCurrentRateLimit: Option[Long] = + Some(blockGenerator.getCurrentLimit) /** Push a single record of received data into block generator. 
*/ def pushSingle(data: Any) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 604d1a0dae289..af37b4876ffc2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -180,7 +180,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false logError(s"Deregistered receiver for stream $streamId: $messageWithError") } - /** Update a receiver's maximum rate from an estimator's update */ + /** Update a receiver's maximum ingestion rate */ def sendRateUpdate(streamUID: Int, newRate: Long): Unit = { for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint)) eP.send(UpdateRateLimit(newRate)) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index d0ac371db9aad..0d58a7b54412f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -537,19 +537,4 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging { verifyOutput[W](output, expectedOutput, useSet) } } - - /** - * Wait until `cond` becomes true, or timeout ms have passed. This method checks the condition - * every 100ms, so it won't wait more than 100ms more than necessary. - * - * @param cond A boolean that should become `true` - * @param timemout How many millis to wait before giving up - */ - def waitUntil(cond: => Boolean, timeout: Int): Unit = { - val start = System.currentTimeMillis() - val end = start + timeout - while ((System.currentTimeMillis() < end) && !cond) { - Thread.sleep(100) - } - } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala index 904c7773c5f2c..c6330eb3673fb 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala @@ -27,21 +27,20 @@ class RateLimiterSuite extends SparkFunSuite { val conf = new SparkConf() val rateLimiter = new RateLimiter(conf){} rateLimiter.updateRate(105) - assert(rateLimiter.currentRateLimit.get == 105) + assert(rateLimiter.getCurrentLimit == 105) } test("rate limiter updates when below maxRate") { val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "110") val rateLimiter = new RateLimiter(conf){} rateLimiter.updateRate(105) - assert(rateLimiter.currentRateLimit.get == 105) + assert(rateLimiter.getCurrentLimit == 105) } test("rate limiter stays below maxRate despite large updates") { val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "100") val rateLimiter = new RateLimiter(conf){} rateLimiter.updateRate(105) - assert(rateLimiter.currentRateLimit.get == 100) + assert(rateLimiter.getCurrentLimit === 100) } - } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index 9da851b5e6c1e..41d92fb5db32f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -17,6 +17,9 @@ package org.apache.spark.streaming.scheduler +import org.scalatest.concurrent.Eventually._ +import org.scalatest.concurrent.Timeouts +import org.scalatest.time.SpanSugar._ import org.apache.spark.streaming._ import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel @@ -77,15 +80,18 @@ class ReceiverTrackerSuite extends TestSuiteBase { } test("Receiver tracker - propagates rate limit") { - val newRateLimit = 100 + val newRateLimit = 100L val ids = new TestReceiverInputDStream(ssc) val tracker = new ReceiverTracker(ssc) tracker.start() - waitUntil(TestDummyReceiver.started, 5000) + eventually(timeout(5 seconds)) { + assert(TestDummyReceiver.started) + } tracker.sendRateUpdate(ids.id, newRateLimit) // this is an async message, we need to wait a bit for it to be processed - waitUntil(ids.getRateLimit.get == newRateLimit, 1000) - assert(ids.getRateLimit.get === newRateLimit) + eventually(timeout(3 seconds)) { + assert(ids.getCurrentRateLimit.get === newRateLimit) + } } } @@ -95,8 +101,9 @@ private class TestReceiverInputDStream(@transient ssc_ : StreamingContext) override def getReceiver(): DummyReceiver = TestDummyReceiver - def getRateLimit: Option[Int] = + def getCurrentRateLimit: Option[Long] = { TestDummyReceiver.executor.getCurrentRateLimit + } } /** From 0c51959c9315f63bc80a7ff5b716f48f907b1152 Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Mon, 20 Jul 2015 16:40:01 +0200 Subject: [PATCH 5/8] =?UTF-8?q?Added=20a=20few=20tests=20that=20measure=20?= =?UTF-8?q?the=20receiver=E2=80=99s=20rate.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit As I mentioned before, I don’t think this is a great idea: - such tests are flaky (original test in ReceiverSuite was ignored for that reason) - Guava’s code has its own test suite, so we can assume it implements `setRate` correctly I noticed one flaky failure in about 10 runs on my machine (receiver got 1 message less than the lower bound, which is within 5% of the nominal rate). --- .../spark/streaming/ReceiverSuite.scala | 96 ++++------------ .../streaming/receiver/RateLimiterSuite.scala | 108 ++++++++++++++++++ .../scheduler/ReceiverTrackerSuite.scala | 1 - 3 files changed, 130 insertions(+), 75 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index 5d7127627eea5..c096a251374b6 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -155,63 +155,6 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { assert(recordedData.toSet === generatedData.toSet) } - ignore("block generator throttling") { - val blockGeneratorListener = new FakeBlockGeneratorListener - val blockIntervalMs = 100 - val maxRate = 1001 - val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms"). 
- set("spark.streaming.receiver.maxRate", maxRate.toString) - val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf) - val expectedBlocks = 20 - val waitTime = expectedBlocks * blockIntervalMs - val expectedMessages = maxRate * waitTime / 1000 - val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000 - val generatedData = new ArrayBuffer[Int] - - // Generate blocks - val startTime = System.currentTimeMillis() - blockGenerator.start() - var count = 0 - while(System.currentTimeMillis - startTime < waitTime) { - blockGenerator.addData(count) - generatedData += count - count += 1 - } - blockGenerator.stop() - - val recordedBlocks = blockGeneratorListener.arrayBuffers - val recordedData = recordedBlocks.flatten - assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received") - assert(recordedData.toSet === generatedData.toSet, "Received data not same") - - // recordedData size should be close to the expected rate; use an error margin proportional to - // the value, so that rate changes don't cause a brittle test - val minExpectedMessages = expectedMessages - 0.05 * expectedMessages - val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages - val numMessages = recordedData.size - assert( - numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages, - s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages" - ) - - // XXX Checking every block would require an even distribution of messages across blocks, - // which throttling code does not control. Therefore, test against the average. - val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock - val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock - val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",") - - // the first and last block may be incomplete, so we slice them out - val validBlocks = recordedBlocks.drop(1).dropRight(1) - val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size - - assert( - averageBlockSize >= minExpectedMessagesPerBlock && - averageBlockSize <= maxExpectedMessagesPerBlock, - s"# records in received blocks = [$receivedBlockSizes], not between " + - s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average" - ) - } - /** * Test whether write ahead logs are generated by received, * and automatically cleaned up. The clean up must be aware of the @@ -347,28 +290,33 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { errors += throwable } } +} - /** - * An implementation of BlockGeneratorListener that is used to test the BlockGenerator. - */ - class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener { - // buffer of data received as ArrayBuffers - val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]] - val errors = new ArrayBuffer[Throwable] +/** + * An implementation of BlockGeneratorListener that is used to test the BlockGenerator. 
+ */ +class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener { + // buffer of data received as ArrayBuffers + val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]] + val errors = new ArrayBuffer[Throwable] - def onAddData(data: Any, metadata: Any) { } + def onAddData(data: Any, metadata: Any) {} - def onGenerateBlock(blockId: StreamBlockId) { } + def onGenerateBlock(blockId: StreamBlockId) {} - def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { - val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int]) - arrayBuffers += bufferOfInts - Thread.sleep(0) - } + def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { + val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int]) + arrayBuffers += bufferOfInts + Thread.sleep(0) + } - def onError(message: String, throwable: Throwable) { - errors += throwable - } + def onError(message: String, throwable: Throwable) { + errors += throwable + } + + def reset(): Unit = { + arrayBuffers.clear() + errors.clear() } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala index c6330eb3673fb..e58baed5f205a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala @@ -17,8 +17,12 @@ package org.apache.spark.streaming.receiver +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.SparkConf import org.apache.spark.SparkFunSuite +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.FakeBlockGeneratorListener /** Testsuite for testing the network receiver behavior */ class RateLimiterSuite extends SparkFunSuite { @@ -43,4 +47,108 @@ class RateLimiterSuite extends SparkFunSuite { rateLimiter.updateRate(105) assert(rateLimiter.getCurrentLimit === 100) } + + def setupGenerator(blockInterval: Int): (BlockGenerator, FakeBlockGeneratorListener) = { + val blockGeneratorListener = new FakeBlockGeneratorListener + val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockInterval}ms") + val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf) + (blockGenerator, blockGeneratorListener) + } + + test("throttling block generator") { + val blockIntervalMs = 100 + val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs) + val maxRate = 1000 + blockGenerator.updateRate(maxRate) + blockGenerator.start() + throttlingTest(maxRate, blockGenerator, blockGeneratorListener, blockIntervalMs) + blockGenerator.stop() + } + + test("throttling block generator changes rate up") { + val blockIntervalMs = 100 + val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs) + val maxRate1 = 1000 + blockGenerator.start() + blockGenerator.updateRate(maxRate1) + throttlingTest(maxRate1, blockGenerator, blockGeneratorListener, blockIntervalMs) + + blockGeneratorListener.reset() + val maxRate2 = 5000 + blockGenerator.updateRate(maxRate2) + throttlingTest(maxRate2, blockGenerator, blockGeneratorListener, blockIntervalMs) + blockGenerator.stop() + } + + test("throttling block generator changes rate up and down") { + val blockIntervalMs = 100 + val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs) + val maxRate1 = 1000 + blockGenerator.updateRate(maxRate1) + blockGenerator.start() + throttlingTest(maxRate1, blockGenerator, blockGeneratorListener, 
blockIntervalMs) + + blockGeneratorListener.reset() + val maxRate2 = 5000 + blockGenerator.updateRate(maxRate2) + throttlingTest(maxRate2, blockGenerator, blockGeneratorListener, blockIntervalMs) + + blockGeneratorListener.reset() + val maxRate3 = 1000 + blockGenerator.updateRate(maxRate3) + throttlingTest(maxRate3, blockGenerator, blockGeneratorListener, blockIntervalMs) + blockGenerator.stop() + } + + def throttlingTest( + maxRate: Long, + blockGenerator: BlockGenerator, + blockGeneratorListener: FakeBlockGeneratorListener, + blockIntervalMs: Int) { + val expectedBlocks = 20 + val waitTime = expectedBlocks * blockIntervalMs + val expectedMessages = maxRate * waitTime / 1000 + val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000 + val generatedData = new ArrayBuffer[Int] + + // Generate blocks + val startTime = System.currentTimeMillis() + var count = 0 + while(System.currentTimeMillis - startTime < waitTime) { + blockGenerator.addData(count) + generatedData += count + count += 1 + } + + val recordedBlocks = blockGeneratorListener.arrayBuffers + val recordedData = recordedBlocks.flatten + assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received") + + // recordedData size should be close to the expected rate; use an error margin proportional to + // the value, so that rate changes don't cause a brittle test + val minExpectedMessages = expectedMessages - 0.05 * expectedMessages + val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages + val numMessages = recordedData.size + assert( + numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages, + s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages" + ) + + // XXX Checking every block would require an even distribution of messages across blocks, + // which throttling code does not control. Therefore, test against the average. 
+ val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock + val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock + val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",") + + // the first and last block may be incomplete, so we slice them out + val validBlocks = recordedBlocks.drop(1).dropRight(1) + val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size + + assert( + averageBlockSize >= minExpectedMessagesPerBlock && + averageBlockSize <= maxExpectedMessagesPerBlock, + s"# records in received blocks = [$receivedBlockSizes], not between " + + s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average" + ) + } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index 41d92fb5db32f..46d7bc479b5ff 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -26,7 +26,6 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver._ import org.apache.spark.util.Utils import org.apache.spark.streaming.dstream.InputDStream -import scala.reflect.ClassTag import org.apache.spark.streaming.dstream.ReceiverInputDStream /** Testsuite for receiver scheduling */ From 210f495fff34f25caaba41a8db720c1e3a63fa95 Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Tue, 21 Jul 2015 14:29:04 +0200 Subject: [PATCH 6/8] =?UTF-8?q?Revert=20"Added=20a=20few=20tests=20that=20?= =?UTF-8?q?measure=20the=20receiver=E2=80=99s=20rate."?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 0c51959c9315f63bc80a7ff5b716f48f907b1152. --- .../spark/streaming/ReceiverSuite.scala | 96 ++++++++++++---- .../streaming/receiver/RateLimiterSuite.scala | 108 ------------------ .../scheduler/ReceiverTrackerSuite.scala | 1 + 3 files changed, 75 insertions(+), 130 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index c096a251374b6..5d7127627eea5 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -155,6 +155,63 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { assert(recordedData.toSet === generatedData.toSet) } + ignore("block generator throttling") { + val blockGeneratorListener = new FakeBlockGeneratorListener + val blockIntervalMs = 100 + val maxRate = 1001 + val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms"). 
+ set("spark.streaming.receiver.maxRate", maxRate.toString) + val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf) + val expectedBlocks = 20 + val waitTime = expectedBlocks * blockIntervalMs + val expectedMessages = maxRate * waitTime / 1000 + val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000 + val generatedData = new ArrayBuffer[Int] + + // Generate blocks + val startTime = System.currentTimeMillis() + blockGenerator.start() + var count = 0 + while(System.currentTimeMillis - startTime < waitTime) { + blockGenerator.addData(count) + generatedData += count + count += 1 + } + blockGenerator.stop() + + val recordedBlocks = blockGeneratorListener.arrayBuffers + val recordedData = recordedBlocks.flatten + assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received") + assert(recordedData.toSet === generatedData.toSet, "Received data not same") + + // recordedData size should be close to the expected rate; use an error margin proportional to + // the value, so that rate changes don't cause a brittle test + val minExpectedMessages = expectedMessages - 0.05 * expectedMessages + val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages + val numMessages = recordedData.size + assert( + numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages, + s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages" + ) + + // XXX Checking every block would require an even distribution of messages across blocks, + // which throttling code does not control. Therefore, test against the average. + val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock + val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock + val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",") + + // the first and last block may be incomplete, so we slice them out + val validBlocks = recordedBlocks.drop(1).dropRight(1) + val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size + + assert( + averageBlockSize >= minExpectedMessagesPerBlock && + averageBlockSize <= maxExpectedMessagesPerBlock, + s"# records in received blocks = [$receivedBlockSizes], not between " + + s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average" + ) + } + /** * Test whether write ahead logs are generated by received, * and automatically cleaned up. The clean up must be aware of the @@ -290,33 +347,28 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { errors += throwable } } -} -/** - * An implementation of BlockGeneratorListener that is used to test the BlockGenerator. - */ -class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener { - // buffer of data received as ArrayBuffers - val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]] - val errors = new ArrayBuffer[Throwable] - - def onAddData(data: Any, metadata: Any) {} + /** + * An implementation of BlockGeneratorListener that is used to test the BlockGenerator. 
+ */ + class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener { + // buffer of data received as ArrayBuffers + val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]] + val errors = new ArrayBuffer[Throwable] - def onGenerateBlock(blockId: StreamBlockId) {} + def onAddData(data: Any, metadata: Any) { } - def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { - val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int]) - arrayBuffers += bufferOfInts - Thread.sleep(0) - } + def onGenerateBlock(blockId: StreamBlockId) { } - def onError(message: String, throwable: Throwable) { - errors += throwable - } + def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { + val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int]) + arrayBuffers += bufferOfInts + Thread.sleep(0) + } - def reset(): Unit = { - arrayBuffers.clear() - errors.clear() + def onError(message: String, throwable: Throwable) { + errors += throwable + } } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala index e58baed5f205a..c6330eb3673fb 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala @@ -17,12 +17,8 @@ package org.apache.spark.streaming.receiver -import scala.collection.mutable.ArrayBuffer - import org.apache.spark.SparkConf import org.apache.spark.SparkFunSuite -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.FakeBlockGeneratorListener /** Testsuite for testing the network receiver behavior */ class RateLimiterSuite extends SparkFunSuite { @@ -47,108 +43,4 @@ class RateLimiterSuite extends SparkFunSuite { rateLimiter.updateRate(105) assert(rateLimiter.getCurrentLimit === 100) } - - def setupGenerator(blockInterval: Int): (BlockGenerator, FakeBlockGeneratorListener) = { - val blockGeneratorListener = new FakeBlockGeneratorListener - val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockInterval}ms") - val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf) - (blockGenerator, blockGeneratorListener) - } - - test("throttling block generator") { - val blockIntervalMs = 100 - val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs) - val maxRate = 1000 - blockGenerator.updateRate(maxRate) - blockGenerator.start() - throttlingTest(maxRate, blockGenerator, blockGeneratorListener, blockIntervalMs) - blockGenerator.stop() - } - - test("throttling block generator changes rate up") { - val blockIntervalMs = 100 - val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs) - val maxRate1 = 1000 - blockGenerator.start() - blockGenerator.updateRate(maxRate1) - throttlingTest(maxRate1, blockGenerator, blockGeneratorListener, blockIntervalMs) - - blockGeneratorListener.reset() - val maxRate2 = 5000 - blockGenerator.updateRate(maxRate2) - throttlingTest(maxRate2, blockGenerator, blockGeneratorListener, blockIntervalMs) - blockGenerator.stop() - } - - test("throttling block generator changes rate up and down") { - val blockIntervalMs = 100 - val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs) - val maxRate1 = 1000 - blockGenerator.updateRate(maxRate1) - blockGenerator.start() - throttlingTest(maxRate1, blockGenerator, blockGeneratorListener, blockIntervalMs) - - 
blockGeneratorListener.reset() - val maxRate2 = 5000 - blockGenerator.updateRate(maxRate2) - throttlingTest(maxRate2, blockGenerator, blockGeneratorListener, blockIntervalMs) - - blockGeneratorListener.reset() - val maxRate3 = 1000 - blockGenerator.updateRate(maxRate3) - throttlingTest(maxRate3, blockGenerator, blockGeneratorListener, blockIntervalMs) - blockGenerator.stop() - } - - def throttlingTest( - maxRate: Long, - blockGenerator: BlockGenerator, - blockGeneratorListener: FakeBlockGeneratorListener, - blockIntervalMs: Int) { - val expectedBlocks = 20 - val waitTime = expectedBlocks * blockIntervalMs - val expectedMessages = maxRate * waitTime / 1000 - val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000 - val generatedData = new ArrayBuffer[Int] - - // Generate blocks - val startTime = System.currentTimeMillis() - var count = 0 - while(System.currentTimeMillis - startTime < waitTime) { - blockGenerator.addData(count) - generatedData += count - count += 1 - } - - val recordedBlocks = blockGeneratorListener.arrayBuffers - val recordedData = recordedBlocks.flatten - assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received") - - // recordedData size should be close to the expected rate; use an error margin proportional to - // the value, so that rate changes don't cause a brittle test - val minExpectedMessages = expectedMessages - 0.05 * expectedMessages - val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages - val numMessages = recordedData.size - assert( - numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages, - s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages" - ) - - // XXX Checking every block would require an even distribution of messages across blocks, - // which throttling code does not control. Therefore, test against the average. 
- val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock - val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock - val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",") - - // the first and last block may be incomplete, so we slice them out - val validBlocks = recordedBlocks.drop(1).dropRight(1) - val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size - - assert( - averageBlockSize >= minExpectedMessagesPerBlock && - averageBlockSize <= maxExpectedMessagesPerBlock, - s"# records in received blocks = [$receivedBlockSizes], not between " + - s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average" - ) - } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index 46d7bc479b5ff..41d92fb5db32f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver._ import org.apache.spark.util.Utils import org.apache.spark.streaming.dstream.InputDStream +import scala.reflect.ClassTag import org.apache.spark.streaming.dstream.ReceiverInputDStream /** Testsuite for receiver scheduling */ From 162d9e598040b5b2bad36fa0f7139a99df95e79d Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Tue, 21 Jul 2015 15:06:13 +0200 Subject: [PATCH 7/8] Use Reflection for accessing truly private `executor` method and use the listener bus to know when receivers have registered (`onStart` is called before receivers have registered, leading to flaky behavior). --- .../streaming/receiver/RateLimiter.scala | 2 -- .../spark/streaming/receiver/Receiver.scala | 2 +- .../streaming/scheduler/ReceiverTracker.scala | 3 +- .../scheduler/ReceiverTrackerSuite.scala | 29 +++++++++++++++---- 4 files changed, 27 insertions(+), 9 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index 0f15ddc7288ab..23a676e97c2c7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -17,8 +17,6 @@ package org.apache.spark.streaming.receiver -import java.util.concurrent.atomic.AtomicInteger - import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter} import org.apache.spark.{Logging, SparkConf} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala index c8ccfce3902c0..7504fa44d9fae 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala @@ -271,7 +271,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable } /** Get the attached executor. 
*/ - private[streaming] def executor: ReceiverSupervisor = { + private def executor: ReceiverSupervisor = { assert(executor_ != null, "Executor has not been attached to this receiver") executor_ } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index af37b4876ffc2..b0469ebccecc2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -182,8 +182,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** Update a receiver's maximum ingestion rate */ def sendRateUpdate(streamUID: Int, newRate: Long): Unit = { - for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint)) + for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint)) { eP.send(UpdateRateLimit(newRate)) + } } /** Add new blocks for the given stream */ diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index 41d92fb5db32f..2f0a13e060be0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -80,12 +80,27 @@ class ReceiverTrackerSuite extends TestSuiteBase { } test("Receiver tracker - propagates rate limit") { + object streamingListener extends StreamingListener { + @volatile + var started = false + + override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = { + started = true + } + } + + ssc.addStreamingListener(streamingListener) + ssc.scheduler.listenerBus.start(ssc.sc) + val newRateLimit = 100L val ids = new TestReceiverInputDStream(ssc) val tracker = new ReceiverTracker(ssc) tracker.start() + + // we wait until the Receiver has registered with the tracker, + // otherwise our rate update is lost eventually(timeout(5 seconds)) { - assert(TestDummyReceiver.started) + assert(streamingListener.started) } tracker.sendRateUpdate(ids.id, newRateLimit) // this is an async message, we need to wait a bit for it to be processed @@ -102,7 +117,14 @@ private class TestReceiverInputDStream(@transient ssc_ : StreamingContext) override def getReceiver(): DummyReceiver = TestDummyReceiver def getCurrentRateLimit: Option[Long] = { - TestDummyReceiver.executor.getCurrentRateLimit + invokeExecutorMethod.getCurrentRateLimit + } + + private def invokeExecutorMethod: ReceiverSupervisor = { + val c = classOf[Receiver[_]] + val ex = c.getDeclaredMethod("executor") + ex.setAccessible(true) + ex.invoke(TestDummyReceiver).asInstanceOf[ReceiverSupervisor] } } @@ -118,10 +140,7 @@ private object TestDummyReceiver extends DummyReceiver private class DummyReceiver(host: Option[String] = None) extends Receiver[Int](StorageLevel.MEMORY_ONLY) { - var started = false - def onStart() { - started = true } def onStop() { From 8941cf91b03aa7835a78bc756bee1f32cb7bb1d8 Mon Sep 17 00:00:00 2001 From: Iulian Dragos Date: Wed, 22 Jul 2015 16:45:57 +0200 Subject: [PATCH 8/8] Renames and other nitpicks. 
--- .../streaming/receiver/RateLimiter.scala | 6 +++++ .../scheduler/ReceiverTrackerSuite.scala | 26 ++++++++++--------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index 23a676e97c2c7..f663def4c0511 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -48,6 +48,12 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { def getCurrentLimit: Long = rateLimiter.getRate.toLong + /** + * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by + * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that. + * + * @param newRate A new rate in events per second. It has no effect if it's 0 or negative. + */ private[receiver] def updateRate(newRate: Long): Unit = if (newRate > 0) { if (maxRateLimit > 0) { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index 2f0a13e060be0..aadb7231757b8 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -80,7 +80,7 @@ class ReceiverTrackerSuite extends TestSuiteBase { } test("Receiver tracker - propagates rate limit") { - object streamingListener extends StreamingListener { + object ReceiverStartedWaiter extends StreamingListener { @volatile var started = false @@ -89,32 +89,32 @@ class ReceiverTrackerSuite extends TestSuiteBase { } } - ssc.addStreamingListener(streamingListener) + ssc.addStreamingListener(ReceiverStartedWaiter) ssc.scheduler.listenerBus.start(ssc.sc) val newRateLimit = 100L - val ids = new TestReceiverInputDStream(ssc) + val inputDStream = new RateLimitInputDStream(ssc) val tracker = new ReceiverTracker(ssc) tracker.start() // we wait until the Receiver has registered with the tracker, // otherwise our rate update is lost eventually(timeout(5 seconds)) { - assert(streamingListener.started) + assert(ReceiverStartedWaiter.started) } - tracker.sendRateUpdate(ids.id, newRateLimit) + tracker.sendRateUpdate(inputDStream.id, newRateLimit) // this is an async message, we need to wait a bit for it to be processed eventually(timeout(3 seconds)) { - assert(ids.getCurrentRateLimit.get === newRateLimit) + assert(inputDStream.getCurrentRateLimit.get === newRateLimit) } } } /** An input DStream with a hard-coded receiver that gives access to internals for testing. 
*/ -private class TestReceiverInputDStream(@transient ssc_ : StreamingContext) +private class RateLimitInputDStream(@transient ssc_ : StreamingContext) extends ReceiverInputDStream[Int](ssc_) { - override def getReceiver(): DummyReceiver = TestDummyReceiver + override def getReceiver(): DummyReceiver = SingletonDummyReceiver def getCurrentRateLimit: Option[Long] = { invokeExecutorMethod.getCurrentRateLimit @@ -124,15 +124,17 @@ private class TestReceiverInputDStream(@transient ssc_ : StreamingContext) val c = classOf[Receiver[_]] val ex = c.getDeclaredMethod("executor") ex.setAccessible(true) - ex.invoke(TestDummyReceiver).asInstanceOf[ReceiverSupervisor] + ex.invoke(SingletonDummyReceiver).asInstanceOf[ReceiverSupervisor] } } /** - * We need the receiver to be an object, otherwise serialization will create another one - * and we won't be able to read its rate limit. + * A Receiver as an object so we can read its rate limit. + * + * @note It's necessary to be a top-level object, or else serialization would create another + * one on the executor side and we won't be able to read its rate limit. */ -private object TestDummyReceiver extends DummyReceiver +private object SingletonDummyReceiver extends DummyReceiver /** * Dummy receiver implementation
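
The net effect of the series on the receiver-side rate limiting is easiest to see outside the diffs. The sketch below is a standalone, illustrative reconstruction of the behaviour PATCH 8 leaves in RateLimiter: a positive rate update takes effect immediately via Guava's RateLimiter.setRate, but never exceeds the upper bound read from spark.streaming.receiver.maxRate (which defaults to Long.MaxValue, i.e. effectively unbounded). The config key and the updateRate/getCurrentLimit semantics come from the patches above; the object name and surrounding scaffolding are placeholders, not code from the PR.

import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}

// Standalone sketch of the post-PATCH-8 behaviour; `RateClampSketch` is an
// illustrative name, not a class that appears in the patches.
object RateClampSketch {
  // Upper bound, as read from spark.streaming.receiver.maxRate in the real code;
  // Long.MaxValue stands in for "no maximum configured".
  val maxRateLimit: Long = Long.MaxValue

  // Guava's limiter takes a double rate in permits (records) per second.
  private val limiter = GuavaRateLimiter.create(maxRateLimit.toDouble)

  // Mirrors RateLimiter.updateRate after PATCH 8: positive updates are applied,
  // clamped to maxRateLimit; zero or negative updates are ignored.
  def updateRate(newRate: Long): Unit =
    if (newRate > 0) {
      if (maxRateLimit > 0) {
        limiter.setRate(newRate.min(maxRateLimit).toDouble)
      } else {
        limiter.setRate(newRate.toDouble)
      }
    }

  // Mirrors getCurrentLimit: reports the rate currently enforced by the Guava limiter.
  def getCurrentLimit: Long = limiter.getRate.toLong

  // Mirrors waitToPush: blocks until the limiter admits one more record.
  def waitToPush(): Unit = limiter.acquire()
}

// Example flow: on the driver, ReceiverTracker.sendRateUpdate(streamId, 100) sends an
// UpdateRateLimit message to the receiver endpoint, which ends up calling
//   RateClampSketch.updateRate(100)
// After that, getCurrentLimit == 100 and each waitToPush() call blocks so that at most
// 100 records per second are admitted into the block generator.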