This repository was archived by the owner on May 9, 2024. It is now read-only.

Adds measurement & transmission of back-pressure from Job queue to Receivers #10

Status: Closed

Changes from all commits
StreamingContext.scala
@@ -41,7 +41,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContextState._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
import org.apache.spark.streaming.scheduler.{JobScheduler, LatestSpeedListener, StreamingListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
import org.apache.spark.util.{CallSite, Utils}

@@ -185,6 +185,8 @@ class StreamingContext private[streaming] (

private[streaming] val progressListener = new StreamingJobProgressListener(this)

private[streaming] val speedListener = new LatestSpeedListener(this.graph.batchDuration)

private[streaming] val uiTab: Option[StreamingTab] =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(new StreamingTab(this))
CongestionStrategy.scala (new file)
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.receiver

import scala.collection.mutable.ArrayBuffer

/**
 * This trait provides a strategy for dealing with a large amount of data
 * arriving at a Receiver, which could otherwise exhaust resources.
 * See SPARK-7398.
 * Any long blocking operation in this class will hurt throughput.
*/
trait CongestionStrategy {

/**
 * Called on every batch interval with the estimated maximum number of
 * elements per block that can be processed in a batch interval,
 * based on the processing speed observed over the last batch.
*/
def onBlockBoundUpdate(bound: Int): Unit

/**
 * Given the data buffer intended for the current block and the buffer for
 * the following block, mutates those buffers so their contents respect the
 * back-pressure information provided through `onBlockBoundUpdate`.
*/
def restrictCurrentBuffer(currentBuffer: ArrayBuffer[Any], nextBuffer: ArrayBuffer[Any]): Unit

}
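
To make the trait's contract concrete, here is a hypothetical implementation, not part of this PR, that drops any elements beyond the latest bound instead of spilling them into the next block. The class name and the drop policy are illustrative assumptions only.

package org.apache.spark.streaming.receiver

import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch, not part of this PR: drop elements beyond the latest
// per-block bound instead of carrying them over to the next block.
class DropCongestionStrategy extends CongestionStrategy {

  // Written by the supervisor's RPC thread and read by the block-generator
  // thread, hence volatile.
  @volatile private var latestBound = Int.MaxValue

  override def onBlockBoundUpdate(bound: Int): Unit = {
    if (bound > 0) latestBound = bound
  }

  override def restrictCurrentBuffer(currentBuffer: ArrayBuffer[Any],
      nextBuffer: ArrayBuffer[Any]): Unit = {
    // Trim the current block down to the bound; the excess is discarded.
    if (currentBuffer.size > latestBound) {
      currentBuffer.reduceToSize(latestBound)
    }
  }
}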
IgnoreCongestionStrategy.scala (new file)
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.receiver

import scala.collection.mutable.ArrayBuffer

/**
 * A congestion strategy that ignores any back-pressure information,
 * leaving block buffers untouched.
* @see CongestionStrategy
*/
class IgnoreCongestionStrategy extends CongestionStrategy {

  override def onBlockBoundUpdate(bound: Int): Unit = {}

  override def restrictCurrentBuffer(currentBuffer: ArrayBuffer[Any],
      nextBuffer: ArrayBuffer[Any]): Unit = {}
}
ReceiverMessage.scala
@@ -23,4 +23,6 @@ import org.apache.spark.streaming.Time
private[streaming] sealed trait ReceiverMessage extends Serializable
private[streaming] object StopReceiver extends ReceiverMessage
private[streaming] case class CleanupOldBlocks(threshTime: Time) extends ReceiverMessage
private[streaming] case class BatchProcessingSpeedInfo(batchTime: Time, elementsPerBlock: Int)
extends ReceiverMessage

ReceiverSupervisorImpl.scala
@@ -61,6 +61,16 @@ private[streaming] class ReceiverSupervisorImpl(
}
}

protected val congestionStrategy: CongestionStrategy = {
  // Only the "ignore" strategy exists so far; unknown or missing
  // configuration values fall back to it as well.
  env.conf.getOption("spark.streaming.receiver.congestionStrategy") match {
    case Some("ignore") => new IgnoreCongestionStrategy()
    case _ => new IgnoreCongestionStrategy()
  }
}

/** Remote RpcEndpointRef for the ReceiverTracker */
private val trackerEndpoint = RpcUtils.makeDriverRef("ReceiverTracker", env.conf, env.rpcEnv)
@@ -77,6 +87,9 @@
case CleanupOldBlocks(threshTime) =>
logDebug("Received delete old batch signal")
cleanupOldBlocks(threshTime)
case BatchProcessingSpeedInfo(batchTime, elemsPerBlock) =>
congestionStrategy.onBlockBoundUpdate(elemsPerBlock)
logDebug(s"Received update for $streamId at ${batchTime.milliseconds} : $elemsPerBlock")
}
})

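For reference, a sketch of how an application would select a strategy through the new configuration key; only "ignore" is recognized in this PR, and any other value falls back to it. The application name below is a placeholder.

import org.apache.spark.SparkConf

// Select the receiver congestion strategy at submission time.
val conf = new SparkConf()
  .setAppName("BackPressureExample") // placeholder name
  .set("spark.streaming.receiver.congestionStrategy", "ignore")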
JobScheduler.scala
@@ -67,6 +67,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
eventLoop.start()

listenerBus.start(ssc.sparkContext)
ssc.addStreamingListener(ssc.speedListener)
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
receiverTracker.start()
@@ -169,6 +170,9 @@
jobSet.processingDelay / 1000.0
))
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
for {
  time <- ssc.speedListener.latestTime
  speeds <- ssc.speedListener.streamIdToElemsPerBatch
} receiverTracker.refreshLatestSpeeds(time, speeds)
}
case Failure(e) =>
reportError("Error running job " + job, e)
ReceiverTracker.scala
@@ -25,6 +25,7 @@ import org.apache.spark.{Logging, SerializableWritable, SparkEnv, SparkException
import org.apache.spark.rpc._
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl, StopReceiver}
import org.apache.spark.streaming.receiver.BatchProcessingSpeedInfo

/**
* Messages used by the NetworkReceiver and the ReceiverTracker to communicate
@@ -134,6 +135,19 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
}

/**
 * Updates each receiver with the latest processing speed of its stream,
 * counted in elements processed per batch interval.
 */
def refreshLatestSpeeds(batchTime: Time, streamIdToElemsPerBatch: Map[Int, Long]): Unit = {
  val blockIntervalMs = ssc.conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
  // Scale the observed per-batch speed down to a per-block bound.
  val ratio = blockIntervalMs.toFloat / ssc.graph.batchDuration.milliseconds
  for {
    (sId, speed) <- streamIdToElemsPerBatch
    info <- receiverInfo.get(sId)
    endpoint <- Option(info.endpoint)
  } if (speed > 0) endpoint.send(BatchProcessingSpeedInfo(batchTime, math.round(speed * ratio)))
}

/** Register a receiver */
private def registerReceiver(
streamId: Int,
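A worked example of the arithmetic in refreshLatestSpeeds, with assumed values, pasteable into a Scala REPL: under the default 200 ms block interval and a 2 s batch duration, a stream observed to process 5000 elements per batch is sent a per-block bound of 500 elements.

val blockIntervalMs = 200L // default "spark.streaming.blockInterval"
val batchDurationMs = 2000L // assumed batch duration
val elemsPerBatch = 5000L // speed reported by the listener
val ratio = blockIntervalMs.toFloat / batchDurationMs // 0.1
val elemsPerBlock = math.round(elemsPerBatch * ratio) // 500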
StreamingListener.scala
@@ -18,9 +18,10 @@
package org.apache.spark.streaming.scheduler

import scala.collection.mutable.Queue

import org.apache.spark.util.Distribution
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Time

/**
* :: DeveloperApi ::
@@ -77,6 +78,36 @@ trait StreamingListener {
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
}

/**
* :: DeveloperApi ::
 * A StreamingListener that estimates, for each stream, the number of elements
 * the latest completed batch would have processed had its computation taken
 * exactly one batchDuration.
 * @param batchDuration The reference (yardstick) duration for computation.
*/
@DeveloperApi
class LatestSpeedListener(batchDuration: Duration) extends StreamingListener {
var latestTime: Option[Time] = None
var streamIdToElemsPerBatch: Option[Map[Int, Long]] = None

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
  this.synchronized {
    val newTime = batchCompleted.batchInfo.batchTime
    val delay = batchCompleted.batchInfo.processingDelay
    // Only consider batches that report a processing delay and are newer
    // than the latest batch seen so far.
    if (delay.isDefined && (latestTime.isEmpty || newTime > latestTime.get)) {
      latestTime = Some(newTime)
      val ratio = delay.get.toDouble / batchDuration.milliseconds
      val elements = batchCompleted.batchInfo.streamIdToNumRecords
      streamIdToElemsPerBatch = Some(elements.mapValues(x => math.round(x / ratio)))
    }
  }
}

def getSpeedForStreamId(streamId: Int): Option[Long] = {
streamIdToElemsPerBatch.flatMap(_.get(streamId))
}

}

/**
* :: DeveloperApi ::
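A worked example of the listener's extrapolation, with assumed values: a batch of 300 records whose processing took 1000 ms, normalized against a 2000 ms batchDuration, gives an estimated speed of 600 elements per batchDuration.

val processingDelayMs = 1000L // assumed processing delay of the last batch
val batchDurationMs = 2000L // the yardstick batchDuration
val recordsInBatch = 300L // records seen in that batch
val ratio = processingDelayMs.toDouble / batchDurationMs // 0.5
val estimatedSpeed = math.round(recordsInBatch / ratio) // 600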
StreamingListenerSuite.scala
@@ -20,17 +20,17 @@ package org.apache.spark.streaming
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.scheduler._

import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
import org.apache.spark.Logging

class StreamingListenerSuite extends TestSuiteBase with Matchers {

val input = (1 to 4).map(Seq(_)).toSeq
@@ -131,6 +131,32 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
}
}

ignore("latest speed reporting") {
val midInput = (1 to 40).map(Seq(_)).toSeq
val midSsc = setupStreams(midInput, operation)
val midLatestSpeed = new LatestSpeedListener(batchDuration)
midSsc.addStreamingListener(midLatestSpeed)
runStreams(midSsc, midInput.size, midInput.size)

val speeds = midLatestSpeed.streamIdToElemsPerBatch
speeds should not be None
speeds.get should have size 1
val midSp = midLatestSpeed.getSpeedForStreamId(0)

// Between two input sizes that are both below the system's limits,
// the estimates of elements processed per batch should be comparable.
val bigInput = (1 to 400).map(Seq(_)).toSeq
val bigSsc = setupStreams(bigInput, operation)
val bigLatestSpeed = new LatestSpeedListener(batchDuration)
bigSsc.addStreamingListener(bigLatestSpeed)
runStreams(bigSsc, bigInput.size, bigInput.size)

val bigSp = bigLatestSpeed.getSpeedForStreamId(0)
bigSp should not be empty
midSp should not be empty
bigSp.get should (be >= (midSp.get / 2) and be <= (midSp.get * 2))
}

/** Check if a sequence of numbers is in increasing order */
def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
for (i <- 1 until seq.size) {