Skip to content

Commit 3be4b7a

Browse files
committed
Use InputInfo
1 parent b50fa32 commit 3be4b7a

File tree

7 files changed

+74
-56
lines changed

7 files changed

+74
-56
lines changed

core/src/main/resources/org/apache/spark/ui/static/streaming-page.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,8 +266,8 @@ $(function() {
266266
}
267267
}
268268

269-
if (getParameterFromURL("show-receivers-detail") == "true") {
270-
// Show the details for all receivers
269+
if (getParameterFromURL("show-streams-detail") == "true") {
270+
// Show the details for all InputDStream
271271
$('#inputs-table').toggle('collapsed');
272272
$('#triangle').html('▼');
273273
}

streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
110110
.toArray
111111
}
112112

113+
def getInputStreamName(streamId: Int): Option[String] = synchronized {
114+
inputStreams.find(_.id == streamId).map(_.name)
115+
}
116+
113117
def generateJobs(time: Time): Seq[Job] = {
114118
logDebug("Generating jobs for time " + time)
115119
val jobs = this.synchronized {

streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
4444
/** This is an unique identifier for the input stream. */
4545
val id = ssc.getNewInputStreamId()
4646

47+
/**
48+
* The name of this InputDStream. By default, it's the class name with its id.
49+
*/
50+
def name: String = s"${getClass.getSimpleName}-$id"
51+
4752
/**
4853
* Checks whether the 'time' is valid wrt slideDuration for generating RDD.
4954
* Additionally it also ensures valid times are in strictly increasing order.

streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ private[ui] case class OutputOpIdAndSparkJobId(outputOpId: OutputOpId, sparkJobI
2626

2727
private[ui] case class BatchUIData(
2828
val batchTime: Time,
29-
val receiverNumRecords: Map[Int, Long],
29+
val streamIdToNumRecords: Map[Int, Long],
3030
val submissionTime: Long,
3131
val processingStartTime: Option[Long],
3232
val processingEndTime: Option[Long],
@@ -58,7 +58,7 @@ private[ui] case class BatchUIData(
5858
/**
5959
* The number of recorders received by the receivers in this batch.
6060
*/
61-
def numRecords: Long = receiverNumRecords.map(_._2).sum
61+
def numRecords: Long = streamIdToNumRecords.values.sum
6262
}
6363

6464
private[ui] object BatchUIData {

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,9 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
144144
}
145145
}
146146

147-
def numReceivers: Int = ssc.graph.getReceiverInputStreams().size
147+
def numReceivers: Int = synchronized {
148+
receiverInfos.size
149+
}
148150

149151
def numTotalCompletedBatches: Long = synchronized {
150152
totalCompletedBatches
@@ -174,35 +176,41 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
174176
completedBatchUIData.toSeq
175177
}
176178

177-
def allReceivers: Seq[Int] = synchronized {
178-
receiverInfos.keys.toSeq
179+
def streamName(streamId: Int): Option[String] = {
180+
ssc.graph.getInputStreamName(streamId)
179181
}
180182

181183
/**
182-
* Return all of the event rates for each receiver in each batch.
184+
* Return all InputDStream Ids
185+
*/
186+
def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id)
187+
188+
/**
189+
* Return all of the event rates for each InputDStream in each batch.
183190
*/
184191
def receivedEventRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
185-
val latestBatches = retainedBatches.map { batchUIData =>
186-
(batchUIData.batchTime.milliseconds, batchUIData.receiverNumRecords)
192+
val _retainedBatches = retainedBatches
193+
val latestBatches = _retainedBatches.map { batchUIData =>
194+
(batchUIData.batchTime.milliseconds, batchUIData.streamIdToNumRecords)
187195
}
188-
(0 until numReceivers).map { receiverId =>
196+
streamIds.map { streamId =>
189197
val eventRates = latestBatches.map {
190-
case (batchTime, receiverNumRecords) =>
191-
val numRecords = receiverNumRecords.getOrElse(receiverId, 0L)
198+
case (batchTime, streamIdToNumRecords) =>
199+
val numRecords = streamIdToNumRecords.getOrElse(streamId, 0L)
192200
(batchTime, numRecords * 1000.0 / batchDuration)
193201
}
194-
(receiverId, eventRates)
202+
(streamId, eventRates)
195203
}.toMap
196204
}
197205

198206
def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
199-
val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receiverNumRecords)
207+
val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.streamIdToNumRecords)
200208
lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
201-
(0 until numReceivers).map { receiverId =>
202-
(receiverId, lastReceivedBlockInfo.getOrElse(receiverId, 0L))
209+
streamIds.map { streamId =>
210+
(streamId, lastReceivedBlockInfo.getOrElse(streamId, 0L))
203211
}.toMap
204212
}.getOrElse {
205-
(0 until numReceivers).map(receiverId => (receiverId, 0L)).toMap
213+
streamIds.map(streamId => (streamId, 0L)).toMap
206214
}
207215
}
208216

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
207207
val minBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.min
208208
val maxBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.max
209209

