Skip to content

Commit 6a69fb2

Browse files
tnachen authored and jeanlyn committed
[SPARK-7216] [MESOS] Add driver details page to Mesos cluster UI.
Add a details page that displays Mesos driver in the Mesos cluster UI Author: Timothy Chen <[email protected]> Closes apache#5763 from tnachen/mesos_cluster_page and squashes the following commits: 55f36eb [Timothy Chen] Add driver details page to Mesos cluster UI.
1 parent b561c62 commit 6a69fb2

File tree

6 files changed

+222
-11
lines changed

6 files changed

+222
-11
lines changed
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.mesos.ui
19+
20+
import javax.servlet.http.HttpServletRequest
21+
22+
import scala.xml.Node
23+
24+
import org.apache.spark.deploy.Command
25+
import org.apache.spark.deploy.mesos.MesosDriverDescription
26+
import org.apache.spark.scheduler.cluster.mesos.{MesosClusterSubmissionState, MesosClusterRetryState}
27+
import org.apache.spark.ui.{UIUtils, WebUIPage}
28+
29+
30+
/**
 * Web UI page that displays the detailed state of a single Mesos driver
 * (submission), looked up via the `id` request parameter.
 */
private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver") {

  /**
   * Renders the driver details page.
   *
   * Requires a non-empty `id` request parameter. If the scheduler has no
   * state for that id, a minimal "not found" page is rendered instead of
   * failing the request.
   */
  override def render(request: HttpServletRequest): Seq[Node] = {
    val driverId = request.getParameter("id")
    require(driverId != null && driverId.nonEmpty, "Missing id parameter")

    // Avoid an early `return` (a nonlocal-return anti-pattern in Scala) by
    // matching on the lookup result instead.
    parent.scheduler.getDriverState(driverId) match {
      case None =>
        val content =
          <div>
            <p>Cannot find driver {driverId}</p>
          </div>
        UIUtils.basicSparkPage(content, s"Details for Job $driverId")

      case Some(driverState) =>
        val driverHeaders = Seq("Driver property", "Value")
        val schedulerHeaders = Seq("Scheduler property", "Value")
        val commandEnvHeaders = Seq("Command environment variable", "Value")
        val launchedHeaders = Seq("Launched property", "Value")
        // Fixed typo: was "Comamnd property".
        val commandHeaders = Seq("Command property", "Value")
        val retryHeaders = Seq("Last failed status", "Next retry time", "Retry count")
        // Each table is rendered from a single-element collection.
        val driverDescription = Iterable(driverState.description)
        val submissionState = Iterable(driverState.submissionState)
        val command = Iterable(driverState.description.command)
        val schedulerProperties = Iterable(driverState.description.schedulerProperties)
        val commandEnv = Iterable(driverState.description.command.environment)
        val driverTable =
          UIUtils.listingTable(driverHeaders, driverRow, driverDescription)
        val commandTable =
          UIUtils.listingTable(commandHeaders, commandRow, command)
        val commandEnvTable =
          UIUtils.listingTable(commandEnvHeaders, propertiesRow, commandEnv)
        val schedulerTable =
          UIUtils.listingTable(schedulerHeaders, propertiesRow, schedulerProperties)
        val launchedTable =
          UIUtils.listingTable(launchedHeaders, launchedRow, submissionState)
        val retryTable =
          UIUtils.listingTable(
            retryHeaders, retryRow, Iterable(driverState.description.retryState))
        val content =
          <p>Driver state information for driver id {driverId}</p>
          <a href="/">Back to Drivers</a>
          <div class="row-fluid">
            <div class="span12">
              <h4>Driver state: {driverState.state}</h4>
              <h4>Driver properties</h4>
              {driverTable}
              <h4>Driver command</h4>
              {commandTable}
              <h4>Driver command environment</h4>
              {commandEnvTable}
              <h4>Scheduler properties</h4>
              {schedulerTable}
              <h4>Launched state</h4>
              {launchedTable}
              <h4>Retry state</h4>
              {retryTable}
            </div>
          </div>;

        UIUtils.basicSparkPage(content, s"Details for Job $driverId")
    }
  }

  // Table rows describing the launched (running/finished) state, or an empty
  // sequence when the driver has not been launched yet.
  private def launchedRow(submissionState: Option[MesosClusterSubmissionState]): Seq[Node] = {
    submissionState.map { state =>
      <tr>
        <td>Mesos Slave ID</td>
        <td>{state.slaveId.getValue}</td>
      </tr>
      <tr>
        <td>Mesos Task ID</td>
        <td>{state.taskId.getValue}</td>
      </tr>
      <tr>
        <td>Launch Time</td>
        <td>{state.startDate}</td>
      </tr>
      <tr>
        <td>Finish Time</td>
        <td>{state.finishDate.map(_.toString).getOrElse("")}</td>
      </tr>
      <tr>
        <td>Last Task Status</td>
        <td>{state.mesosTaskStatus.map(_.toString).getOrElse("")}</td>
      </tr>
    }.getOrElse(Seq.empty[Node])
  }

  // Renders a key/value map as one table row per entry.
  private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = {
    properties.map { case (k, v) =>
      <tr>
        <td>{k}</td><td>{v}</td>
      </tr>
    }.toSeq
  }

  // Table rows describing the command used to launch the driver.
  private def commandRow(command: Command): Seq[Node] = {
    <tr>
      <td>Main class</td><td>{command.mainClass}</td>
    </tr>
    <tr>
      <td>Arguments</td><td>{command.arguments.mkString(" ")}</td>
    </tr>
    <tr>
      <td>Class path entries</td><td>{command.classPathEntries.mkString(" ")}</td>
    </tr>
    <tr>
      <td>Java options</td><td>{command.javaOpts.mkString(" ")}</td>
    </tr>
    <tr>
      <td>Library path entries</td><td>{command.libraryPathEntries.mkString(" ")}</td>
    </tr>
  }

  // Table rows describing the submitted driver itself.
  private def driverRow(driver: MesosDriverDescription): Seq[Node] = {
    <tr>
      <td>Name</td><td>{driver.name}</td>
    </tr>
    <tr>
      <td>Id</td><td>{driver.submissionId}</td>
    </tr>
    <tr>
      <td>Cores</td><td>{driver.cores}</td>
    </tr>
    <tr>
      <td>Memory</td><td>{driver.mem}</td>
    </tr>
    <tr>
      <td>Submitted</td><td>{driver.submissionDate}</td>
    </tr>
    <tr>
      <td>Supervise</td><td>{driver.supervise}</td>
    </tr>
  }

  // One row of retry information, or an empty sequence when the driver has
  // no retry state.
  private def retryRow(retryState: Option[MesosClusterRetryState]): Seq[Node] = {
    retryState.map { state =>
      <tr>
        <td>
          {state.lastFailureStatus}
        </td>
        <td>
          {state.nextRetry}
        </td>
        <td>
          {state.retries}
        </td>
      </tr>
    }.getOrElse(Seq.empty[Node])
  }
}

