Skip to content

Commit c5a17fd

Browse files
author
Marcelo Vanzin
committed
SHS-NG M4.3: Port StorageTab to the new backend.
Porting the tab required storing information about stream blocks (StreamBlockId) in the store; that information is not yet available via the API, so an internal type was added until there's a need to expose it in the API. The UI only lists RDDs that have cached partitions, and the listener was not capturing that information correctly, so that is fixed as well, along with some minor (internal) API adjustments so that the UI can get the correct data.
1 parent d66024c commit c5a17fd

19 files changed

+246
-941
lines changed

core/src/main/scala/org/apache/spark/status/AppStateListener.scala

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,8 @@ private[spark] class AppStateListener(
449449
override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
450450
event.blockUpdatedInfo.blockId match {
451451
case block: RDDBlockId => updateRDDBlock(event, block)
452-
case _ => // TODO: API only covers RDD storage. UI might need shuffle storage too.
452+
case stream: StreamBlockId => updateStreamBlock(event, stream)
453+
case _ =>
453454
}
454455
}
455456

@@ -537,7 +538,14 @@ private[spark] class AppStateListener(
537538
}
538539
rdd.memoryUsed = newValue(rdd.memoryUsed, memoryDelta)
539540
rdd.diskUsed = newValue(rdd.diskUsed, diskDelta)
540-
update(rdd)
541+
542+
// Need to always flush the RDD in live applications in case there's a single cached block,
543+
// which means there won't be a subsequent event that could trigger the flush.
544+
if (live) {
545+
update(rdd)
546+
} else {
547+
liveUpdate(rdd)
548+
}
541549
}
542550

543551
maybeExec.foreach { exec =>
@@ -557,6 +565,26 @@ private[spark] class AppStateListener(
557565
}
558566
}
559567

568+
private def updateStreamBlock(event: SparkListenerBlockUpdated, stream: StreamBlockId): Unit = {
569+
val storageLevel = event.blockUpdatedInfo.storageLevel
570+
if (storageLevel.isValid) {
571+
val data = new StreamBlockData(
572+
stream.name,
573+
event.blockUpdatedInfo.blockManagerId.executorId,
574+
event.blockUpdatedInfo.blockManagerId.hostPort,
575+
storageLevel.description,
576+
storageLevel.useMemory,
577+
storageLevel.useDisk,
578+
storageLevel.deserialized,
579+
event.blockUpdatedInfo.memSize,
580+
event.blockUpdatedInfo.diskSize)
581+
kvstore.write(data)
582+
} else {
583+
kvstore.delete(classOf[StreamBlockData],
584+
Array(stream.name, event.blockUpdatedInfo.blockManagerId.executorId))
585+
}
586+
}
587+
560588
private def getOrCreateExecutor(executorId: String): LiveExecutor = {
561589
liveExecutors.getOrElseUpdate(executorId, new LiveExecutor(executorId))
562590
}

core/src/main/scala/org/apache/spark/status/AppStateStore.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,10 @@ private[spark] class AppStateStore(store: KVStore) {
199199
indexed.skip(offset).max(length).asScala.map(_.info).toSeq
200200
}
201201

