Skip to content

Commit e4c1982

Browse files
committed
2 parents 921e914 + 7ff8c45 commit e4c1982

File tree

122 files changed

+2454
-1818
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

122 files changed

+2454
-1818
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ Spark is a fast and general cluster computing system for Big Data. It provides
44
high-level APIs in Scala, Java, and Python, and an optimized engine that
55
supports general computation graphs for data analysis. It also supports a
66
rich set of higher-level tools including Spark SQL for SQL and structured
7-
data processing, MLLib for machine learning, GraphX for graph processing,
8-
and Spark Streaming.
7+
data processing, MLlib for machine learning, GraphX for graph processing,
8+
and Spark Streaming for stream processing.
99

1010
<http://spark.apache.org/>
1111

bin/pyspark

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ export PYSPARK_SUBMIT_ARGS
8585

8686
# For pyspark tests
8787
if [[ -n "$SPARK_TESTING" ]]; then
88+
unset YARN_CONF_DIR
89+
unset HADOOP_CONF_DIR
8890
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
8991
exec "$PYSPARK_PYTHON" -m doctest $1
9092
else

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1261,7 +1261,10 @@ class SparkContext(config: SparkConf) extends Logging {
12611261

12621262
/** Post the application start event */
12631263
private def postApplicationStart() {
1264-
listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser))
1264+
// Note: this code assumes that the task scheduler has been initialized and has contacted
1265+
// the cluster manager to get an application ID (in case the cluster manager provides one).
1266+
listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(),
1267+
startTime, sparkUser))
12651268
}
12661269

