@@ -300,11 +300,14 @@ private[spark] class ApplicationMaster(
300
300
val expiryInterval = yarnConf.getInt(YarnConfiguration .RM_AM_EXPIRY_INTERVAL_MS , 120000 )
301
301
302
302
// we want to be reasonably responsive without causing too many requests to RM.
303
- val schedulerInterval =
304
- sparkConf.getTimeAsMs(" spark.yarn.scheduler.heartbeat.interval-ms" , " 5s " )
303
+ val heartbeatInterval = math.max( 0 , math.min(expiryInterval / 2 ,
304
+ sparkConf.getTimeAsMs(" spark.yarn.scheduler.heartbeat.interval-ms" , " 3s " )) )
305
305
306
- // must be <= expiryInterval / 2.
307
- val interval = math.max(0 , math.min(expiryInterval / 2 , schedulerInterval))
306
+ // we want to check more frequently for pending containers
307
+ val initialAllocationInterval = math.min(heartbeatInterval,
308
+ sparkConf.getTimeAsMs(" spark.yarn.scheduler.initial-allocation.interval" , " 200ms" ))
309
+
310
+ var nextAllocationInterval = initialAllocationInterval
308
311
309
312
// The number of failures in a row until Reporter thread give up
310
313
val reporterMaxFailures = sparkConf.getInt(" spark.yarn.scheduler.reporterThread.maxFailures" , 5 )
@@ -330,15 +333,27 @@ private[spark] class ApplicationMaster(
330
333
if (! NonFatal (e) || failureCount >= reporterMaxFailures) {
331
334
finish(FinalApplicationStatus .FAILED ,
332
335
ApplicationMaster .EXIT_REPORTER_FAILURE , " Exception was thrown " +
333
- s " ${failureCount} time(s) from Reporter thread. " )
334
-
336
+ s " $failureCount time(s) from Reporter thread. " )
335
337
} else {
336
- logWarning(s " Reporter thread fails ${ failureCount} time(s) in a row. " , e)
338
+ logWarning(s " Reporter thread fails $failureCount time(s) in a row. " , e)
337
339
}
338
340
}
339
341
}
340
342
try {
341
- Thread .sleep(interval)
343
+ val numPendingAllocate = allocator.getNumPendingAllocate
344
+ val sleepInterval =
345
+ if (numPendingAllocate > 0 ) {
346
+ val currentAllocationInterval =
347
+ math.min(heartbeatInterval, nextAllocationInterval)
348
+ nextAllocationInterval *= 2
349
+ currentAllocationInterval
350
+ } else {
351
+ nextAllocationInterval = initialAllocationInterval
352
+ heartbeatInterval
353
+ }
354
+ logDebug(s " Number of pending allocations is $numPendingAllocate. " +
355
+ s " Sleeping for $sleepInterval. " )
356
+ Thread .sleep(sleepInterval)
342
357
} catch {
343
358
case e : InterruptedException =>
344
359
}
@@ -349,7 +364,8 @@ private[spark] class ApplicationMaster(
349
364
t.setDaemon(true )
350
365
t.setName(" Reporter" )
351
366
t.start()
352
- logInfo(" Started progress reporter thread - sleep time : " + interval)
367
+ logInfo(s " Started progress reporter thread with (heartbeat : $heartbeatInterval, " +
368
+ s " initial allocation : $initialAllocationInterval) intervals " )
353
369
t
354
370
}
355
371
0 commit comments