@@ -34,7 +34,7 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
34
34
35
35
import org .apache .spark .{Logging , SparkConf , SparkContext , SparkException , TestUtils }
36
36
import org .apache .spark .scheduler .cluster .ExecutorInfo
37
- import org .apache .spark .scheduler .{SparkListenerApplicationStart , SparkListener ,
37
+ import org .apache .spark .scheduler .{SparkListener , SparkListenerApplicationStart ,
38
38
SparkListenerExecutorAdded }
39
39
import org .apache .spark .util .Utils
40
40
@@ -293,6 +293,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
293
293
private [spark] class SaveExecutorInfo extends SparkListener {
294
294
val addedExecutorInfos = mutable.Map [String , ExecutorInfo ]()
295
295
var driverLogs : Option [collection.Map [String , String ]] = None
296
+
296
297
override def onExecutorAdded (executor : SparkListenerExecutorAdded ) {
297
298
addedExecutorInfos(executor.executorId) = executor.executorInfo
298
299
}
@@ -327,34 +328,35 @@ private object YarnClusterDriver extends Logging with Matchers {
327
328
val data = sc.parallelize(1 to 4 , 4 ).collect().toSet
328
329
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS ))
329
330
data should be (Set (1 , 2 , 3 , 4 ))
330
-
331
- // verify log urls are present
332
- val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo ]
333
- assert(listeners.size === 1 )
334
- val listener = listeners(0 )
335
- val executorInfos = listener.addedExecutorInfos.values
336
- assert(executorInfos.nonEmpty)
337
- executorInfos.foreach { info =>
338
- assert(info.logUrlMap.nonEmpty)
339
- }
340
-
341
- // YARN does some weird redirects after the app is done, so check before it is complete.
342
- if (conf.get(" spark.master" ) == " yarn-cluster" ) {
343
- val driverLogs = listener.driverLogs.get
344
- assert(driverLogs.size === 2 )
345
- assert(driverLogs.containsKey(" stderr" ))
346
- assert(driverLogs.containsKey(" stdout" ))
347
- val stderr = driverLogs(" stderr" ) // YARN puts everything in stderr.
348
- val lines = Source .fromURL(stderr).getLines()
349
- // Look for a line that contains YarnClusterSchedulerBackend, since that is guaranteed in
350
- // cluster mode.
351
- assert(lines.exists(_.contains(" YarnClusterSchedulerBackend" )))
352
- }
353
331
result = " success"
354
332
} finally {
355
333
sc.stop()
356
334
Files .write(result, status, UTF_8 )
357
335
}
336
+
337
+ // Verify that every executor recorded by the listener reports a non-empty log URL map.
338
+ val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo ]
339
+ assert(listeners.size === 1 )
340
+ val listener = listeners(0 )
341
+ val executorInfos = listener.addedExecutorInfos.values
342
+ assert(executorInfos.nonEmpty)
343
+ executorInfos.foreach { info =>
344
+ assert(info.logUrlMap.nonEmpty)
345
+ }
346
+
347
+ // If we are running in yarn-cluster mode, verify that driver logs are downloadable.
348
+ if (conf.get(" spark.master" ) == " yarn-cluster" ) {
349
+ assert(listener.driverLogs.nonEmpty)
350
+ val driverLogs = listener.driverLogs.get
351
+ assert(driverLogs.size === 2 )
352
+ assert(driverLogs.containsKey(" stderr" ))
353
+ assert(driverLogs.containsKey(" stdout" ))
354
+ val stderr = driverLogs(" stderr" ) // YARN puts everything in stderr.
355
+ val lines = Source .fromURL(stderr).getLines()
356
+ // Look for a line that contains YarnClusterSchedulerBackend, since that is guaranteed in
357
+ // cluster mode.
358
+ assert(lines.exists(_.contains(" YarnClusterSchedulerBackend" )))
359
+ }
358
360
}
359
361
360
362
}
0 commit comments