Skip to content

[SPARK-1768] History server enhancements. #718

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.history

import org.apache.spark.ui.SparkUI

/**
 * Summary information about one application, used to render the history server's
 * listing page.
 *
 * @param id Unique application ID (for the FS provider, the name of the app's log directory).
 * @param name Display name of the application.
 * @param startTime Application start time (epoch-based millis — TODO confirm units at source).
 * @param endTime Application end time, same clock as `startTime`.
 * @param lastUpdated Timestamp of the most recent update to the app's logs on disk.
 * @param sparkUser User that ran the application.
 */
private[spark] case class ApplicationHistoryInfo(
    id: String,
    name: String,
    startTime: Long,
    endTime: Long,
    lastUpdated: Long,
    sparkUser: String)

/**
 * Interface for providers that make application history available to the history
 * server. Implementations discover applications and build a [[SparkUI]] for a
 * specific application on demand.
 */
private[spark] abstract class ApplicationHistoryProvider {

  /**
   * Returns a list of applications available for the history server to show.
   *
   * @return List of all known applications.
   */
  def getListing(): Seq[ApplicationHistoryInfo]

  /**
   * Returns the Spark UI for a specific application.
   *
   * @param appId The application ID.
   * @return The application's UI, or null if the application is not found.
   */
  def getAppUI(appId: String): SparkUI

  /**
   * Called when the server is shutting down. Default is a no-op; providers that
   * hold resources (threads, file systems) should override this to release them.
   */
  def stop(): Unit = { }

  /**
   * Returns configuration data to be shown in the History Server home page.
   *
   * @return A map with the configuration data. Data is shown in the order returned by the map.
   */
  def getConfig(): Map[String, String] = Map()

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.history

import java.io.FileNotFoundException

import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils

/**
 * [[ApplicationHistoryProvider]] backed by event logs stored in a Hadoop-compatible
 * file system. A background thread periodically re-scans the configured log directory
 * and keeps an in-memory listing of completed applications.
 */
private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
  with Logging {

  // Interval between each check for event log updates. The older
  // "spark.history.updateInterval" key is used as a fallback default.
  private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
    conf.getInt("spark.history.updateInterval", 10)) * 1000

  // Root directory containing one sub-directory of event logs per application.
  private val logDir = conf.get("spark.history.fs.logDirectory", null)
  if (logDir == null) {
    throw new IllegalArgumentException("Logging directory must be specified.")
  }

  private val fs = Utils.getHadoopFileSystem(logDir)

  // A timestamp of when the disk was last accessed to check for log updates
  private var lastLogCheckTimeMs = -1L

  // List of applications, in order from newest to oldest.
  // @volatile: replaced wholesale by the checking thread, read by request threads.
  @volatile private var appList: Seq[ApplicationHistoryInfo] = Nil

  /**
   * A background thread that periodically checks for event log updates on disk.
   *
   * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
   * time at which it performs the next log check to maintain the same period as before.
   *
   * TODO: Add a mechanism to update manually.
   */
  private val logCheckingThread = new Thread("LogCheckingThread") {
    override def run() = Utils.logUncaughtExceptions {
      while (true) {
        val now = getMonotonicTimeMs()
        if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
          // A full period (or more) has already elapsed since the last check.
          // NOTE(review): this still sleeps a full interval before checking;
          // checking immediately when overdue may be the intended behavior — confirm.
          Thread.sleep(UPDATE_INTERVAL_MS)
        } else {
          // If the user has manually checked for logs recently, wait until
          // UPDATE_INTERVAL_MS after the last check time
          Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
        }
        checkForLogs()
      }
    }
  }

initialize()

private def initialize() {
// Validate the log directory.
val path = new Path(logDir)
if (!fs.exists(path)) {
throw new IllegalArgumentException(
"Logging directory specified does not exist: %s".format(logDir))
}
if (!fs.getFileStatus(path).isDir) {
throw new IllegalArgumentException(
"Logging directory specified is not a directory: %s".format(logDir))
}

checkForLogs()
logCheckingThread.setDaemon(true)
logCheckingThread.start()
}

override def getListing() = appList

override def getAppUI(appId: String): SparkUI = {
try {
val appLogDir = fs.getFileStatus(new Path(logDir, appId))
loadAppInfo(appLogDir, true)._2
} catch {
case e: FileNotFoundException => null
}
}