202-
def rddList(): Seq[v1.RDDStorageInfo] = {
203-
store.view(classOf[RDDStorageInfoWrapper]).sorted().asScala.map(_.info).toSeq
202+
def rddList(cachedOnly: Boolean = true): Seq[v1.RDDStorageInfo] = {
203+
store.view(classOf[RDDStorageInfoWrapper]).sorted().asScala.map(_.info).filter { rdd =>
204+
!cachedOnly || rdd.numCachedPartitions > 0
205+
}.toSeq
204206
}
205207

206208
def rdd(rddId: Int): v1.RDDStorageInfo = {
@@ -211,6 +213,10 @@ private[spark] class AppStateStore(store: KVStore) {
211213
store.view(classOf[ExecutorEventData]).asScala.map(_.event).toSeq
212214
}
213215

216+
def streamBlocksList(): Seq[StreamBlockData] = {
217+
store.view(classOf[StreamBlockData]).asScala.toSeq
218+
}
219+
214220
def close(): Unit = {
215221
store.close()
216222
}

core/src/main/scala/org/apache/spark/status/LiveEntity.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -422,16 +422,17 @@ private class LiveRDDPartition(val blockName: String) {
422422

423423
}
424424

425-
private class LiveRDDDistribution(val exec: LiveExecutor) {
425+
private class LiveRDDDistribution(exec: LiveExecutor) {
426426

427+
val executorId = exec.executorId
427428
var memoryRemaining = exec.maxMemory
428429
var memoryUsed = 0L
429430
var diskUsed = 0L
430431

431432
var onHeapUsed = 0L
432433
var offHeapUsed = 0L
433-
var onHeapRemaining = 0L
434-
var offHeapRemaining = 0L
434+
var onHeapRemaining = exec.totalOnHeap
435+
var offHeapRemaining = exec.totalOffHeap
435436

436437
def toApi(): v1.RDDDataDistribution = {
437438
new v1.RDDDataDistribution(
@@ -478,7 +479,7 @@ private class LiveRDD(info: RDDInfo) extends LiveEntity {
478479
}
479480

480481
val dists = if (distributions.nonEmpty) {
481-
Some(distributions.values.toList.sortBy(_.exec.executorId).map(_.toApi()))
482+
Some(distributions.values.toList.sortBy(_.executorId).map(_.toApi()))
482483
} else {
483484
None
484485
}

core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala

Lines changed: 1 addition & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -21,90 +21,11 @@ import javax.ws.rs.core.MediaType
2121

2222
import org.apache.spark.storage.{RDDInfo, StorageStatus, StorageUtils}
2323
import org.apache.spark.ui.SparkUI
24-
import org.apache.spark.ui.storage.StorageListener
2524

2625
@Produces(Array(MediaType.APPLICATION_JSON))
2726
private[v1] class AllRDDResource(ui: SparkUI) {
2827

2928
@GET
30-
def rddList(): Seq[RDDStorageInfo] = {
31-
val storageStatusList = ui.storageListener.activeStorageStatusList
32-
val rddInfos = ui.storageListener.rddInfoList
33-
rddInfos.map{rddInfo =>
34-
AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList,
35-
includeDetails = false)
36-
}
37-
}
29+
def rddList(): Seq[RDDStorageInfo] = ui.store.rddList()
3830

3931
}
40-
41-
private[spark] object AllRDDResource {
42-
43-
def getRDDStorageInfo(
44-
rddId: Int,
45-
listener: StorageListener,
46-
includeDetails: Boolean): Option[RDDStorageInfo] = {
47-
val storageStatusList = listener.activeStorageStatusList
48-
listener.rddInfoList.find { _.id == rddId }.map { rddInfo =>
49-
getRDDStorageInfo(rddId, rddInfo, storageStatusList, includeDetails)
50-
}
51-
}
52-
53-
def getRDDStorageInfo(
54-
rddId: Int,
55-
rddInfo: RDDInfo,
56-
storageStatusList: Seq[StorageStatus],
57-
includeDetails: Boolean): RDDStorageInfo = {
58-
val workers = storageStatusList.map { (rddId, _) }
59-
val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList)
60-
val blocks = storageStatusList
61-
.flatMap { _.rddBlocksById(rddId) }
62-
.sortWith { _._1.name < _._1.name }
63-
.map { case (blockId, status) =>
64-
(blockId, status, blockLocations.getOrElse(blockId, Seq[String]("Unknown")))
65-
}
66-
67-
val dataDistribution = if (includeDetails) {
68-
Some(storageStatusList.map { status =>
69-
new RDDDataDistribution(
70-
address = status.blockManagerId.hostPort,
71-
memoryUsed = status.memUsedByRdd(rddId),
72-
memoryRemaining = status.memRemaining,
73-
diskUsed = status.diskUsedByRdd(rddId),
74-
onHeapMemoryUsed = Some(
75-
if (!rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
76-
offHeapMemoryUsed = Some(
77-
if (rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
78-
onHeapMemoryRemaining = status.onHeapMemRemaining,
79-
offHeapMemoryRemaining = status.offHeapMemRemaining
80-
) } )
81-
} else {
82-
None
83-
}
84-
val partitions = if (includeDetails) {
85-
Some(blocks.map { case (id, block, locations) =>
86-
new RDDPartitionInfo(
87-
blockName = id.name,
88-
storageLevel = block.storageLevel.description,
89-
memoryUsed = block.memSize,
90-
diskUsed = block.diskSize,
91-
executors = locations
92-
)
93-
} )
94-
} else {
95-
None
96-
}
97-
98-
new RDDStorageInfo(
99-
id = rddId,
100-
name = rddInfo.name,
101-
numPartitions = rddInfo.numPartitions,
102-
numCachedPartitions = rddInfo.numCachedPartitions,
103-
storageLevel = rddInfo.storageLevel.description,
104-
memoryUsed = rddInfo.memSize,
105-
diskUsed = rddInfo.diskSize,
106-
dataDistribution = dataDistribution,
107-
partitions = partitions
108-
)
109-
}
110-
}

core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.spark.status.api.v1
1818

19+
import java.util.NoSuchElementException
1920
import javax.ws.rs.{GET, PathParam, Produces}
2021
import javax.ws.rs.core.MediaType
2122

@@ -26,9 +27,12 @@ private[v1] class OneRDDResource(ui: SparkUI) {
2627

2728
@GET
2829
def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = {
29-
AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse(
30-
throw new NotFoundException(s"no rdd found w/ id $rddId")
31-
)
30+
try {
31+
ui.store.rdd(rddId)
32+
} catch {
33+
case _: NoSuchElementException =>
34+
throw new NotFoundException(s"no rdd found w/ id $rddId")
35+
}
3236
}
3337

3438
}

core/src/main/scala/org/apache/spark/status/storeTypes.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,3 +141,19 @@ private[spark] class ExecutorStageSummaryWrapper(
141141
private[spark] class ExecutorEventData(
142142
@KVIndexParam val id: Long,
143143
val event: SparkListenerEvent)
144+
145+
private[spark] class StreamBlockData(
146+
val name: String,
147+
val executorId: String,
148+
val hostPort: String,
149+
val storageLevel: String,
150+
val useMemory: Boolean,
151+
val useDisk: Boolean,
152+
val deserialized: Boolean,
153+
val memSize: Long,
154+
val diskSize: Long) {
155+
156+
@JsonIgnore @KVIndex
157+
def key: Array[String] = Array(name, executorId)
158+
159+
}

core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala

Lines changed: 0 additions & 100 deletions
This file was deleted.

0 commit comments

Comments
 (0)