core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,17 +56,19 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
5656
}
5757

5858
// Renders one table row for a driver that is still queued for launch.
// The id links to the per-driver details page (DriverPage, "driver?id=...").
private def queuedRow(submission: MesosDriverDescription): Seq[Node] = {
  val id = submission.submissionId
  <tr>
    <td><a href={s"driver?id=$id"}>{id}</a></td>
    <td>{submission.submissionDate}</td>
    <td>{submission.command.mainClass}</td>
    <td>cpus: {submission.cores}, mem: {submission.mem}</td>
  </tr>
}
6667

6768
private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
69+
val id = state.driverDescription.submissionId
6870
<tr>
69-
<td>{state.driverDescription.submissionId}</td>
71+
<td><a href={s"driver?id=$id"}>{id}</a></td>
7072
<td>{state.driverDescription.submissionDate}</td>
7173
<td>{state.driverDescription.command.mainClass}</td>
7274
<td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>
@@ -77,8 +79,9 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
7779
}
7880

7981
private def retryRow(submission: MesosDriverDescription): Seq[Node] = {
82+
val id = submission.submissionId
8083
<tr>
81-
<td>{submission.submissionId}</td>
84+
<td><a href={s"driver?id=$id"}>{id}</a></td>
8285
<td>{submission.submissionDate}</td>
8386
<td>{submission.command.mainClass}</td>
8487
<td>{submission.retryState.get.lastFailureStatus}</td>

core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ private[spark] class MesosClusterUI(
3939

4040
override def initialize() {
  // Main cluster listing page plus the per-driver details page.
  attachPage(new MesosClusterPage(this))
  attachPage(new DriverPage(this))
  // Serves the UI's static assets (CSS/JS) under /static.
  attachHandler(createStaticHandler(MesosClusterUI.STATIC_RESOURCE_DIR, "/static"))
}
4445
}

core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ private[spark] class MesosRestServer(
5353
new MesosStatusRequestServlet(scheduler, masterConf)
5454
}
5555

56-
private[deploy] class MesosSubmitRequestServlet(
56+
private[mesos] class MesosSubmitRequestServlet(
5757
scheduler: MesosClusterScheduler,
5858
conf: SparkConf)
5959
extends SubmitRequestServlet {
@@ -139,7 +139,7 @@ private[deploy] class MesosSubmitRequestServlet(
139139
}
140140
}
141141

142-
private[deploy] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
142+
private[mesos] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
143143
extends KillRequestServlet {
144144
protected override def handleKill(submissionId: String): KillSubmissionResponse = {
145145
val k = scheduler.killDriver(submissionId)
@@ -148,7 +148,7 @@ private[deploy] class MesosKillRequestServlet(scheduler: MesosClusterScheduler,
148148
}
149149
}
150150

151-
private[deploy] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
151+
private[mesos] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
152152
extends StatusRequestServlet {
153153
protected override def handleStatus(submissionId: String): SubmissionStatusResponse = {
154154
val d = scheduler.getDriverStatus(submissionId)

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,13 @@ private[spark] class MesosClusterSubmissionState(
5050
val taskId: TaskID,
5151
val slaveId: SlaveID,
5252
var mesosTaskStatus: Option[TaskStatus],
53-
var startDate: Date)
53+
var startDate: Date,
54+
var finishDate: Option[Date])
5455
extends Serializable {
5556

5657
def copy(): MesosClusterSubmissionState = {
5758
new MesosClusterSubmissionState(
58-
driverDescription, taskId, slaveId, mesosTaskStatus, startDate)
59+
driverDescription, taskId, slaveId, mesosTaskStatus, startDate, finishDate)
5960
}
6061
}
6162

@@ -95,6 +96,14 @@ private[spark] class MesosClusterSchedulerState(
9596
val finishedDrivers: Iterable[MesosClusterSubmissionState],
9697
val pendingRetryDrivers: Iterable[MesosDriverDescription])
9798

99+
/**
100+
* The full state of a Mesos driver, that is being used to display driver information on the UI.
101+
*/
102+
private[spark] class MesosDriverState(
103+
val state: String,
104+
val description: MesosDriverDescription,
105+
val submissionState: Option[MesosClusterSubmissionState] = None)
106+
98107
/**
99108
* A Mesos scheduler that is responsible for launching submitted Spark drivers in cluster mode
100109
* as Mesos tasks in a Mesos cluster.
@@ -233,6 +242,22 @@ private[spark] class MesosClusterScheduler(
233242
s
234243
}
235244

245+
/**
 * Gets the driver state to be displayed on the Web UI.
 *
 * Looks the submission up in each lifecycle collection in order —
 * queued, launched (running), finished, pending retry — and returns the
 * first match, or None if the id is unknown. The whole lookup is performed
 * under `stateLock` so the UI sees a consistent snapshot (these collections
 * are presumably mutated elsewhere under the same lock — verify against the
 * scheduler callbacks).
 */
def getDriverState(submissionId: String): Option[MesosDriverState] = {
  stateLock.synchronized {
    queuedDrivers.find(_.submissionId.equals(submissionId))
      .map(d => new MesosDriverState("QUEUED", d))
      .orElse(launchedDrivers.get(submissionId)
        .map(d => new MesosDriverState("RUNNING", d.driverDescription, Some(d))))
      .orElse(finishedDrivers.find(_.driverDescription.submissionId.equals(submissionId))
        .map(d => new MesosDriverState("FINISHED", d.driverDescription, Some(d))))
      .orElse(pendingRetryDrivers.find(_.submissionId.equals(submissionId))
        .map(d => new MesosDriverState("RETRYING", d)))
  }
}
260+
236261
private def isQueueFull(): Boolean = launchedDrivers.size >= queuedCapacity
237262

238263
/**
@@ -439,7 +464,7 @@ private[spark] class MesosClusterScheduler(
439464
logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
440465
submission.submissionId)
441466
val newState = new MesosClusterSubmissionState(submission, taskId, offer.offer.getSlaveId,
442-
None, new Date())
467+
None, new Date(), None)
443468
launchedDrivers(submission.submissionId) = newState
444469
launchedDriversState.persist(submission.submissionId, newState)
445470
afterLaunchCallback(submission.submissionId)
@@ -534,6 +559,7 @@ private[spark] class MesosClusterScheduler(
534559
// Check if the driver is supervise enabled and can be relaunched.
535560
if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
536561
removeFromLaunchedDrivers(taskId)
562+
state.finishDate = Some(new Date())
537563
val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
538564
val (retries, waitTimeSec) = retryState
539565
.map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) }
@@ -546,6 +572,7 @@ private[spark] class MesosClusterScheduler(
546572
pendingRetryDriversState.persist(taskId, newDriverDescription)
547573
} else if (TaskState.isFinished(TaskState.fromMesos(status.getState))) {
548574
removeFromLaunchedDrivers(taskId)
575+
state.finishDate = Some(new Date())
549576
if (finishedDrivers.size >= retainedDrivers) {
550577
val toRemove = math.max(retainedDrivers / 10, 1)
551578
finishedDrivers.trimStart(toRemove)

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import java.util.{ArrayList => JArrayList, Collections, List => JList}
2323
import scala.collection.JavaConversions._
2424
import scala.collection.mutable.{HashMap, HashSet}
2525

26-
import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
26+
import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _}
2727
import org.apache.mesos.protobuf.ByteString
2828
import org.apache.mesos.{Scheduler => MScheduler, _}
2929
import org.apache.spark.executor.MesosExecutorBackend
@@ -56,7 +56,7 @@ private[spark] class MesosSchedulerBackend(
5656

5757
// The listener bus to publish executor added/removed events.
5858
val listenerBus = sc.listenerBus
59-
59+
6060
private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1)
6161

6262
@volatile var appId: String = _

0 commit comments

Comments
 (0)