[SPARK-7533] [YARN] Decrease spacing between AM-RM heartbeats. #6082

Closed · wants to merge 6 commits
15 changes: 14 additions & 1 deletion docs/running-on-yarn.md
@@ -71,9 +71,22 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
</tr>
<tr>
<td><code>spark.yarn.scheduler.heartbeat.interval-ms</code></td>
<td>5000</td>
<td>3000</td>
<td>
The interval in ms in which the Spark application master heartbeats into the YARN ResourceManager.
The value is capped at half the value of YARN's configuration for the expiry interval
(<code>yarn.am.liveness-monitor.expiry-interval-ms</code>).
</td>
</tr>
<tr>
<td><code>spark.yarn.scheduler.initial-allocation.interval</code></td>
<td>200ms</td>
<td>
The initial interval in which the Spark application master eagerly heartbeats to the YARN ResourceManager
when there are pending container allocation requests. It should be no larger than
<code>spark.yarn.scheduler.heartbeat.interval-ms</code>. The allocation interval will be doubled on
successive eager heartbeats if pending containers still exist, until
<code>spark.yarn.scheduler.heartbeat.interval-ms</code> is reached.
Contributor

this might get too long (if it does, I'm OK with leaving it out), but it would be nice to say that it resets when there are no pending containers.

</td>
</tr>
<tr>
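For context on the documentation rows above: the PR lowers the default AM-RM heartbeat from 5000 ms to 3000 ms and adds the eager 200 ms allocation interval. Below is a minimal sketch of overriding both properties on a SparkConf; the property names and default values come from this diff, while the explicit override itself is only illustrative.

```scala
import org.apache.spark.SparkConf

// Illustrative override of the two YARN heartbeat settings touched by this PR.
// Defaults after this change: 3s regular heartbeat, 200ms eager allocation interval.
val conf = new SparkConf()
  // Regular AM -> RM heartbeat; the AM caps it at half of YARN's expiry interval.
  .set("spark.yarn.scheduler.heartbeat.interval-ms", "3s")
  // Eager heartbeat used while container requests are pending; capped at the value above.
  .set("spark.yarn.scheduler.initial-allocation.interval", "200ms")
```

The same values could equally be passed as `--conf` options to `spark-submit`.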
@@ -300,11 +300,14 @@ private[spark] class ApplicationMaster(
val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)

// we want to be reasonably responsive without causing too many requests to RM.
val schedulerInterval =
sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "5s")
val heartbeatInterval = math.max(0, math.min(expiryInterval / 2,
sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "3s")))

// must be <= expiryInterval / 2.
val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
// we want to check more frequently for pending containers
val initialAllocationInterval = math.min(heartbeatInterval,
sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms"))
Contributor

it never sleeps for 200ms, it always does *2 first

Contributor Author

Thanks for finding that! I tested it with 50ms and did not notice it after I refactored the debug log. About to fix it soon.


var nextAllocationInterval = initialAllocationInterval

// The number of failures in a row until the Reporter thread gives up
val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)
@@ -330,15 +333,27 @@ private[spark] class ApplicationMaster(
if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
s"${failureCount} time(s) from Reporter thread.")

s"$failureCount time(s) from Reporter thread.")
} else {
logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e)
logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
}
}
}
try {
Thread.sleep(interval)
val numPendingAllocate = allocator.getNumPendingAllocate
val sleepInterval =
if (numPendingAllocate > 0) {
val currentAllocationInterval =
math.min(heartbeatInterval, nextAllocationInterval)
nextAllocationInterval *= 2
currentAllocationInterval
} else {
nextAllocationInterval = initialAllocationInterval
heartbeatInterval
}
logDebug(s"Number of pending allocations is $numPendingAllocate. " +
s"Sleeping for $sleepInterval.")
Thread.sleep(sleepInterval)
} catch {
case e: InterruptedException =>
}
@@ -349,7 +364,8 @@
t.setDaemon(true)
t.setName("Reporter")
t.start()
logInfo("Started progress reporter thread - sleep time : " + interval)
logInfo(s"Started progress reporter thread with (heartbeat : $heartbeatInterval, " +
s"initial allocation : $initialAllocationInterval) intervals")
t
}

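To make the new reporter loop's timing concrete, here is a small, self-contained sketch of the sleep schedule it produces. Only the doubling/reset rule and the 200 ms / 3 s defaults come from the diff; the object and helper names are hypothetical.

```scala
// Hypothetical illustration of the AM reporter thread's sleep schedule.
// With the defaults from this PR (initial 200 ms, heartbeat 3000 ms) and
// containers pending, the sleeps are 200, 400, 800, 1600, 3000, 3000, ... ms;
// once nothing is pending, the eager interval resets to 200 ms.
object AllocationBackoffSketch {
  val heartbeatInterval = 3000L          // spark.yarn.scheduler.heartbeat.interval-ms
  val initialAllocationInterval = 200L   // spark.yarn.scheduler.initial-allocation.interval
  var nextAllocationInterval = initialAllocationInterval

  def nextSleep(numPendingAllocate: Int): Long =
    if (numPendingAllocate > 0) {
      // Use the current eager interval, then back off for the next round.
      val current = math.min(heartbeatInterval, nextAllocationInterval)
      nextAllocationInterval *= 2
      current
    } else {
      // Nothing pending: reset the eager interval and fall back to the regular heartbeat.
      nextAllocationInterval = initialAllocationInterval
      heartbeatInterval
    }

  def main(args: Array[String]): Unit = {
    // Five iterations with pending containers, then one without.
    val pending = Seq(3, 3, 3, 3, 3, 0)
    println(pending.map(nextSleep).mkString(", "))  // 200, 400, 800, 1600, 3000, 3000
  }
}
```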