override def getConfig(): Map[String, String] =
Map(("Event Log Location" -> logDir))

/**
* Builds the application list based on the current contents of the log directory.
* Tries to reuse as much of the data already in memory as possible, by not reading
* applications that haven't been updated since last time the logs were checked.
*/
private def checkForLogs() = {
lastLogCheckTimeMs = getMonotonicTimeMs()
logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
try {
val logStatus = fs.listStatus(new Path(logDir))
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
val logInfos = logDirs.filter {
dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))
}

val currentApps = Map[String, ApplicationHistoryInfo](
appList.map(app => (app.id -> app)):_*)

// For any application that either (i) is not listed or (ii) has changed since the last time
// the listing was created (defined by the log dir's modification time), load the app's info.
// Otherwise just reuse what's already in memory.
val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size)
for (dir <- logInfos) {
val curr = currentApps.getOrElse(dir.getPath().getName(), null)
if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
try {
newApps += loadAppInfo(dir, false)._1
} catch {
case e: Exception => logError(s"Failed to load app info from directory $dir.")
}
} else {
newApps += curr
}
}

appList = newApps.sortBy { info => -info.endTime }
} catch {
case t: Throwable => logError("Exception in checking for event log updates", t)
}
}

/**
* Parse the application's logs to find out the information we need to build the
* listing page.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good to document here when we want the parameter renderUI to be true, and when we want it to be false. Based on my understanding, it should be true if the user explicitly clicks on the application, but otherwise false.

*
* When creating the listing of available apps, there is no need to load the whole UI for the
* application. The UI is requested by the HistoryServer (by calling getAppInfo()) when the user
* clicks on a specific application.
*
* @param logDir Directory with application's log files.
* @param renderUI Whether to create the SparkUI for the application.
* @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we're only ever interested in one of these at a time. If renderUI is true, we're only interested in the UI. Otherwise, we're only interested in the app info. We should abstract this so we don't have to return a tuple.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a private method, so I'm not too worried about the ugly interface. My main goal here is to share the code to set up and drive the replay bus. The UI makes it really hard to break this up, because it takes the replay bus as a constructor argument, so you can't just have something like a "replayLog(logFile, SparkListener*)" and call that with a SparkUI object.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For instance,

override def getAppUI(logDir): SparkUI = {
  val eventLogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs)
  val replayBus = new ReplayListenerBus(...)
  val ui = new SparkUI(...)
  replayBus.replay()
  // ... set ACLs appropriately
  ui
}

private def getAppInfo(logDir: FileStatus): ApplicationHistoryInfo = {
  val eventLogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs)
  val replayBus = new ReplayListenerBus(...)
  val appListener = new ApplicationEventListener
  replayBus.addListener(appListener)
  replayBus.replay()
  // ... other application history info
  new ApplicationHistoryInfo(...)
}

That way you don't have to check if ui is null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dislike the duplication of code to parse logs / create the replay listener / drive it caused by your code. Note that in both cases I need the ApplicationEventListener, since that's where the ACLs come from.

I can move the ACL set up to getAppUI() to avoid the null check, but otherwise, I rather prefer the current code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, we still get the view ACLs for rendering the UI.

I think the ugliness of returning a tuple can be avoided, even if it is not user facing. These are really two separate cases anyway, so the correct way to do this is to abstract out the duplicate part (i.e. the instantiation of the ReplayBus), and then have both getAppUI and getAppInfo use the same method. Right now, based on the method signature, it seems that the caller needs both items of the tuple, which is not true.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the least, we should move the UI view acls code up, so we don't have to do an extra null check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about that during our discussion, but that would require propagating the ACLs to the other method too; so now we'd be returning a 3-tuple? That would make the ugliness even worse...

(I could restore the ACL field in the info object, but then we'd be wasting a lot of space in memory just to avoid a null check.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait sorry, I'm not following. What I meant was to move L188 - 190 to right after we create the SparkUI is L172. What other method are you referring to? (Why does the caller need the view ACLs?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sorry, misunderstood what you meant. But in any case, that won't work, because we need to set up the ACLs after the replay happens in L178.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see, because you need to replay to get the ACLs. Then it's fine.

*/
  /**
   * Replays an application's event logs to build its history info and, optionally,
   * its SparkUI.
   *
   * @param logDir Directory with the application's log files.
   * @param renderUI Whether to instantiate and populate a SparkUI for the application.
   * @return A 2-tuple `(app info, ui)`; `ui` is null when `renderUI` is false.
   */
  private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
    val elogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs)
    val path = logDir.getPath
    // The log directory's name doubles as the application ID.
    val appId = path.getName
    val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
    val appListener = new ApplicationEventListener
    replayBus.addListener(appListener)

    val ui: SparkUI = if (renderUI) {
      // Clone the conf so the per-app SecurityManager does not mutate the
      // provider's configuration.
      val conf = this.conf.clone()
      val appSecManager = new SecurityManager(conf)
      new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
      // Do not call ui.bind() to avoid creating a new server for each application
    } else {
      null
    }

    // Replay must happen before reading appListener's fields and before setting
    // view ACLs below: both are populated from the replayed events.
    replayBus.replay()
    val appInfo = ApplicationHistoryInfo(
      appId,
      appListener.appName,
      appListener.startTime,
      appListener.endTime,
      getModificationTime(logDir),
      appListener.sparkUser)

    if (ui != null) {
      // `conf` here refers to the provider's conf; the clone above is scoped to
      // the if-expression and is not visible at this point.
      val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
      ui.getSecurityManager.setUIAcls(uiAclsEnabled)
      ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
    }
    (appInfo, ui)
  }

