Skip to content

Commit e0f33f7

Browse files
committed
Add supervise support and persist retries.
1 parent 371ce65 commit e0f33f7

File tree

8 files changed

+223
-94
lines changed

8 files changed

+223
-94
lines changed

core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.json4s._
3232
import org.json4s.jackson.JsonMethods
3333

3434
import org.apache.spark.{Logging, SparkConf, SparkContext}
35-
import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
35+
import org.apache.spark.deploy.master.{RecoveryState}
3636
import org.apache.spark.util.Utils
3737

3838
/**

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -381,11 +381,13 @@ object SparkSubmit {
381381

382382
// Standalone cluster only
383383
// Do not set CL arguments here because there are multiple possibilities for the main class
384-
OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
384+
OptionAssigner(args.jars, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars"),
385385
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
386-
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, sysProp = "spark.driver.memory"),
387-
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, sysProp = "spark.driver.cores"),
388-
OptionAssigner(args.supervise.toString, STANDALONE, CLUSTER,
386+
OptionAssigner(args.driverMemory, STANDALONE | MESOS, CLUSTER,
387+
sysProp = "spark.driver.memory"),
388+
OptionAssigner(args.driverCores, STANDALONE | MESOS, CLUSTER,
389+
sysProp = "spark.driver.cores"),
390+
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
389391
sysProp = "spark.driver.supervise"),
390392

391393
// Yarn client only

core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverOutputPage.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.deploy.mesos.ui
2020
import org.apache.spark.ui.{UIUtils, WebUIPage}
2121
import javax.servlet.http.HttpServletRequest
2222
import scala.xml.Node
23-
import org.apache.spark.scheduler.cluster.mesos.{ClusterTaskState, DriverSubmission}
23+
import org.apache.spark.scheduler.cluster.mesos.{RetryState, ClusterTaskState, DriverSubmission}
2424
import org.apache.mesos.Protos.TaskStatus
2525

2626
class DriverOutputPage(parent: MesosClusterUI) extends WebUIPage("") {
@@ -31,10 +31,13 @@ class DriverOutputPage(parent: MesosClusterUI) extends WebUIPage("") {
3131
val queuedHeaders = Seq("DriverID", "Submit Date", "Description")
3232
val driverHeaders = queuedHeaders ++
3333
Seq("Start Date", "Mesos Slave ID", "State")
34+
val retryHeaders = Seq("DriverID", "Submit Date", "Description") ++
35+
Seq("Last Failed Status", "Next Retry Time", "Attempt Count")
3436

3537
val queuedTable = UIUtils.listingTable(queuedHeaders, queuedRow, state.queuedDrivers)
3638
val launchedTable = UIUtils.listingTable(driverHeaders, driverRow, state.launchedDrivers)
3739
val finishedTable = UIUtils.listingTable(driverHeaders, driverRow, state.finishedDrivers)
40+
val retryTable = UIUtils.listingTable(retryHeaders, retryRow, state.retryList)
3841
val content =
3942
<p>Mesos Framework ID: {state.appId}</p>
4043
<div class="row-fluid">
@@ -45,6 +48,8 @@ class DriverOutputPage(parent: MesosClusterUI) extends WebUIPage("") {
4548
{launchedTable}
4649
<h4>Finished Drivers:</h4>
4750
{finishedTable}
51+
<h4>Supervise drivers waiting for retry:</h4>
52+
{retryTable}
4853
</div>
4954
</div>;
5055
UIUtils.basicSparkPage(content, "Spark Drivers for Mesos cluster")
@@ -69,6 +74,17 @@ class DriverOutputPage(parent: MesosClusterUI) extends WebUIPage("") {
6974
</tr>
7075
}
7176

77+
def retryRow(state: RetryState): Seq[Node] = {
78+
<tr>
79+
<td>{state.submission.submissionId}</td>
80+
<td>{state.submission.submitDate}</td>
81+
<td>{state.submission.desc.desc.command.mainClass}</td>
82+
<td>{state.lastFailureStatus}</td>
83+
<td>{state.nextRetry}</td>
84+
<td>{state.retries}</td>
85+
</tr>
86+
}
87+
7288
def stateString(status: Option[TaskStatus]): String = {
7389
if (status.isEmpty) {
7490
return ""

core/src/main/scala/org/apache/spark/deploy/rest/MesosRestServer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ class MesosKillRequestServlet(scheduler: ClusterScheduler, conf: SparkConf)
167167
val response = scheduler.killDriver(submissionId)
168168
val k = new KillSubmissionResponse
169169
k.serverSparkVersion = sparkVersion
170-
k.message = response.message.orNull
170+
k.message = response.message
171171
k.submissionId = submissionId
172172
k.success = response.success
173173
k

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/DriverQueue.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.spark.scheduler.cluster.mesos
1919

2020
import scala.collection.mutable
21-
import javax.annotation.concurrent.NotThreadSafe
2221

2322
/**
2423
* A request queue for launching drivers in Mesos cluster mode.
@@ -27,8 +26,7 @@ import javax.annotation.concurrent.NotThreadSafe
2726
* This queue is also bounded and rejects offers when it's full.
2827
* @param state Mesos state abstraction to fetch persistent state.
2928
*/
30-
@NotThreadSafe
31-
class DriverQueue(state: ClusterPersistenceEngine, capacity: Int) {
29+
private[mesos] class DriverQueue(state: ClusterPersistenceEngine, capacity: Int) {
3230
var queue: mutable.Queue[DriverSubmission] = new mutable.Queue[DriverSubmission]()
3331
private var count = 0
3432

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/LaunchedDrivers.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@
1818
package org.apache.spark.scheduler.cluster.mesos
1919

2020
import scala.collection.mutable
21-
import org.apache.mesos.Protos.{TaskID, SlaveID}
21+
import org.apache.mesos.Protos.SlaveID
2222

23-
class LaunchedDrivers(state: ClusterPersistenceEngine) {
23+
/**
24+
* Tracks all the launched or running drivers in the Mesos cluster scheduler.
25+
* @param state Persistence engine to store state.
26+
*/
27+
private[mesos] class LaunchedDrivers(state: ClusterPersistenceEngine) {
2428
private val drivers = new mutable.HashMap[String, ClusterTaskState]
2529

2630
// Holds the list of tasks that need reconciliation from the master.

0 commit comments

Comments (0)