210-
val eventRateForAllReceivers = new EventRateUIData(batches.map { batchInfo =>
210+
val eventRateForAllStreams = new EventRateUIData(batches.map { batchInfo =>
211211
(batchInfo.batchTime.milliseconds, batchInfo.numRecords * 1000.0 / listener.batchDuration)
212212
})
213213

@@ -231,37 +231,37 @@ private[ui] class StreamingPage(parent: StreamingTab)
231231
val (maxTime, normalizedUnit) = UIUtils.normalizeDuration(_maxTime)
232232
val formattedUnit = UIUtils.shortTimeUnitString(normalizedUnit)
233233

234-
// Use the max input rate for all receivers' graphs to make the Y axis ranges same.
234+
// Use the max input rate for all InputDStreams' graphs to make the Y axis ranges same.
235235
// If it's not an integral number, just use its ceil integral number.
236-
val maxEventRate = eventRateForAllReceivers.max.map(_.ceil.toLong).getOrElse(0L)
236+
val maxEventRate = eventRateForAllStreams.max.map(_.ceil.toLong).getOrElse(0L)
237237
val minEventRate = 0L
238238

239-
// JavaScript to show/hide the receiver sub table.
239+
// JavaScript to show/hide the InputDStreams sub table.
240240
val triangleJs =
241241
s"""$$('#inputs-table').toggle('collapsed');
242242
|var status = false;
243243
|if ($$(this).html() == '$BLACK_RIGHT_TRIANGLE_HTML') {
244244
|$$(this).html('$BLACK_DOWN_TRIANGLE_HTML');status = true;}
245245
|else {$$(this).html('$BLACK_RIGHT_TRIANGLE_HTML');status = false;}
246246
|window.history.pushState('',
247-
| document.title, window.location.pathname + '?show-receivers-detail=' + status);"""
247+
| document.title, window.location.pathname + '?show-streams-detail=' + status);"""
248248
.stripMargin.replaceAll("\\n", "") // it must be only one single line
249249

250250
val batchInterval = StreamingPage.convertToTimeUnit(listener.batchDuration, normalizedUnit)
251251

252252
val jsCollector = new JsCollector
253253

254-
val graphUIDataForEventRateOfAllReceivers =
254+
val graphUIDataForEventRateOfAllStreams =
255255
new GraphUIData(
256-
"all-receiver-events-timeline",
257-
"all-receiver-events-histogram",
258-
eventRateForAllReceivers.data,
256+
"all-stream-events-timeline",
257+
"all-stream-events-histogram",
258+
eventRateForAllStreams.data,
259259
minBatchTime,
260260
maxBatchTime,
261261
minEventRate,
262262
maxEventRate,
263263
"events/sec")
264-
graphUIDataForEventRateOfAllReceivers.generateDataJs(jsCollector)
264+
graphUIDataForEventRateOfAllStreams.generateDataJs(jsCollector)
265265

266266
val graphUIDataForSchedulingDelay =
267267
new GraphUIData(
@@ -299,7 +299,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
299299
formattedUnit)
300300
graphUIDataForTotalDelay.generateDataJs(jsCollector)
301301

302-
val hasReceiver = listener.allReceivers.nonEmpty
302+
// It's false before the user registers the first InputDStream
303+
val hasStream = listener.streamIds.nonEmpty
303304

304305
val numCompletedBatches = listener.retainedCompletedBatches.size
305306
val numActiveBatches = batchTimes.length - numCompletedBatches
@@ -317,21 +318,21 @@ private[ui] class StreamingPage(parent: StreamingTab)
317318
<td style="vertical-align: middle;">
318319
<div style="width: 160px;">
319320
<div>
320-
{if (hasReceiver) {
321+
{if (hasStream) {
321322
<span id="triangle" onclick={Unparsed(triangleJs)}>{Unparsed(BLACK_RIGHT_TRIANGLE_HTML)}</span>
322323
}}
323324
<strong>Input Rate</strong>
324325
</div>
325-
<div>Avg: {eventRateForAllReceivers.formattedAvg} events/sec</div>
326+
<div>Avg: {eventRateForAllStreams.formattedAvg} events/sec</div>
326327
</div>
327328
</td>
328-
<td class="timeline">{graphUIDataForEventRateOfAllReceivers.generateTimelineHtml(jsCollector)}</td>
329-
<td class="histogram">{graphUIDataForEventRateOfAllReceivers.generateHistogramHtml(jsCollector)}</td>
329+
<td class="timeline">{graphUIDataForEventRateOfAllStreams.generateTimelineHtml(jsCollector)}</td>
330+
<td class="histogram">{graphUIDataForEventRateOfAllStreams.generateHistogramHtml(jsCollector)}</td>
330331
</tr>
331-
{if (hasReceiver) {
332+
{if (hasStream) {
332333
<tr id="inputs-table" style="display: none;" >
333334
<td colspan="3">
334-
{generateInputReceiversTable(jsCollector, minBatchTime, maxBatchTime, minEventRate, maxEventRate)}
335+
{generateInputDStreamsTable(jsCollector, minBatchTime, maxBatchTime, minEventRate, maxEventRate)}
335336
</td>
336337
</tr>
337338
}}
@@ -372,14 +373,14 @@ private[ui] class StreamingPage(parent: StreamingTab)
372373
generateTimeMap(batchTimes) ++ table ++ jsCollector.toHtml
373374
}
374375

375-
private def generateInputReceiversTable(
376+
private def generateInputDStreamsTable(
376377
jsCollector: JsCollector,
377378
minX: Long,
378379
maxX: Long,
379380
minY: Double,
380381
maxY: Double): Seq[Node] = {
381-
val content = listener.allReceivers.map { receiverId =>
382-
generateInputReceiverRow(jsCollector, receiverId, minX, maxX, minY, maxY)
382+
val content = listener.receivedEventRateWithBatchTime.map { case (streamId, eventRates) =>
383+
generateInputDStreamRow(jsCollector, streamId, eventRates, minX, maxX, minY, maxY)
383384
}.foldLeft[Seq[Node]](Nil)(_ ++ _)
384385

385386
// scalastyle:off
@@ -400,33 +401,36 @@ private[ui] class StreamingPage(parent: StreamingTab)
400401
// scalastyle:on
401402
}
402403

403-
private def generateInputReceiverRow(
404+
private def generateInputDStreamRow(
404405
jsCollector: JsCollector,
405-
receiverId: Int,
406+
streamId: Int,
407+
eventRates: Seq[(Long, Double)],
406408
minX: Long,
407409
maxX: Long,
408410
minY: Double,
409411
maxY: Double): Seq[Node] = {
410-
val receiverInfo = listener.receiverInfo(receiverId)
411-
val receiverName = receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId")
412+
// If this is a ReceiverInputDStream, we need to show the receiver info. Or we only need the
413+
// InputDStream name.
414+
val receiverInfo = listener.receiverInfo(streamId)
415+
val receiverName = receiverInfo.map(_.name).
416+
orElse(listener.streamName(streamId)).getOrElse(s"Stream-$streamId")
412417
val receiverActive = receiverInfo.map { info =>
413418
if (info.active) "ACTIVE" else "INACTIVE"
414419
}.getOrElse(emptyCell)
415420
val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
416-
val receiverLastError = listener.receiverInfo(receiverId).map { info =>
421+
val receiverLastError = receiverInfo.map { info =>
417422
val msg = s"${info.lastErrorMessage} - ${info.lastError}"
418423
if (msg.size > 100) msg.take(97) + "..." else msg
419424
}.getOrElse(emptyCell)
420425
val receiverLastErrorTime = receiverInfo.map {
421426
r => if (r.lastErrorTime < 0) "-" else UIUtils.formatDate(r.lastErrorTime)
422427
}.getOrElse(emptyCell)
423-
val receivedRecords =
424-
new EventRateUIData(listener.receivedEventRateWithBatchTime.get(receiverId).getOrElse(Seq()))
428+
val receivedRecords = new EventRateUIData(eventRates)
425429

426430
val graphUIDataForEventRate =
427431
new GraphUIData(
428-
s"receiver-$receiverId-events-timeline",
429-
s"receiver-$receiverId-events-histogram",
432+
s"stream-$streamId-events-timeline",
433+
s"stream-$streamId-events-histogram",
430434
receivedRecords.data,
431435
minX,
432436
maxX,

streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
9494
batchUIData.get.schedulingDelay should be (batchInfoStarted.schedulingDelay)
9595
batchUIData.get.processingDelay should be (batchInfoStarted.processingDelay)
9696
batchUIData.get.totalDelay should be (batchInfoStarted.totalDelay)
97-
batchUIData.get.receiverNumRecords should be (Map(0 -> 300L, 1 -> 300L))
97+
batchUIData.get.streamIdToNumRecords should be (Map(0 -> 300L, 1 -> 300L))
9898
batchUIData.get.numRecords should be(600)
9999
batchUIData.get.outputOpIdSparkJobIdPairs should be
100100
Seq(OutputOpIdAndSparkJobId(0, 0),
@@ -182,7 +182,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
182182
batchUIData.get.schedulingDelay should be (batchInfoSubmitted.schedulingDelay)
183183
batchUIData.get.processingDelay should be (batchInfoSubmitted.processingDelay)
184184
batchUIData.get.totalDelay should be (batchInfoSubmitted.totalDelay)
185-
batchUIData.get.receiverNumRecords should be (Map.empty)
185+
batchUIData.get.streamIdToNumRecords should be (Map.empty)
186186
batchUIData.get.numRecords should be (0)
187187
batchUIData.get.outputOpIdSparkJobIdPairs should be (Seq(OutputOpIdAndSparkJobId(0, 0)))
188188

@@ -211,17 +211,14 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
211211
val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
212212

213213
for (_ <- 0 until 2 * limit) {
214-
val receivedBlockInfo = Map(
215-
0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)),
216-
1 -> Array(ReceivedBlockInfo(1, 300, null))
217-
)
214+
val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)
218215

219216
// onBatchSubmitted
220-
val batchInfoSubmitted = BatchInfo(Time(1000), receivedBlockInfo, 1000, None, None)
217+
val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, None, None)
221218
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
222219

223220
// onBatchStarted
224-
val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
221+
val batchInfoStarted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
225222
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
226223

227224
// onJobStart
@@ -238,7 +235,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
238235
listener.onJobStart(jobStart4)
239236

240237
// onBatchCompleted
241-
val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
238+
val batchInfoCompleted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
242239
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
243240
}
244241

0 commit comments

Comments
 (0)