12671270
/** Post the application end event */
@@ -1294,7 +1297,7 @@ class SparkContext(config: SparkConf) extends Logging {
12941297
*/
12951298
object SparkContext extends Logging {
12961299

1297-
private[spark] val SPARK_VERSION = "1.0.0"
1300+
private[spark] val SPARK_VERSION = "1.2.0-SNAPSHOT"
12981301

12991302
private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
13001303

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ object SparkEnv extends Logging {
225225

226226
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
227227
"BlockManagerMaster",
228-
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
228+
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)
229229

230230
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
231231
serializer, conf, securityManager, mapOutputTracker, shuffleManager)

core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@ private[spark] abstract class ApplicationHistoryProvider {
3434
*
3535
* @return List of all known applications.
3636
*/
37-
def getListing(): Seq[ApplicationHistoryInfo]
37+
def getListing(): Iterable[ApplicationHistoryInfo]
3838

3939
/**
4040
* Returns the Spark UI for a specific application.
4141
*
4242
* @param appId The application ID.
43-
* @return The application's UI, or null if application is not found.
43+
* @return The application's UI, or None if application is not found.
4444
*/
45-
def getAppUI(appId: String): SparkUI
45+
def getAppUI(appId: String): Option[SparkUI]
4646

4747
/**
4848
* Called when the server is shutting down.

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 105 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ import org.apache.spark.util.Utils
3232
private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
3333
with Logging {
3434

35+
private val NOT_STARTED = "<Not Started>"
36+
3537
// Interval between each check for event log updates
3638
private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
3739
conf.getInt("spark.history.updateInterval", 10)) * 1000
@@ -47,8 +49,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
4749
// A timestamp of when the disk was last accessed to check for log updates
4850
private var lastLogCheckTimeMs = -1L
4951

50-
// List of applications, in order from newest to oldest.
51-
@volatile private var appList: Seq[ApplicationHistoryInfo] = Nil
52+
// The modification time of the newest log detected during the last scan. This is used
53+
// to ignore logs that are older during subsequent scans, to avoid processing data that
54+
// is already known.
55+
private var lastModifiedTime = -1L
56+
57+
// Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
58+
// into the map in order, so the LinkedHashMap maintains the correct ordering.
59+
@volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
60+
= new mutable.LinkedHashMap()
5261

5362
/**
5463
* A background thread that periodically checks for event log updates on disk.
@@ -93,15 +102,35 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
93102
logCheckingThread.start()
94103
}
95104

96-
override def getListing() = appList
105+
override def getListing() = applications.values
97106

98-
override def getAppUI(appId: String): SparkUI = {
107+
override def getAppUI(appId: String): Option[SparkUI] = {
99108
try {
100-
val appLogDir = fs.getFileStatus(new Path(resolvedLogDir.toString, appId))
101-
val (_, ui) = loadAppInfo(appLogDir, renderUI = true)
102-
ui
109+
applications.get(appId).map { info =>
110+
val (replayBus, appListener) = createReplayBus(fs.getFileStatus(
111+
new Path(logDir, info.logDir)))
112+
val ui = {
113+
val conf = this.conf.clone()
114+
val appSecManager = new SecurityManager(conf)
115+
new SparkUI(conf, appSecManager, replayBus, appId,
116+
s"${HistoryServer.UI_PATH_PREFIX}/$appId")
117+
// Do not call ui.bind() to avoid creating a new server for each application
118+
}
119+
120+
replayBus.replay()
121+
122+
ui.setAppName(s"${appListener.appName.getOrElse(NOT_STARTED)} ($appId)")
123+
124+
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
125+
ui.getSecurityManager.setAcls(uiAclsEnabled)
126+
// make sure to set admin acls before view acls so they are properly picked up
127+
ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
128+
ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED),
129+
appListener.viewAcls.getOrElse(""))
130+
ui
131+
}
103132
} catch {
104-
case e: FileNotFoundException => null
133+
case e: FileNotFoundException => None
105134
}
106135
}
107136

@@ -119,84 +148,79 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
119148
try {
120149
val logStatus = fs.listStatus(new Path(resolvedLogDir))
121150
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
122-
val logInfos = logDirs.filter { dir =>
123-
fs.isFile(new Path(dir.getPath, EventLoggingListener.APPLICATION_COMPLETE))
124-
}
125151

126-
val currentApps = Map[String, ApplicationHistoryInfo](
127-
appList.map(app => app.id -> app):_*)
128-
129-
// For any application that either (i) is not listed or (ii) has changed since the last time
130-
// the listing was created (defined by the log dir's modification time), load the app's info.
131-
// Otherwise just reuse what's already in memory.
132-
val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size)
133-
for (dir <- logInfos) {
134-
val curr = currentApps.getOrElse(dir.getPath().getName(), null)
135-
if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
152+
// Load all new logs from the log directory. Only directories that have a modification time
153+
// later than the last known log directory will be loaded.
154+
var newLastModifiedTime = lastModifiedTime
155+
val logInfos = logDirs
156+
.filter { dir =>
157+
if (fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) {
158+
val modTime = getModificationTime(dir)
159+
newLastModifiedTime = math.max(newLastModifiedTime, modTime)
160+
modTime > lastModifiedTime
161+
} else {
162+
false
163+
}
164+
}
165+
.flatMap { dir =>
136166
try {
137-
val (app, _) = loadAppInfo(dir, renderUI = false)
138-
newApps += app
167+
val (replayBus, appListener) = createReplayBus(dir)
168+
replayBus.replay()
169+
Some(new FsApplicationHistoryInfo(
170+
dir.getPath().getName(),
171+
appListener.appId.getOrElse(dir.getPath().getName()),
172+
appListener.appName.getOrElse(NOT_STARTED),
173+
appListener.startTime.getOrElse(-1L),
174+
appListener.endTime.getOrElse(-1L),
175+
getModificationTime(dir),
176+
appListener.sparkUser.getOrElse(NOT_STARTED)))
139177
} catch {
140-
case e: Exception => logError(s"Failed to load app info from directory $dir.")
178+
case e: Exception =>
179+
logInfo(s"Failed to load application log data from $dir.", e)
180+
None
181+
}
182+
}
183+
.sortBy { info => -info.endTime }
184+
185+
lastModifiedTime = newLastModifiedTime
186+
187+
// When there are new logs, merge the new list with the existing one, maintaining
188+
// the expected ordering (descending end time). Maintaining the order is important
189+
// to avoid having to sort the list every time there is a request for the log list.
190+
if (!logInfos.isEmpty) {
191+
val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
192+
def addIfAbsent(info: FsApplicationHistoryInfo) = {
193+
if (!newApps.contains(info.id)) {
194+
newApps += (info.id -> info)
141195
}
142-
} else {
143-
newApps += curr
144196
}
145-
}
146197

147-
appList = newApps.sortBy { info => -info.endTime }
198+
val newIterator = logInfos.iterator.buffered
199+
val oldIterator = applications.values.iterator.buffered
200+
while (newIterator.hasNext && oldIterator.hasNext) {
201+
if (newIterator.head.endTime > oldIterator.head.endTime) {
202+
addIfAbsent(newIterator.next)
203+
} else {
204+
addIfAbsent(oldIterator.next)
205+
}
206+
}
207+
newIterator.foreach(addIfAbsent)
208+
oldIterator.foreach(addIfAbsent)
209+
210+
applications = newApps
211+
}
148212
} catch {
149213
case t: Throwable => logError("Exception in checking for event log updates", t)
150214
}
151215
}
152216

153-
/**
154-
* Parse the application's logs to find out the information we need to build the
155-
* listing page.
156-
*
157-
* When creating the listing of available apps, there is no need to load the whole UI for the
158-
* application. The UI is requested by the HistoryServer (by calling getAppInfo()) when the user
159-
* clicks on a specific application.
160-
*
161-
* @param logDir Directory with application's log files.
162-
* @param renderUI Whether to create the SparkUI for the application.
163-
* @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
164-
*/
165-
private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
166-
val path = logDir.getPath
167-
val appId = path.getName
217+
private def createReplayBus(logDir: FileStatus): (ReplayListenerBus, ApplicationEventListener) = {
218+
val path = logDir.getPath()
168219
val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
169220
val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
170221
val appListener = new ApplicationEventListener
171222
replayBus.addListener(appListener)
172-
173-
val ui: SparkUI = if (renderUI) {
174-
val conf = this.conf.clone()
175-
val appSecManager = new SecurityManager(conf)
176-
new SparkUI(conf, appSecManager, replayBus, appId,
177-
HistoryServer.UI_PATH_PREFIX + s"/$appId")
178-
// Do not call ui.bind() to avoid creating a new server for each application
179-
} else {
180-
null
181-
}
182-
183-
replayBus.replay()
184-
val appInfo = ApplicationHistoryInfo(
185-
appId,
186-
appListener.appName,
187-
appListener.startTime,
188-
appListener.endTime,
189-
getModificationTime(logDir),
190-
appListener.sparkUser)
191-
192-
if (ui != null) {
193-
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
194-
ui.getSecurityManager.setAcls(uiAclsEnabled)
195-
// make sure to set admin acls before view acls so properly picked up
196-
ui.getSecurityManager.setAdminAcls(appListener.adminAcls)
197-
ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
198-
}
199-
(appInfo, ui)
223+
(replayBus, appListener)
200224
}
201225

202226
/** Return when this directory was last modified. */
@@ -219,3 +243,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
219243
private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000)
220244

221245
}
246+
247+
private class FsApplicationHistoryInfo(
248+
val logDir: String,
249+
id: String,
250+
name: String,
251+
startTime: Long,
252+
endTime: Long,
253+
lastUpdated: Long,
254+
sparkUser: String)
255+
extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser)

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,7 @@ class HistoryServer(
5252

5353
private val appLoader = new CacheLoader[String, SparkUI] {
5454
override def load(key: String): SparkUI = {
55-
val ui = provider.getAppUI(key)
56-
if (ui == null) {
57-
throw new NoSuchElementException()
58-
}
55+
val ui = provider.getAppUI(key).getOrElse(throw new NoSuchElementException())
5956
attachSparkUI(ui)
6057
ui
6158
}

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,9 @@ private[spark] class Executor(
123123
env.metricsSystem.report()
124124
isStopped = true
125125
threadPool.shutdown()
126+
if (!isLocal) {
127+
env.stop()
128+
}
126129
}
127130

128131
class TaskRunner(

core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,38 +24,31 @@ package org.apache.spark.scheduler
2424
* from multiple applications are seen, the behavior is unspecified.
2525
*/
2626
private[spark] class ApplicationEventListener extends SparkListener {
27-
var appName = "<Not Started>"
28-
var sparkUser = "<Not Started>"
29-
var startTime = -1L
30-
var endTime = -1L
31-
var viewAcls = ""
32-
var adminAcls = ""
33-
34-
def applicationStarted = startTime != -1
35-
36-
def applicationCompleted = endTime != -1
37-
38-
def applicationDuration: Long = {
39-
val difference = endTime - startTime
40-
if (applicationStarted && applicationCompleted && difference > 0) difference else -1L
41-
}
27+
var appName: Option[String] = None
28+
var appId: Option[String] = None
29+
var sparkUser: Option[String] = None
30+
var startTime: Option[Long] = None
31+
var endTime: Option[Long] = None
32+
var viewAcls: Option[String] = None
33+
var adminAcls: Option[String] = None
4234

4335
override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
44-
appName = applicationStart.appName
45-
startTime = applicationStart.time
46-
sparkUser = applicationStart.sparkUser
36+
appName = Some(applicationStart.appName)
37+
appId = applicationStart.appId
38+
startTime = Some(applicationStart.time)
39+
sparkUser = Some(applicationStart.sparkUser)
4740
}
4841

4942
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
50-
endTime = applicationEnd.time
43+
endTime = Some(applicationEnd.time)
5144
}
5245

5346
override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
5447
synchronized {
5548
val environmentDetails = environmentUpdate.environmentDetails
5649
val allProperties = environmentDetails("Spark Properties").toMap
57-
viewAcls = allProperties.getOrElse("spark.ui.view.acls", "")
58-
adminAcls = allProperties.getOrElse("spark.admin.acls", "")
50+
viewAcls = allProperties.get("spark.ui.view.acls")
51+
adminAcls = allProperties.get("spark.admin.acls")
5952
}
6053
}
6154
}

core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,12 @@ private[spark] trait SchedulerBackend {
3131
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
3232
throw new UnsupportedOperationException
3333
def isReady(): Boolean = true
34+
35+
/**
36+
* The application ID associated with the job, if any.
37+
*
38+
* @return The application ID, or None if the backend does not provide an ID.
39+
*/
40+
def applicationId(): Option[String] = None
41+
3442
}

0 commit comments

Comments
 (0)