/** Return when this directory was last modified. */
private def getModificationTime(dir: FileStatus): Long = {
try {
val logFiles = fs.listStatus(dir.getPath)
if (logFiles != null && !logFiles.isEmpty) {
logFiles.map(_.getModificationTime).max
} else {
dir.getModificationTime
}
} catch {
case t: Throwable =>
logError("Exception in accessing modification time of %s".format(dir.getPath), t)
-1L
}
}

/** Returns the system's mononotically increasing time. */
private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000)

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,36 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}

private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {

private val pageSize = 20

def render(request: HttpServletRequest): Seq[Node] = {
val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
val appTable = UIUtils.listingTable(appHeader, appRow, appRows)
val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt
val requestedFirst = (requestedPage - 1) * pageSize

val allApps = parent.getApplicationList()
val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))

val actualPage = (actualFirst / pageSize) + 1
val last = Math.min(actualFirst + pageSize, allApps.size) - 1
val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0)

val appTable = UIUtils.listingTable(appHeader, appRow, apps)
val providerConfig = parent.getProviderConfig()
val content =
<div class="row-fluid">
<div class="span12">
<ul class="unstyled">
<li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
{ providerConfig.map(e => <li><strong>{e._1}:</strong> {e._2}</li>) }
</ul>
{
if (parent.appIdToInfo.size > 0) {
if (allApps.size > 0) {
<h4>
Showing {parent.appIdToInfo.size}/{parent.getNumApplications}
Completed Application{if (parent.getNumApplications > 1) "s" else ""}
Showing {actualFirst + 1}-{last + 1} of {allApps.size}
<span style="float: right">
{if (actualPage > 1) <a href={"/?page=" + (actualPage - 1)}>&lt;</a>}
{if (actualPage < pageCount) <a href={"/?page=" + (actualPage + 1)}>&gt;</a>}
</span>
</h4> ++
appTable
} else {
Expand All @@ -56,26 +72,20 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
"Completed",
"Duration",
"Spark User",
"Log Directory",
"Last Updated")

private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
val appName = if (info.started) info.name else info.logDirPath.getName
val uiAddress = parent.getAddress + info.ui.basePath
val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started"
val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed"
val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L
val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---"
val sparkUser = if (info.started) info.sparkUser else "Unknown user"
val logDirectory = info.logDirPath.getName
val uiAddress = "/history/" + info.id
val startTime = UIUtils.formatDate(info.startTime)
val endTime = UIUtils.formatDate(info.endTime)
val duration = UIUtils.formatDuration(info.endTime - info.startTime)
val lastUpdated = UIUtils.formatDate(info.lastUpdated)
<tr>
<td><a href={uiAddress}>{appName}</a></td>
<td><a href={uiAddress}>{info.name}</a></td>
<td>{startTime}</td>
<td>{endTime}</td>
<td>{duration}</td>
<td>{sparkUser}</td>
<td>{logDirectory}</td>
<td>{info.sparkUser}</td>
<td>{lastUpdated}</td>
</tr>
}
Expand Down
Loading