
[SPARK-2165] spark on yarn: add support for setting maxAppAttempts in the ApplicationSubmissionContext #1279


Closed. Wants to merge 11 commits; changes shown from 5 commits.
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -124,6 +124,14 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
set("spark.home", home)
}

/**
* Set the max number of submission retries the Spark client will attempt
* before giving up
*/
Contributor:
We haven't been adding specific routines to set the configs. The user can just set it using the existing SparkConf.set routines, so I think we should remove this.
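
A minimal sketch of the reviewer's suggestion, using the existing generic SparkConf.set instead of a dedicated setter (the app name here is illustrative; the property key is the one introduced in this PR):

    import org.apache.spark.SparkConf

    // No dedicated setter needed: the generic set() covers this property.
    val conf = new SparkConf()
      .setAppName("example-app")
      .set("spark.maxappattempts", "3")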

  def setMaxAppAttempts(max: Int): SparkConf = {
    set("spark.maxappattempts", max.toString())
  }

  /** Set multiple parameters together */
  def setAll(settings: Traversable[(String, String)]) = {
    this.settings ++= settings
@@ -167,6 +175,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
    getOption(key).map(_.toInt).getOrElse(defaultValue)
  }

  def getIntOption(key: String): Option[Int] = getOption(key).map(_.toInt)
Contributor:
To keep things consistent (these APIs are public), I don't think we should add getIntOption without also adding other routines like getLongOption, etc. For now, can you just use getOption and then convert it to an Int?
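
A sketch of the alternative the reviewer describes, assuming it runs where sparkConf, appContext, and logDebug are in scope (as in the Client code below):

    // Use the existing public getOption and convert, instead of a new getIntOption.
    sparkConf.getOption("spark.maxappattempts").map(_.toInt) match {
      case Some(v) => appContext.setMaxAppAttempts(v)
      case None => logDebug("Not setting max app attempts.")
    }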


  /** Get a parameter as a long, falling back to a default if not set */
  def getLong(key: String, defaultValue: Long): Long = {
    getOption(key).map(_.toLong).getOrElse(defaultValue)
@@ -108,6 +108,10 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
    appContext.setApplicationId(appId)
    appContext.setApplicationName(args.appName)
    sparkConf.getIntOption("spark.maxappattempts") match {
      case Some(v) => appContext.setMaxAppAttempts(v)
Contributor:
Hadoop 0.23 (yarn alpha) doesn't have a setMaxAppAttempts routine. Just remove this and only do it in the yarn stable version.

      case None => logDebug("Not setting max app attempts.")
    }
    appContext
  }

@@ -81,6 +81,10 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
    appContext.setQueue(args.amQueue)
    appContext.setAMContainerSpec(amContainer)
    appContext.setApplicationType("SPARK")
    sparkConf.getIntOption("spark.maxappattempts") match {
      case Some(v) => appContext.setMaxAppAttempts(v)
      case None => logDebug("Not setting max app attempts.")
Contributor:
Can you add something like "cluster default setting will be used" to the log statement?
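
A sketch of the log statement with the wording the reviewer is asking for:

      case None => logDebug("Not setting max app attempts; the cluster default setting will be used.")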

    }

    // Memory for the ApplicationMaster.
    val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]