
[SPARK-2261] Make event logger use a single file. #1222


Closed. Wants to merge 29 commits.

Commits (29):
6251dd7  Make event logger use a single file. (May 22, 2014)
8f42274  Make history server parse old-style log directories. (May 23, 2014)
9db0efd  Show prettier name in UI. (Jun 24, 2014)
ae571fa  Fix end-to-end event logger test. (Jul 3, 2014)
f677930  Fix botched rebase. (Sep 15, 2014)
3700586  Restore log flushing. (Sep 16, 2014)
a722184  Do not encode metadata in log file name. (Sep 16, 2014)
cc6bce4  Some review feedback. (Sep 16, 2014)
16661a3  Simplify some internal code. (Sep 16, 2014)
bb7c2d3  Fix scalastyle warning. (Sep 17, 2014)
04364dc  Merge branch 'master' into hist-server-single-log (Oct 4, 2014)
a624a89  Merge branch 'master' into hist-server-single-log (Oct 6, 2014)
bd6ba8c  Review feedback. (Oct 7, 2014)
9e928ba  Add types. (Oct 7, 2014)
d93c44a  Add a newline to make the header more readable. (Oct 7, 2014)
0ef3f70  Merge branch 'master' into hist-server-single-log (Oct 9, 2014)
16fd491  Use unique log directory for each codec. (Oct 9, 2014)
a6d5c50  Merge branch 'master' into hist-server-single-log (Oct 16, 2014)
b3ee30b  Merge branch 'master' into hist-server-single-log (Nov 4, 2014)
45c7a1f  Version of SPARK-3697 for this branch. (Nov 20, 2014)
3f4500f  Unit test for SPARK-3697. (Nov 20, 2014)
ed0023e  Merge branch 'master' into hist-server-single-log (Dec 9, 2014)
346f0b4  Review feedback. (Dec 10, 2014)
f91c13e  Handle "spark.eventLog.overwrite", and add unit test. (Dec 12, 2014)
dce28e9  Fix log overwrite test. (Dec 12, 2014)
216c5a3  Review comments. (Dec 12, 2014)
59c561c  Review feedback. (Dec 12, 2014)
c7e6123  Update comment. (Dec 19, 2014)
cc8f5de  Store header in plain text. (Dec 19, 2014)
@@ -23,7 +23,7 @@ private[spark] class ApplicationDescription(
val memoryPerSlave: Int,
val command: Command,
var appUiUrl: String,
val eventLogDir: Option[String] = None)
val eventLogFile: Option[String] = None)
extends Serializable {

val user = System.getProperty("user.name", "<unknown>")
@@ -17,14 +17,16 @@

package org.apache.spark.deploy.history

import java.io.FileNotFoundException
import java.io.{BufferedInputStream, FileNotFoundException, InputStream}

import scala.collection.mutable

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.fs.permission.AccessControlException

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils
@@ -64,6 +66,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
@volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
= new mutable.LinkedHashMap()

// Constants used to parse Spark 1.0.0 log directories.
private[history] val LOG_PREFIX = "EVENT_LOG_"
private[history] val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
private[history] val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"

/**
* A background thread that periodically checks for event log updates on disk.
*
@@ -90,7 +98,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis

initialize()

private def initialize() {
private def initialize(): Unit = {
// Validate the log directory.
val path = new Path(logDir)
if (!fs.exists(path)) {
@@ -106,17 +114,20 @@ }
}

checkForLogs()
logCheckingThread.setDaemon(true)
logCheckingThread.start()

// Disable the background thread during tests.
if (!conf.contains("spark.testing")) {
logCheckingThread.setDaemon(true)
logCheckingThread.start()
}
}

override def getListing() = applications.values

override def getAppUI(appId: String): Option[SparkUI] = {
try {
applications.get(appId).map { info =>
val (replayBus, appListener) = createReplayBus(fs.getFileStatus(
new Path(logDir, info.logDir)))
val replayBus = new ReplayListenerBus()
val ui = {
val conf = this.conf.clone()
val appSecManager = new SecurityManager(conf)
@@ -125,15 +136,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
// Do not call ui.bind() to avoid creating a new server for each application
}

replayBus.replay()
val appListener = new ApplicationEventListener()
replayBus.addListener(appListener)
val appInfo = replay(fs.getFileStatus(new Path(logDir, info.logPath)), replayBus)

ui.setAppName(s"${appListener.appName.getOrElse(NOT_STARTED)} ($appId)")
ui.setAppName(s"${appInfo.name} ($appId)")

val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
ui.getSecurityManager.setAcls(uiAclsEnabled)
// make sure to set admin acls before view acls so they are properly picked up
ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED),
ui.getSecurityManager.setViewAcls(appInfo.sparkUser,
appListener.viewAcls.getOrElse(""))
ui
}
@@ -149,41 +162,45 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
* Tries to reuse as much of the data already in memory as possible, by not reading
* applications that haven't been updated since last time the logs were checked.
*/
private def checkForLogs() = {
private[history] def checkForLogs(): Unit = {
lastLogCheckTimeMs = getMonotonicTimeMs()
logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
try {
val logStatus = fs.listStatus(new Path(logDir))
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()

// Load all new logs from the log directory. Only directories that have a modification time
// later than the last known log directory will be loaded.
try {
var newLastModifiedTime = lastModifiedTime
val logInfos = logDirs
.filter { dir =>
if (fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) {
val modTime = getModificationTime(dir)
newLastModifiedTime = math.max(newLastModifiedTime, modTime)
modTime > lastModifiedTime
} else {
false
val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
.getOrElse(Seq[FileStatus]())
val logInfos = statusList
.filter { entry =>
Contributor: bump this up 1 line

Contributor Author: That makes the alignment of flatMap / sortBy really weird. Do you have an example of what you have in mind so I can follow it? Other instances I have found follow this style.

Contributor: I see... I guess it was like this already before this patch. Since we have huge filter and flatMap blocks, maybe it makes sense to put these in their own respective variables. Don't worry about making that change here, because it's somewhat orthogonal to this PR.
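A minimal sketch of the refactor suggested above. The helper names are invented for illustration; their bodies would be the filter and flatMap blocks in this hunk:

    // Hypothetical refactor: name the large inline blocks so the chain stays short.
    def isNewFinishedApplication(entry: FileStatus): Boolean = ???  // the filter body below
    def loadAppInfo(entry: FileStatus): Option[FsApplicationHistoryInfo] = ???  // the flatMap body

    val logInfos = statusList
      .filter(isNewFinishedApplication)
      .flatMap(loadAppInfo)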

try {
val isFinishedApplication =
if (isLegacyLogDirectory(entry)) {
fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
} else {
!entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
}

if (isFinishedApplication) {
val modTime = getModificationTime(entry)
newLastModifiedTime = math.max(newLastModifiedTime, modTime)
modTime >= lastModifiedTime
} else {
false
}
} catch {
case e: AccessControlException =>
// Do not use "logInfo" since these messages can get pretty noisy if printed on
// every poll.
logDebug(s"No permission to read $entry, ignoring.")

Contributor: Don't we still need to do this?

Contributor Author: Oops.

Contributor: When do we get these? By default this will "fail" silently, and I'm wondering if we should still tell the user about these somehow (maybe less frequently than just logInfo here).

Contributor Author: You get these when a directory or file is created under the log directory with permissions that don't allow the history server to read it. It's SPARK-3697.

false
}
}
.flatMap { dir =>
.flatMap { entry =>
try {
val (replayBus, appListener) = createReplayBus(dir)
replayBus.replay()
Some(new FsApplicationHistoryInfo(
dir.getPath().getName(),
appListener.appId.getOrElse(dir.getPath().getName()),
appListener.appName.getOrElse(NOT_STARTED),
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(dir),
appListener.sparkUser.getOrElse(NOT_STARTED)))
Some(replay(entry, new ReplayListenerBus()))
} catch {
case e: Exception =>
logInfo(s"Failed to load application log data from $dir.", e)
logError(s"Failed to load application log data from $entry.", e)
None
}
}
@@ -217,37 +234,100 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
applications = newApps
}
} catch {
case t: Throwable => logError("Exception in checking for event log updates", t)
case e: Exception => logError("Exception in checking for event log updates", e)
}
Contributor: I can't comment on the line above this one, but we really shouldn't catch Throwable here; we should make it an Exception. (Not your changes.)

}
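For comparison, a minimal sketch of an even narrower alternative using scala.util.control.NonFatal, which lets truly fatal errors (OutOfMemoryError and friends) propagate. This is illustrative only; the patch itself catches Exception:

    import scala.util.control.NonFatal

    try {
      checkForLogs()
    } catch {
      // NonFatal matches ordinary exceptions but not VirtualMachineError,
      // ThreadDeath, InterruptedException, or control-flow throwables.
      case NonFatal(e) => logError("Exception in checking for event log updates", e)
    }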

private def createReplayBus(logDir: FileStatus): (ReplayListenerBus, ApplicationEventListener) = {
val path = logDir.getPath()
val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
val appListener = new ApplicationEventListener
replayBus.addListener(appListener)
(replayBus, appListener)
/**
* Replays the events in the specified log file and returns information about the associated
* application.
*/
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
val logPath = eventLog.getPath()
val (logInput, sparkVersion) =
if (isLegacyLogDirectory(eventLog)) {
openLegacyEventLog(logPath)
} else {
EventLoggingListener.openEventLog(logPath, fs)
}
try {
val appListener = new ApplicationEventListener
bus.addListener(appListener)
bus.replay(logInput, sparkVersion)
new FsApplicationHistoryInfo(
logPath.getName(),
appListener.appId.getOrElse(logPath.getName()),
appListener.appName.getOrElse(NOT_STARTED),
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog),
appListener.sparkUser.getOrElse(NOT_STARTED))
} finally {
logInput.close()
}
}

/** Return when this directory was last modified. */
private def getModificationTime(dir: FileStatus): Long = {
try {
val logFiles = fs.listStatus(dir.getPath)
if (logFiles != null && !logFiles.isEmpty) {
logFiles.map(_.getModificationTime).max
} else {
dir.getModificationTime
/**
* Loads a legacy log directory. This assumes that the log directory contains a single event
* log file (along with other metadata files), which is the case for directories generated by
* the code in previous releases.
*
* @return 2-tuple of (input stream of the events, version of Spark which wrote the log)
*/
Contributor: // Return a 2-tuple of (an input stream of the events, and the version of Spark in which the log is written).

private[history] def openLegacyEventLog(dir: Path): (InputStream, String) = {
val children = fs.listStatus(dir)
var eventLogPath: Path = null
var codecName: Option[String] = None
var sparkVersion: String = null

children.foreach { child =>
child.getPath().getName() match {
case name if name.startsWith(LOG_PREFIX) =>
eventLogPath = child.getPath()

case codec if codec.startsWith(COMPRESSION_CODEC_PREFIX) =>
codecName = Some(codec.substring(COMPRESSION_CODEC_PREFIX.length()))

case version if version.startsWith(SPARK_VERSION_PREFIX) =>
sparkVersion = version.substring(SPARK_VERSION_PREFIX.length())

case _ =>
}
} catch {
case t: Throwable =>
logError("Exception in accessing modification time of %s".format(dir.getPath), t)
-1L
}

if (eventLogPath == null || sparkVersion == null) {
throw new IllegalArgumentException(s"$dir is not a Spark application log directory.")
}

val codec = try {
codecName.map { c => CompressionCodec.createCodec(conf, c) }
} catch {
case e: Exception =>
throw new IllegalArgumentException(s"Unknown compression codec $codecName.")
}

val in = new BufferedInputStream(fs.open(eventLogPath))
(codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion)
}
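To make the legacy format concrete, a hedged usage sketch. The directory contents are an assumed example constructed from the prefixes defined above, not taken from a real log:

    // Assumed Spark 1.0-style log directory (illustrative only):
    //   /logs/app-1234/EVENT_LOG_1
    //   /logs/app-1234/SPARK_VERSION_1.0.0
    //   /logs/app-1234/COMPRESSION_CODEC_lzf
    //   /logs/app-1234/APPLICATION_COMPLETE
    val (in, sparkVersion) = openLegacyEventLog(new Path("/logs/app-1234"))
    try {
      // sparkVersion would be "1.0.0", and `in` would be wrapped in the LZF
      // codec's input stream because a COMPRESSION_CODEC_ entry was present.
    } finally {
      in.close()
    }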

/**
 * Return whether the specified event log path contains an old directory-based event log.
 * Previously, the event log of an application consisted of multiple files in a directory.
* As of Spark 1.3, these files are consolidated into a single one that replaces the directory.
* See SPARK-2261 for more detail.
*/
private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDir()
Contributor: Let's add a quick comment here:

/**
 * Return whether the specified event log is in the legacy file structure.
 * Previously, the event log of an application consisted of multiple files in a directory.
 * As of Spark 1.3, these files are consolidated into a single one that replaces the directory.
 * See SPARK-2261 for more detail.
 */

private def getModificationTime(fsEntry: FileStatus): Long = {
if (fsEntry.isDir) {
fs.listStatus(fsEntry.getPath).map(_.getModificationTime()).max
} else {
fsEntry.getModificationTime()
}
}

/** Returns the system's monotonically increasing time. */
private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000)
private def getMonotonicTimeMs(): Long = System.nanoTime() / (1000 * 1000)

}

@@ -256,7 +336,7 @@ private object FsHistoryProvider {
}

private class FsApplicationHistoryInfo(
val logDir: String,
val logPath: String,
id: String,
name: String,
startTime: Long,
core/src/main/scala/org/apache/spark/deploy/master/Master.scala (46 changes: 23 additions & 23 deletions)
@@ -17,6 +17,7 @@

package org.apache.spark.deploy.master

import java.io.FileNotFoundException
import java.net.URLEncoder
import java.text.SimpleDateFormat
import java.util.Date
@@ -32,6 +33,7 @@ import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.Serialization
import akka.serialization.SerializationExtension
import org.apache.hadoop.fs.Path

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
@@ -56,6 +58,7 @@ private[spark] class Master(
import context.dispatcher // to use Akka's scheduler.schedule()

val conf = new SparkConf
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)

def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
@@ -510,7 +513,7 @@ private[spark] class Master(
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0

for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
@@ -707,41 +710,38 @@ private[spark] class Master(
def rebuildSparkUI(app: ApplicationInfo): Boolean = {
val appName = app.desc.name
val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
Contributor: Don't you still need this?

val eventLogDir = app.desc.eventLogDir.getOrElse {
val eventLogFile = app.desc.eventLogFile.getOrElse {
// Event logging is not enabled for this application
app.desc.appUiUrl = notFoundBasePath
return false
}

val appEventLogDir = EventLoggingListener.getLogDirPath(eventLogDir, app.id)
val fileSystem = Utils.getHadoopFileSystem(appEventLogDir,
SparkHadoopUtil.get.newConfiguration(conf))
val eventLogInfo = EventLoggingListener.parseLoggingInfo(appEventLogDir, fileSystem)
val eventLogPaths = eventLogInfo.logPaths
val compressionCodec = eventLogInfo.compressionCodec

if (eventLogPaths.isEmpty) {
// Event logging is enabled for this application, but no event logs are found
val title = s"Application history not found (${app.id})"
var msg = s"No event logs found for application $appName in $appEventLogDir."
logWarning(msg)
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
return false
}

try {
val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
val fs = Utils.getHadoopFileSystem(eventLogFile, hadoopConf)
val (logInput, sparkVersion) = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
Contributor: @vanzin @andrewor14 Cannot see the "Completed Applications" UI now. The eventLogFile is still a base directory path like "hdfs://****/1.3.0/"; maybe we still need EventLoggingListener.getLogPath(eventLogFile, app.id)? I think we should rename the current eventLogFile back to eventLogDir, and make a new eventLogFile variable using getLogPath.

[Screenshot: no UI shown for the completed application]
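A sketch of the fix this comment proposes, assuming getLogPath(base, appId) resolves the per-application file under the configured base directory (the helper's exact shape is the commenter's assumption, not verified here):

    // Hypothetical: keep the base directory in the app description, then resolve
    // the concrete event log file for this application before opening it.
    val eventLogFile = EventLoggingListener.getLogPath(eventLogDir, app.id)
    val fs = Utils.getHadoopFileSystem(eventLogFile, hadoopConf)
    val (logInput, sparkVersion) = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)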

val replayBus = new ReplayListenerBus()
Contributor: Wait... did you mean to pass in logInput?

Contributor Author: No, because I changed that signature. The event stream is now passed to the replay() method.
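To make the signature change concrete, both call shapes as they appear elsewhere in this diff:

    // Before this PR: sources were bound at construction time.
    //   val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
    //   replayBus.replay()

    // After this PR: the bus is source-agnostic; the event stream and Spark
    // version are supplied to each replay() call instead.
    val replayBus = new ReplayListenerBus()
    replayBus.replay(logInput, sparkVersion)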

val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
replayBus.replay()
try {
replayBus.replay(logInput, sparkVersion)
} finally {
logInput.close()
}
appIdToUI(app.id) = ui
webUi.attachSparkUI(ui)
// Application UI is successfully rebuilt, so link the Master UI to it
app.desc.appUiUrl = ui.getBasePath
app.desc.appUiUrl = ui.basePath
true
} catch {
case fnf: FileNotFoundException =>
// Event logging is enabled for this application, but no event logs are found
val title = s"Application history not found (${app.id})"
var msg = s"No event logs found for application $appName in $eventLogFile."
logWarning(msg)
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
false
Contributor: I think it'd be better if we ran this under an if (file does not exist) check instead of relying on a specific exception for this particular behavior.

Contributor Author: I disagree. if (file exists) checks are racy and entail more RPCs to the NameNode, and we're really interested in handling that particular exception, so I don't see any advantage in the explicit check.
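A minimal sketch of the race being described, using only standard Hadoop FileSystem calls (helper names are invented for illustration):

    import java.io.FileNotFoundException
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Check-then-act: racy. The file can be deleted between exists() and open(),
    // so open() can still throw; exists() also costs an extra NameNode RPC.
    def openChecked(fs: FileSystem, path: Path) =
      if (fs.exists(path)) Some(fs.open(path)) else None

    // Handling the exception instead: one RPC, no race window.
    def openHandled(fs: FileSystem, path: Path) =
      try Some(fs.open(path))
      catch { case _: FileNotFoundException => None }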

case e: Exception =>
// Relay exception message to application UI page
val title = s"Application history load error (${app.id})"