18
18
package org .apache .spark .deploy .history
19
19
20
20
import java .io .{IOException , BufferedInputStream , FileNotFoundException , InputStream }
21
- import java .util .concurrent .{Executors , TimeUnit }
21
+ import java .util .concurrent .{ExecutorService , Executors , TimeUnit }
22
22
23
23
import scala .collection .mutable
24
24
import scala .concurrent .duration .Duration
25
25
26
26
import com .google .common .util .concurrent .ThreadFactoryBuilder
27
27
28
- import org . apache . hadoop . fs .{ FileStatus , Path }
28
+ import com . google . common . util . concurrent . MoreExecutors
29
29
import org .apache .hadoop .fs .permission .AccessControlException
30
-
31
- import org .apache .spark .{Logging , SecurityManager , SparkConf }
30
+ import org .apache .hadoop .fs .{FileStatus , Path }
32
31
import org .apache .spark .deploy .SparkHadoopUtil
33
32
import org .apache .spark .io .CompressionCodec
34
33
import org .apache .spark .scheduler ._
35
34
import org .apache .spark .ui .SparkUI
36
35
import org .apache .spark .util .Utils
36
+ import org .apache .spark .{Logging , SecurityManager , SparkConf }
37
+
37
38
38
39
/**
39
40
* A class that provides application history from event logs stored in the file system.
@@ -98,6 +99,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
98
99
}
99
100
}
100
101
102
+ /**
103
+ * An Executor to fetch and parse log files.
104
+ */
105
+ private val replayExecutor : ExecutorService = {
106
+ if (! conf.contains(" spark.testing" )) {
107
+ Executors .newSingleThreadExecutor(Utils .namedThreadFactory(" log-replay-executor" ))
108
+ } else {
109
+ MoreExecutors .sameThreadExecutor()
110
+ }
111
+ }
112
+
101
113
initialize()
102
114
103
115
private def initialize (): Unit = {
@@ -171,10 +183,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
171
183
*/
172
184
private [history] def checkForLogs (): Unit = {
173
185
try {
174
- var newLastModifiedTime = lastModifiedTime
175
186
val statusList = Option (fs.listStatus(new Path (logDir))).map(_.toSeq)
176
187
.getOrElse(Seq [FileStatus ]())
177
- val logInfos = statusList
188
+ var newLastModifiedTime = lastModifiedTime
189
+ val logInfos : Seq [FileStatus ] = statusList
178
190
.filter { entry =>
179
191
try {
180
192
getModificationTime(entry).map { time =>
@@ -189,48 +201,69 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
189
201
false
190
202
}
191
203
}
192
- .flatMap { entry =>
193
- try {
194
- Some (replay(entry, new ReplayListenerBus ()))
195
- } catch {
196
- case e : Exception =>
197
- logError(s " Failed to load application log data from $entry. " , e)
198
- None
199
- }
200
- }
201
- .sortWith(compareAppInfo)
204
+ .flatMap { entry => Some (entry) }
205
+ .sortWith { case (entry1, entry2) =>
206
+ val mod1 = getModificationTime(entry1).getOrElse(- 1L )
207
+ val mod2 = getModificationTime(entry2).getOrElse(- 1L )
208
+ mod1 >= mod2
209
+ }
210
+
211
+ logInfos.sliding(20 , 20 ).foreach { batch =>
212
+ replayExecutor.submit(new Runnable {
213
+ override def run (): Unit = mergeApplicationListing(batch)
214
+ })
215
+ }
202
216
203
217
lastModifiedTime = newLastModifiedTime
218
+ } catch {
219
+ case e : Exception => logError(" Exception in checking for event log updates" , e)
220
+ }
221
+ }
204
222
205
- // When there are new logs, merge the new list with the existing one, maintaining
206
- // the expected ordering (descending end time). Maintaining the order is important
207
- // to avoid having to sort the list every time there is a request for the log list.
208
- if (! logInfos.isEmpty) {
209
- val newApps = new mutable.LinkedHashMap [String , FsApplicationHistoryInfo ]()
210
- def addIfAbsent (info : FsApplicationHistoryInfo ) = {
211
- if (! newApps.contains(info.id) ||
212
- newApps(info.id).logPath.endsWith(EventLoggingListener .IN_PROGRESS ) &&
213
- ! info.logPath.endsWith(EventLoggingListener .IN_PROGRESS )) {
214
- newApps += (info.id -> info)
215
- }
223
+ /**
224
+ * Replay the log files in the list and merge the list of old applications with new ones
225
+ */
226
+ private def mergeApplicationListing (logs : Seq [FileStatus ]): Unit = {
227
+ val bus = new ReplayListenerBus ()
228
+ val newApps = logs.flatMap { fileStatus =>
229
+ try {
230
+ val res = replay(fileStatus, bus)
231
+ logInfo(s " Application log ${res.logPath} loaded successfully. " )
232
+ Some (res)
233
+ } catch {
234
+ case e : Exception =>
235
+ logError(
236
+ s " Exception encountered when attempting to load application log ${fileStatus.getPath}" )
237
+ None
238
+ }
239
+ }.toSeq.sortWith(compareAppInfo)
240
+
241
+ // When there are new logs, merge the new list with the existing one, maintaining
242
+ // the expected ordering (descending end time). Maintaining the order is important
243
+ // to avoid having to sort the list every time there is a request for the log list.
244
+ if (newApps.nonEmpty) {
245
+ val mergedApps = new mutable.LinkedHashMap [String , FsApplicationHistoryInfo ]()
246
+ def addIfAbsent (info : FsApplicationHistoryInfo ): Unit = {
247
+ if (! mergedApps.contains(info.id) ||
248
+ mergedApps(info.id).logPath.endsWith(EventLoggingListener .IN_PROGRESS ) &&
249
+ ! info.logPath.endsWith(EventLoggingListener .IN_PROGRESS )) {
250
+ mergedApps += (info.id -> info)
216
251
}
252
+ }
217
253
218
- val newIterator = logInfos.iterator.buffered
219
- val oldIterator = applications.values.iterator.buffered
220
- while (newIterator.hasNext && oldIterator.hasNext) {
221
- if (compareAppInfo(newIterator.head, oldIterator.head)) {
222
- addIfAbsent(newIterator.next)
223
- } else {
224
- addIfAbsent(oldIterator.next)
225
- }
254
+ val newIterator = newApps.iterator.buffered
255
+ val oldIterator = applications.values.iterator.buffered
256
+ while (newIterator.hasNext && oldIterator.hasNext) {
257
+ if (compareAppInfo(newIterator.head, oldIterator.head)) {
258
+ addIfAbsent(newIterator.next())
259
+ } else {
260
+ addIfAbsent(oldIterator.next())
226
261
}
227
- newIterator.foreach(addIfAbsent)
228
- oldIterator.foreach(addIfAbsent)
229
-
230
- applications = newApps
231
262
}
232
- } catch {
233
- case e : Exception => logError(" Exception in checking for event log updates" , e)
263
+ newIterator.foreach(addIfAbsent)
264
+ oldIterator.foreach(addIfAbsent)
265
+
266
+ applications = mergedApps
234
267
}
235
268
}
236
269
0 commit comments