[SPARK-1132] Persisting Web UI through refactoring the SparkListener interface #42

Closed
andrewor14 wants to merge 78 commits
Changes from all commits (78)
164489d
Relax assumptions on compressors and serializers when batching
andrewor14 Feb 4, 2014
a531d2e
Relax assumptions on compressors and serializers when batching
andrewor14 Feb 4, 2014
3df7005
Merge branch 'master' of github.com:andrewor14/incubator-spark
andrewor14 Feb 4, 2014
287ef44
Avoid reading the entire batch into memory; also simplify streaming l…
andrewor14 Feb 4, 2014
bd5a1d7
Typo: phyiscal -> physical
andrewor14 Feb 4, 2014
13920c9
Update docs
andrewor14 Feb 5, 2014
090544a
Privatize methods
andrewor14 Feb 5, 2014
3ddeb7e
Also privatize fields
andrewor14 Feb 5, 2014
e3ae35f
Merge github.com:apache/incubator-spark
andrewor14 Feb 11, 2014
8e09306
Use JSON for ExecutorsUI
andrewor14 Feb 12, 2014
10ed49d
Merge github.com:apache/incubator-spark into persist-ui
andrewor14 Feb 12, 2014
dcbd312
Add JSON Serializability for all SparkListenerEvent's
andrewor14 Feb 12, 2014
bb222b9
ExecutorUI: render completely from JSON
andrewor14 Feb 13, 2014
bf0b2e9
ExecutorUI: Serialize events rather than arbitrary executor information
andrewor14 Feb 14, 2014
de8a1cd
Serialize events both to and from JSON (rather than just to)
andrewor14 Feb 15, 2014
8a2ebe6
Fix bugs for EnvironmentUI and ExecutorsUI
andrewor14 Feb 15, 2014
c4cd480
Also deserialize new events
andrewor14 Feb 15, 2014
d859efc
BlockManagerUI: Add JSON functionality
andrewor14 Feb 15, 2014
8add36b
JobProgressUI: Add JSON functionality
andrewor14 Feb 15, 2014
b3976b0
Add functionality of reconstructing a persisted UI from SparkContext
andrewor14 Feb 16, 2014
4dfcd22
Merge git://git.apache.org/incubator-spark into persist-ui
andrewor14 Feb 17, 2014
f3fc13b
General refactor
andrewor14 Feb 17, 2014
3fd584e
Fix two major bugs
andrewor14 Feb 18, 2014
5ac906d
Mostly naming, formatting, and code style changes
andrewor14 Feb 18, 2014
904c729
Fix another major bug
andrewor14 Feb 18, 2014
4273013
Add a gateway SparkListener to simplify event logging
andrewor14 Feb 18, 2014
64d2ce1
Fix BlockManagerUI bug by introducing new event
andrewor14 Feb 19, 2014
6814da0
Explicitly register each UI listener rather than through some magic
andrewor14 Feb 19, 2014
d646df6
Completely decouple SparkUI from SparkContext
andrewor14 Feb 20, 2014
e9e1c6d
Move all JSON de/serialization logic to JsonProtocol
andrewor14 Feb 21, 2014
70e7e7a
Formatting changes
andrewor14 Feb 22, 2014
6631c02
More formatting changes, this time mainly for Json DSL
andrewor14 Feb 24, 2014
bbe3501
Embed storage status and RDD info in Task events
andrewor14 Feb 26, 2014
28019ca
Merge github.com:apache/spark
andrewor14 Feb 27, 2014
d1f4285
Migrate from lift-json to json4s-jackson
andrewor14 Feb 27, 2014
7b2f811
Guard against TaskMetrics NPE + Fix tests
andrewor14 Feb 27, 2014
996d7a2
Reflect RDD unpersist on UI
andrewor14 Feb 27, 2014
472fd8a
Fix a couple of tests
andrewor14 Feb 27, 2014
d47585f
Clean up FileLogger
andrewor14 Feb 28, 2014
faa113e
General clean up
andrewor14 Feb 28, 2014
4d2fb0c
Fix format fail
andrewor14 Feb 28, 2014
0503e4b
Fix PySpark tests + remove sc.clearFiles/clearJars
andrewor14 Feb 28, 2014
5d2cec1
JobLogger: ID -> Id
andrewor14 Feb 28, 2014
2981d61
Move SparkListenerBus out of DAGScheduler + Clean up
andrewor14 Mar 1, 2014
2fee310
Address Patrick's comments
andrewor14 Mar 1, 2014
cceff2b
Fix 100 char format fail
andrewor14 Mar 1, 2014
36b3e5d
Add HDFS support for event logging
andrewor14 Mar 3, 2014
03eda0b
Fix HDFS flush behavior
andrewor14 Mar 4, 2014
aef411c
Fix bug: storage status was not reflected on UI in the local case
andrewor14 Mar 4, 2014
bb4c503
Use a more mnemonic path for logging
andrewor14 Mar 4, 2014
18b256d
Refactor out event logging and replaying logic from UI
andrewor14 Mar 5, 2014
e375431
Add new constructors for SparkUI
andrewor14 Mar 5, 2014
1ba3407
Add a few configurable options to event logging
andrewor14 Mar 5, 2014
291b2be
Correct directory in log message "INFO: Logging events to <dir>"
andrewor14 Mar 5, 2014
4f69c4a
Master UI - Rebuild SparkUI on application finish
andrewor14 Mar 5, 2014
176e68e
Fix deprecated message for JavaSparkContext (minor)
andrewor14 Mar 6, 2014
ca258a4
Master UI - add support for reading compressed event logs
andrewor14 Mar 6, 2014
d6e3b4a
Merge github.com:apache/spark
andrewor14 Mar 7, 2014
d59da5f
Avoid logging all the blocks on each executor
andrewor14 Mar 7, 2014
b6eaea7
Treating SparkUI as a handler of MasterUI
andrewor14 Mar 10, 2014
77ba283
Address Kay's and Patrick's comments
andrewor14 Mar 11, 2014
dc93915
Imports, comments, and code formatting (minor)
andrewor14 Mar 11, 2014
d801d11
Merge github.com:apache/spark (major)
andrewor14 Mar 12, 2014
ac69ec8
Fix test fail
andrewor14 Mar 12, 2014
bf80e3d
Imports, comments, and code formatting, once again (minor)
andrewor14 Mar 12, 2014
3456090
Address Patrick's comments
andrewor14 Mar 13, 2014
c5c2c8f
Remove list of (TaskInfo, TaskMetrics) from StageInfo
andrewor14 Mar 13, 2014
45fd84c
Remove now deprecated test
andrewor14 Mar 14, 2014
650eb12
Add unit tests + Fix bugs found through tests
andrewor14 Mar 15, 2014
6740e49
Fix comment nits
andrewor14 Mar 15, 2014
9e14f97
Moved around functionality + renamed classes per Patrick
andrewor14 Mar 15, 2014
f80bd31
Simplify static handler and BlockManager status update logic
andrewor14 Mar 17, 2014
124429f
Clarify LiveListenerBus behavior + Add tests for new behavior
andrewor14 Mar 18, 2014
222adcd
Merge github.com:apache/spark
andrewor14 Mar 18, 2014
83af656
Scraps and pieces (no functionality change)
andrewor14 Mar 19, 2014
b8ba817
Remove UI from map when removing application in Master
andrewor14 Mar 19, 2014
a1c5cd9
Merge github.com:apache/spark
andrewor14 Mar 19, 2014
e5f14fa
Merge github.com:apache/spark
andrewor14 Mar 19, 2014
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,8 +17,6 @@

package org.apache.spark

import scala.{Option, deprecated}

import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}

/**
76 changes: 45 additions & 31 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -20,11 +20,12 @@ package org.apache.spark
import scala.collection.mutable.{ArrayBuffer, HashSet}

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{BlockManager, RDDBlockId, StorageLevel}
import org.apache.spark.storage.{BlockId, BlockManager, BlockStatus, RDDBlockId, StorageLevel}

/** Spark class responsible for passing RDDs split contents to the BlockManager and making
sure a node doesn't load two copies of an RDD at once.
*/
/**
* Spark class responsible for passing RDDs split contents to the BlockManager and making
* sure a node doesn't load two copies of an RDD at once.
*/
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {

/** Keys of RDD splits that are being computed/loaded. */
@@ -49,11 +50,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
try {loading.wait()} catch {case _ : Throwable =>}
}
logInfo("Finished waiting for %s".format(key))
// See whether someone else has successfully loaded it. The main way this would fail
// is for the RDD-level cache eviction policy if someone else has loaded the same RDD
// partition but we didn't want to make space for it. However, that case is unlikely
// because it's unlikely that two threads would work on the same RDD partition. One
// downside of the current code is that threads wait serially if this does happen.
/* See whether someone else has successfully loaded it. The main way this would fail
* is for the RDD-level cache eviction policy if someone else has loaded the same RDD
* partition but we didn't want to make space for it. However, that case is unlikely
* because it's unlikely that two threads would work on the same RDD partition. One
* downside of the current code is that threads wait serially if this does happen. */
blockManager.get(key) match {
case Some(values) =>
return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
@@ -69,32 +70,45 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
// If we got here, we have to load the split
logInfo("Partition %s not found, computing it".format(key))
val computedValues = rdd.computeOrReadCheckpoint(split, context)

// Persist the result, so long as the task is not running locally
if (context.runningLocally) { return computedValues }
if (storageLevel.useDisk && !storageLevel.useMemory) {
// In the case that this RDD is to be persisted using DISK_ONLY
// the iterator will be passed directly to the blockManager (rather then
// caching it to an ArrayBuffer first), then the resulting block data iterator
// will be passed back to the user. If the iterator generates a lot of data,
// this means that it doesn't all have to be held in memory at one time.
// This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
// blocks aren't dropped by the block store before enabling that.
blockManager.put(key, computedValues, storageLevel, tellMaster = true)
return blockManager.get(key) match {
case Some(values) =>
return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
logInfo("Failure to store %s".format(key))
throw new Exception("Block manager failed to return persisted valued")

// Keep track of blocks with updated statuses
var updatedBlocks = Seq[(BlockId, BlockStatus)]()
val returnValue: Iterator[T] = {
if (storageLevel.useDisk && !storageLevel.useMemory) {
/* In the case that this RDD is to be persisted using DISK_ONLY
* the iterator will be passed directly to the blockManager (rather then
* caching it to an ArrayBuffer first), then the resulting block data iterator
* will be passed back to the user. If the iterator generates a lot of data,
* this means that it doesn't all have to be held in memory at one time.
* This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
* blocks aren't dropped by the block store before enabling that. */
updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true)
blockManager.get(key) match {
case Some(values) =>
new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
logInfo("Failure to store %s".format(key))
throw new Exception("Block manager failed to return persisted valued")
}
} else {
// In this case the RDD is cached to an array buffer. This will save the results
// if we're dealing with a 'one-time' iterator
val elements = new ArrayBuffer[Any]
elements ++= computedValues
updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)
elements.iterator.asInstanceOf[Iterator[T]]
}
} else {
// In this case the RDD is cached to an array buffer. This will save the results
// if we're dealing with a 'one-time' iterator
val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, tellMaster = true)
return elements.iterator.asInstanceOf[Iterator[T]]
}

// Update task metrics to include any blocks whose storage status is updated
val metrics = context.taskMetrics
Contributor: Could also write this as:

    Option(context.taskMetrics).foreach { metrics =>
      metrics.updatedBlocks = Some(updatedBlocks)
    }

Not sure it's any nicer though :)

Contributor: Actually, is there any case where the metrics could be null here?

andrewor14 (author): Yeah, I didn't have the null check at first, but a few of the tests threw NPEs.

andrewor14 (author): I could also create an empty (non-null) TaskMetrics in each of the relevant tests. What do you think?

Contributor: Ideally that would be better; otherwise we risk masking what is an actual failure during a legitimate execution. If you can just clean up the tests, that would be great. If they are creating TaskContext objects, we should include an empty metrics inside of them.

metrics.updatedBlocks = Some(updatedBlocks)

returnValue

} finally {
loading.synchronized {
loading.remove(key)
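
The review thread above turns on whether context.taskMetrics can legitimately be null when CacheManager records updated blocks. Below is a minimal, self-contained sketch of the two options discussed: wrapping the field in Option at the call site, versus assuming the metrics are always present and having the tests construct contexts with an empty (non-null) TaskMetrics. The TaskContext, TaskMetrics, BlockId, and BlockStatus definitions here are simplified stand-ins for illustration only, not the actual Spark classes.

    // Simplified stand-ins for illustration only -- not the real Spark definitions.
    case class BlockId(name: String)
    case class BlockStatus(memSize: Long, diskSize: Long)

    class TaskMetrics {
      // Mirrors the field assigned in the diff above.
      var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
    }

    // Hypothetical simplified TaskContext; the NPEs came from tests that built
    // contexts whose taskMetrics field was null.
    class TaskContext(val taskMetrics: TaskMetrics)

    object UpdatedBlocksSketch {

      // Option discussed in the thread: tolerate a null TaskMetrics at the call site.
      def recordWithGuard(context: TaskContext,
                          updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
        Option(context.taskMetrics).foreach { metrics =>
          metrics.updatedBlocks = Some(updatedBlocks)
        }
      }

      // Approach the reviewers prefer: assume metrics are always present and
      // make the tests supply an empty, non-null instance.
      def recordAssumingMetrics(context: TaskContext,
                                updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
        val metrics = context.taskMetrics
        metrics.updatedBlocks = Some(updatedBlocks)
      }

      def main(args: Array[String]): Unit = {
        // Test-side fix: always pass an empty TaskMetrics instead of null.
        val context = new TaskContext(new TaskMetrics)
        recordAssumingMetrics(context, Seq(BlockId("rdd_0_0") -> BlockStatus(100L, 0L)))
        println(context.taskMetrics.updatedBlocks)
      }
    }

Either way, the production code path stays a single assignment to metrics.updatedBlocks.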
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -18,13 +18,13 @@
package org.apache.spark

import java.net.{Authenticator, PasswordAuthentication}
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.deploy.SparkHadoopUtil

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.io.Text

import org.apache.spark.deploy.SparkHadoopUtil

/**
* Spark class responsible for security.
*
93 changes: 72 additions & 21 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -132,14 +132,18 @@ class SparkContext(

if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")

// An asynchronous listener bus for Spark events
private[spark] val listenerBus = new LiveListenerBus

// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.create(
conf,
"<driver>",
conf.get("spark.driver.host"),
conf.get("spark.driver.port").toInt,
isDriver = true,
isLocal = isLocal)
isLocal = isLocal,
listenerBus = listenerBus)
SparkEnv.set(env)

// Used to store a URL for each static file/jar together with the file's local timestamp
@@ -151,9 +155,26 @@ class SparkContext(
private[spark] val metadataCleaner =
new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)

// Initialize the Spark UI
// Initialize the Spark UI, registering all associated listeners
private[spark] val ui = new SparkUI(this)
ui.bind()
ui.start()

// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
if (conf.getBoolean("spark.eventLog.enabled", false)) {
val logger = new EventLoggingListener(appName, conf)
listenerBus.addListener(logger)
Some(logger)
} else None
}

// Information needed to replay logged events, if any
private[spark] val eventLoggingInfo: Option[EventLoggingInfo] =
eventLogger.map { logger => Some(logger.info) }.getOrElse(None)

// At this point, all relevant SparkListeners have been registered, so begin releasing events
listenerBus.start()

val startTime = System.currentTimeMillis()

@@ -200,13 +221,13 @@ class SparkContext(
executorEnvs("SPARK_USER") = sparkUser

// Create and start the scheduler
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName)
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
taskScheduler.start()

@volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
@volatile private[spark] var dagScheduler = new DAGScheduler(this)
dagScheduler.start()

ui.start()
postEnvironmentUpdate()

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
@@ -571,7 +592,6 @@ class SparkContext(
.flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes))
}


protected[spark] def checkpointFile[T: ClassTag](
path: String
): RDD[T] = {
@@ -641,10 +661,11 @@ class SparkContext(
Utils.fetchFile(path, new File(SparkFiles.getRootDirectory), conf, env.securityManager)

logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
postEnvironmentUpdate()
}

def addSparkListener(listener: SparkListener) {
dagScheduler.addSparkListener(listener)
listenerBus.addListener(listener)
}

/**
Expand All @@ -671,7 +692,7 @@ class SparkContext(
*/
def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap

def getStageInfo: Map[Stage,StageInfo] = {
def getStageInfo: Map[Stage, StageInfo] = {
dagScheduler.stageToInfos
}

Expand All @@ -698,7 +719,7 @@ class SparkContext(
}

/**
* Return current scheduling mode
* Return current scheduling mode
*/
def getSchedulingMode: SchedulingMode.SchedulingMode = {
taskScheduler.schedulingMode
@@ -708,6 +729,7 @@
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
*/
@deprecated("adding files no longer creates local copies that need to be deleted", "1.0.0")
def clearFiles() {
addedFiles.clear()
}
@@ -722,6 +744,23 @@
dagScheduler.getPreferredLocs(rdd, partition)
}

/**
* Register an RDD to be persisted in memory and/or disk storage
*/
private[spark] def persistRDD(rdd: RDD[_]) {
persistentRdds(rdd.id) = rdd
}

/**
* Unpersist an RDD from memory and/or disk storage
*/
private[spark] def unpersistRDD(rdd: RDD[_], blocking: Boolean = true) {
val rddId = rdd.id
env.blockManager.master.removeRdd(rddId, blocking)
persistentRdds.remove(rddId)
listenerBus.post(SparkListenerUnpersistRDD(rddId))
}

/**
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
@@ -744,15 +783,15 @@
if (SparkHadoopUtil.get.isYarnMode() &&
(master == "yarn-standalone" || master == "yarn-cluster")) {
// In order for this to work in yarn-cluster mode the user must specify the
// --addjars option to the client to upload the file into the distributed cache
// --addjars option to the client to upload the file into the distributed cache
// of the AM to make it show up in the current working directory.
val fileName = new Path(uri.getPath).getName()
try {
env.httpFileServer.addJar(new File(fileName))
} catch {
case e: Exception => {
// For now just log an error but allow to go through so spark examples work.
// The spark examples don't really need the jar distributed since its also
// The spark examples don't really need the jar distributed since its also
// the app jar.
logError("Error adding jar (" + e + "), was the --addJars option used?")
null
@@ -773,32 +812,33 @@
logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key))
}
}
postEnvironmentUpdate()
}

/**
* Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
* any new nodes.
*/
@deprecated("adding jars no longer creates local copies that need to be deleted", "1.0.0")
def clearJars() {
addedJars.clear()
}

/** Shut down the SparkContext. */
def stop() {
ui.stop()
eventLogger.foreach(_.stop())
// Do this only if not stopped already - best case effort.
// prevent NPE if stopped more than once.
val dagSchedulerCopy = dagScheduler
dagScheduler = null
if (dagSchedulerCopy != null) {
metadataCleaner.cancel()
dagSchedulerCopy.stop()
listenerBus.stop()
taskScheduler = null
// TODO: Cache.stop()?
env.stop()
// Clean up locally linked files
clearFiles()
clearJars()
SparkEnv.set(null)
ShuffleMapTask.clearCache()
ResultTask.clearCache()
@@ -1026,6 +1066,19 @@ class SparkContext(
/** Register a new RDD, returning its RDD ID */
private[spark] def newRddId(): Int = nextRddId.getAndIncrement()

/** Post the environment update event once the task scheduler is ready */
private def postEnvironmentUpdate() {
if (taskScheduler != null) {
val schedulingMode = getSchedulingMode.toString
val addedJarPaths = addedJars.keys.toSeq
val addedFilePaths = addedFiles.keys.toSeq
val environmentDetails =
SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)
val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
listenerBus.post(environmentUpdate)
}
}

/** Called by MetadataCleaner to clean up the persistentRdds map periodically */
private[spark] def cleanup(cleanupTime: Long) {
persistentRdds.clearOldValues(cleanupTime)
@@ -1189,9 +1242,7 @@ object SparkContext extends Logging {
}

/** Creates a task scheduler based on a given master URL. Extracted for testing. */
private def createTaskScheduler(sc: SparkContext, master: String, appName: String)
: TaskScheduler =
{
private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
// Regular expression used for local[N] master format
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
@@ -1230,7 +1281,7 @@ object SparkContext extends Logging {
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, appName)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
scheduler

@@ -1247,7 +1298,7 @@
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
val masterUrls = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, appName)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
localCluster.stop()
@@ -1307,9 +1358,9 @@ object SparkContext extends Logging {
val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
val backend = if (coarseGrained) {
new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)
new CoarseMesosSchedulerBackend(scheduler, sc, url)
} else {
new MesosSchedulerBackend(scheduler, sc, url, appName)
new MesosSchedulerBackend(scheduler, sc, url)
}
scheduler.initialize(backend)
scheduler
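
Taken together, the SparkContext changes above make event logging purely configuration-driven: when spark.eventLog.enabled is true, an EventLoggingListener is registered on the new LiveListenerBus alongside the UI listeners, and SparkContext.stop() shuts both down. Below is a minimal sketch of how an application might turn this on; the job is illustrative, and any event-log directory or compression settings added elsewhere in this PR are omitted because their exact keys are not shown in this diff.

    import org.apache.spark.{SparkConf, SparkContext}

    object EventLogSketch {
      def main(args: Array[String]): Unit = {
        // spark.eventLog.enabled is the flag read in the SparkContext diff above.
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("event-log-sketch")
          .set("spark.eventLog.enabled", "true")

        val sc = new SparkContext(conf)

        // Run a trivial job so that stage, task, and environment events are
        // posted to the listener bus and written out by the event logger.
        val n = sc.parallelize(1 to 100).map(_ * 2).count()
        println("ran a job over " + n + " records")

        // stop() stops the listener bus and closes the EventLoggingListener.
        sc.stop()
      }
    }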