SHS-NG M4.3: Port StorageTab to the new backend. #9

Closed · wants to merge 1 commit
32 changes: 30 additions & 2 deletions core/src/main/scala/org/apache/spark/status/AppStateListener.scala
@@ -449,7 +449,8 @@ private[spark] class AppStateListener(
override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
event.blockUpdatedInfo.blockId match {
case block: RDDBlockId => updateRDDBlock(event, block)
case _ => // TODO: API only covers RDD storage. UI might need shuffle storage too.
case stream: StreamBlockId => updateStreamBlock(event, stream)
case _ =>
}
}

@@ -537,7 +538,14 @@ private[spark] class AppStateListener(
}
rdd.memoryUsed = newValue(rdd.memoryUsed, memoryDelta)
rdd.diskUsed = newValue(rdd.diskUsed, diskDelta)
update(rdd)

// Need to always flush the RDD in live applications in case there's a single cached block,
// which means there won't be a subsequent event that could trigger the flush.
if (live) {
update(rdd)
} else {
liveUpdate(rdd)
}
}

maybeExec.foreach { exec =>
@@ -557,6 +565,26 @@
}
}

private def updateStreamBlock(event: SparkListenerBlockUpdated, stream: StreamBlockId): Unit = {
val storageLevel = event.blockUpdatedInfo.storageLevel
if (storageLevel.isValid) {
val data = new StreamBlockData(
stream.name,
event.blockUpdatedInfo.blockManagerId.executorId,
event.blockUpdatedInfo.blockManagerId.hostPort,
storageLevel.description,
storageLevel.useMemory,
storageLevel.useDisk,
storageLevel.deserialized,
event.blockUpdatedInfo.memSize,
event.blockUpdatedInfo.diskSize)
kvstore.write(data)
} else {
kvstore.delete(classOf[StreamBlockData],
Array(stream.name, event.blockUpdatedInfo.blockManagerId.executorId))
}
}

private def getOrCreateExecutor(executorId: String): LiveExecutor = {
liveExecutors.getOrElseUpdate(executorId, new LiveExecutor(executorId))
}
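
For reference, a minimal sketch (not part of this diff) of how the new stream-block path could be exercised end to end: push a `SparkListenerBlockUpdated` event for a `StreamBlockId` through the listener, then read the resulting `StreamBlockData` back through the store. The `AppStateListener` constructor arguments and the kvstore import path are assumptions, not confirmed by this diff.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListenerBlockUpdated
import org.apache.spark.status.{AppStateListener, AppStateStore}
import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo, StorageLevel, StreamBlockId}
import org.apache.spark.util.kvstore.InMemoryStore

val kvstore = new InMemoryStore()
// Assumed constructor shape; the real signature lives in AppStateListener.scala on this branch.
val listener = new AppStateListener(kvstore, new SparkConf(), live = true)
val store = new AppStateStore(kvstore)

// A receiver caches a stream block: a valid storage level means "write (or overwrite) the entry".
val bmId = BlockManagerId("1", "localhost", 7077)
val cached = BlockUpdatedInfo(bmId, StreamBlockId(0, 100L), StorageLevel.MEMORY_ONLY, 1024L, 0L)
listener.onBlockUpdated(SparkListenerBlockUpdated(cached))
assert(store.streamBlocksList().exists(_.name == StreamBlockId(0, 100L).name))

// Dropping the block reports an invalid storage level, which maps to a delete by (name, executorId).
val dropped = BlockUpdatedInfo(bmId, StreamBlockId(0, 100L), StorageLevel.NONE, 0L, 0L)
listener.onBlockUpdated(SparkListenerBlockUpdated(dropped))
assert(store.streamBlocksList().isEmpty)
```
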
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/status/AppStateStore.scala
@@ -199,8 +199,10 @@ private[spark] class AppStateStore(store: KVStore) {
indexed.skip(offset).max(length).asScala.map(_.info).toSeq
}

def rddList(): Seq[v1.RDDStorageInfo] = {
store.view(classOf[RDDStorageInfoWrapper]).sorted().asScala.map(_.info).toSeq
def rddList(cachedOnly: Boolean = true): Seq[v1.RDDStorageInfo] = {
store.view(classOf[RDDStorageInfoWrapper]).sorted().asScala.map(_.info).filter { rdd =>
!cachedOnly || rdd.numCachedPartitions > 0
}.toSeq
}

def rdd(rddId: Int): v1.RDDStorageInfo = {
@@ -211,6 +213,10 @@ private[spark] class AppStateStore(store: KVStore) {
store.view(classOf[ExecutorEventData]).asScala.map(_.event).toSeq
}

def streamBlocksList(): Seq[StreamBlockData] = {
store.view(classOf[StreamBlockData]).asScala.toSeq
}

def close(): Unit = {
store.close()
}
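
For illustration only (not code in this PR), a sketch of how callers such as the storage page or the REST resources might use the new accessors; the grouping by executor is an assumption about how a UI would consume the stream block list.

```scala
// `store` is an AppStateStore, e.g. ui.store or the instance built in the sketch above.
val cachedRdds = store.rddList()                   // default: only RDDs with cached partitions
val allRdds    = store.rddList(cachedOnly = false) // also include RDDs with nothing cached
val streamBlocksByExec = store.streamBlocksList().groupBy(_.executorId)
```
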
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -422,16 +422,17 @@ private class LiveRDDPartition(val blockName: String) {

}

private class LiveRDDDistribution(val exec: LiveExecutor) {
private class LiveRDDDistribution(exec: LiveExecutor) {

val executorId = exec.executorId
var memoryRemaining = exec.maxMemory
var memoryUsed = 0L
var diskUsed = 0L

var onHeapUsed = 0L
var offHeapUsed = 0L
var onHeapRemaining = 0L
var offHeapRemaining = 0L
var onHeapRemaining = exec.totalOnHeap
var offHeapRemaining = exec.totalOffHeap

def toApi(): v1.RDDDataDistribution = {
new v1.RDDDataDistribution(
@@ -478,7 +479,7 @@ private class LiveRDD(info: RDDInfo) extends LiveEntity {
}

val dists = if (distributions.nonEmpty) {
Some(distributions.values.toList.sortBy(_.exec.executorId).map(_.toApi()))
Some(distributions.values.toList.sortBy(_.executorId).map(_.toApi()))
} else {
None
}
core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
@@ -21,90 +21,11 @@ import javax.ws.rs.core.MediaType

import org.apache.spark.storage.{RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.storage.StorageListener

@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class AllRDDResource(ui: SparkUI) {

@GET
def rddList(): Seq[RDDStorageInfo] = {
val storageStatusList = ui.storageListener.activeStorageStatusList
val rddInfos = ui.storageListener.rddInfoList
rddInfos.map{rddInfo =>
AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList,
includeDetails = false)
}
}
def rddList(): Seq[RDDStorageInfo] = ui.store.rddList()

}

private[spark] object AllRDDResource {

def getRDDStorageInfo(
rddId: Int,
listener: StorageListener,
includeDetails: Boolean): Option[RDDStorageInfo] = {
val storageStatusList = listener.activeStorageStatusList
listener.rddInfoList.find { _.id == rddId }.map { rddInfo =>
getRDDStorageInfo(rddId, rddInfo, storageStatusList, includeDetails)
}
}

def getRDDStorageInfo(
rddId: Int,
rddInfo: RDDInfo,
storageStatusList: Seq[StorageStatus],
includeDetails: Boolean): RDDStorageInfo = {
val workers = storageStatusList.map { (rddId, _) }
val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList)
val blocks = storageStatusList
.flatMap { _.rddBlocksById(rddId) }
.sortWith { _._1.name < _._1.name }
.map { case (blockId, status) =>
(blockId, status, blockLocations.getOrElse(blockId, Seq[String]("Unknown")))
}

val dataDistribution = if (includeDetails) {
Some(storageStatusList.map { status =>
new RDDDataDistribution(
address = status.blockManagerId.hostPort,
memoryUsed = status.memUsedByRdd(rddId),
memoryRemaining = status.memRemaining,
diskUsed = status.diskUsedByRdd(rddId),
onHeapMemoryUsed = Some(
if (!rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
offHeapMemoryUsed = Some(
if (rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
onHeapMemoryRemaining = status.onHeapMemRemaining,
offHeapMemoryRemaining = status.offHeapMemRemaining
) } )
} else {
None
}
val partitions = if (includeDetails) {
Some(blocks.map { case (id, block, locations) =>
new RDDPartitionInfo(
blockName = id.name,
storageLevel = block.storageLevel.description,
memoryUsed = block.memSize,
diskUsed = block.diskSize,
executors = locations
)
} )
} else {
None
}

new RDDStorageInfo(
id = rddId,
name = rddInfo.name,
numPartitions = rddInfo.numPartitions,
numCachedPartitions = rddInfo.numCachedPartitions,
storageLevel = rddInfo.storageLevel.description,
memoryUsed = rddInfo.memSize,
diskUsed = rddInfo.diskSize,
dataDistribution = dataDistribution,
partitions = partitions
)
}
}
core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.status.api.v1

import java.util.NoSuchElementException
import javax.ws.rs.{GET, PathParam, Produces}
import javax.ws.rs.core.MediaType

@@ -26,9 +27,12 @@ private[v1] class OneRDDResource(ui: SparkUI) {

@GET
def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = {
AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse(
throw new NotFoundException(s"no rdd found w/ id $rddId")
)
try {
ui.store.rdd(rddId)
} catch {
case _: NoSuchElementException =>
throw new NotFoundException(s"no rdd found w/ id $rddId")
}
}

}
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -141,3 +141,19 @@ private[spark] class ExecutorStageSummaryWrapper(
private[spark] class ExecutorEventData(
@KVIndexParam val id: Long,
val event: SparkListenerEvent)

private[spark] class StreamBlockData(
val name: String,
val executorId: String,
val hostPort: String,
val storageLevel: String,
val useMemory: Boolean,
val useDisk: Boolean,
val deserialized: Boolean,
val memSize: Long,
val diskSize: Long) {

@JsonIgnore @KVIndex
def key: Array[String] = Array(name, executorId)

}
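
The `@KVIndex`-annotated `key` makes the `(name, executorId)` pair the natural key, which is what the listener's delete call above relies on. A small sketch of direct key-based access, assuming the `read`/`delete` signatures of the branch's kvstore module:

```scala
// `kvstore` is any KVStore instance (for example the InMemoryStore from the earlier sketch).
// "input-0-100" and executor "1" are hypothetical values, shown only to illustrate the key shape.
val key = Array("input-0-100", "1")
val block = kvstore.read(classOf[StreamBlockData], key) // look up one stream block entry
kvstore.delete(classOf[StreamBlockData], key)           // remove it, as updateStreamBlock does
```
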
100 changes: 0 additions & 100 deletions core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala

This file was deleted.
