Skip to content

Commit 2c55783

Browse files
Sundeep Narravula authored and pwendell committed
SPARK-1202 - Add a "cancel" button in the UI for stages
Author: Sundeep Narravula <[email protected]> Author: Sundeep Narravula <[email protected]> Closes #246 from sundeepn/uikilljob and squashes the following commits: 5fdd0e2 [Sundeep Narravula] Fix test string f6fdff1 [Sundeep Narravula] Format fix; reduced line size to less than 100 chars d1daeb9 [Sundeep Narravula] Incorporating review comments. 8d97923 [Sundeep Narravula] Ability to kill jobs through the UI. This behavior can be turned on by setting the following variable: spark.ui.killEnabled=true (default=false) Adding DAGScheduler event StageCancelled and corresponding handlers. Added cancellation reason to handlers.
1 parent f99401a commit 2c55783

File tree

10 files changed

+87
-12
lines changed

10 files changed

+87
-12
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1138,6 +1138,16 @@ class SparkContext(config: SparkConf) extends Logging {
11381138
dagScheduler.cancelAllJobs()
11391139
}
11401140

1141+
/** Cancel a given job if it's scheduled or running */
1142+
private[spark] def cancelJob(jobId: Int) {
1143+
dagScheduler.cancelJob(jobId)
1144+
}
1145+
1146+
/** Cancel a given stage and all jobs associated with it */
1147+
private[spark] def cancelStage(stageId: Int) {
1148+
dagScheduler.cancelStage(stageId)
1149+
}
1150+
11411151
/**
11421152
* Clean a closure to make it ready to be serialized and sent to tasks
11431153
* (removes unreferenced variables in $outer's, updates REPL variables)

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 28 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -511,6 +511,13 @@ class DAGScheduler(
511511
eventProcessActor ! AllJobsCancelled
512512
}
513513

514+
/**
515+
* Cancel all jobs associated with a running or scheduled stage.
516+
*/
517+
def cancelStage(stageId: Int) {
518+
eventProcessActor ! StageCancelled(stageId)
519+
}
520+
514521
/**
515522
* Process one event retrieved from the event processing actor.
516523
*
@@ -551,6 +558,9 @@ class DAGScheduler(
551558
submitStage(finalStage)
552559
}
553560

561+
case StageCancelled(stageId) =>
562+
handleStageCancellation(stageId)
563+
554564
case JobCancelled(jobId) =>
555565
handleJobCancellation(jobId)
556566

@@ -560,11 +570,13 @@ class DAGScheduler(
560570
val activeInGroup = activeJobs.filter(activeJob =>
561571
groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
562572
val jobIds = activeInGroup.map(_.jobId)
563-
jobIds.foreach(handleJobCancellation)
573+
jobIds.foreach(jobId => handleJobCancellation(jobId,
574+
"as part of cancelled job group %s".format(groupId)))
564575

565576
case AllJobsCancelled =>
566577
// Cancel all running jobs.
567-
runningStages.map(_.jobId).foreach(handleJobCancellation)
578+
runningStages.map(_.jobId).foreach(jobId => handleJobCancellation(jobId,
579+
"as part of cancellation of all jobs"))
568580
activeJobs.clear() // These should already be empty by this point,
569581
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
570582

@@ -991,11 +1003,23 @@ class DAGScheduler(
9911003
}
9921004
}
9931005

994-
private def handleJobCancellation(jobId: Int) {
1006+
private def handleStageCancellation(stageId: Int) {
1007+
if (stageIdToJobIds.contains(stageId)) {
1008+
val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray
1009+
jobsThatUseStage.foreach(jobId => {
1010+
handleJobCancellation(jobId, "because Stage %s was cancelled".format(stageId))
1011+
})
1012+
} else {
1013+
logInfo("No active jobs to kill for Stage " + stageId)
1014+
}
1015+
}
1016+
1017+
private def handleJobCancellation(jobId: Int, reason: String = "") {
9951018
if (!jobIdToStageIds.contains(jobId)) {
9961019
logDebug("Trying to cancel unregistered job " + jobId)
9971020
} else {
998-
failJobAndIndependentStages(jobIdToActiveJob(jobId), s"Job $jobId cancelled", None)
1021+
failJobAndIndependentStages(jobIdToActiveJob(jobId),
1022+
"Job %d cancelled %s".format(jobId, reason), None)
9991023
}
10001024
}
10011025

core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,8 @@ private[scheduler] case class JobSubmitted(
4444
properties: Properties = null)
4545
extends DAGSchedulerEvent
4646

47+
private[scheduler] case class StageCancelled(stageId: Int) extends DAGSchedulerEvent
48+
4749
private[scheduler] case class JobCancelled(jobId: Int) extends DAGSchedulerEvent
4850

4951
private[scheduler] case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent

core/src/main/scala/org/apache/spark/ui/SparkUI.scala

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -46,6 +46,7 @@ private[spark] class SparkUI(
4646
val live = sc != null
4747

4848
val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf)
49+
val killEnabled = conf.getBoolean("spark.ui.killEnabled", true)
4950

5051
private val localHost = Utils.localHostName()
5152
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)

core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala

Lines changed: 13 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@ private[ui] class IndexPage(parent: JobProgressUI) {
3232
private val sc = parent.sc
3333
private lazy val listener = parent.listener
3434
private lazy val isFairScheduler = parent.isFairScheduler
35+
private val killEnabled = parent.killEnabled
3536

3637
private def appName = parent.appName
3738

@@ -42,7 +43,18 @@ private[ui] class IndexPage(parent: JobProgressUI) {
4243
val failedStages = listener.failedStages.reverse.toSeq
4344
val now = System.currentTimeMillis()
4445

45-
val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
46+
if (killEnabled) {
47+
val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
48+
val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
49+
50+
if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
51+
sc.cancelStage(stageId)
52+
}
53+
}
54+
55+
56+
val activeStagesTable =
57+
new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
4658
val completedStagesTable =
4759
new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
4860
val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@ private[ui] class JobProgressUI(parent: SparkUI) {
3232
val basePath = parent.basePath
3333
val live = parent.live
3434
val sc = parent.sc
35+
val killEnabled = parent.killEnabled
3536

3637
lazy val listener = _listener.get
3738
lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@ import org.apache.spark.util.{Utils, Distribution}
3030
private[ui] class StagePage(parent: JobProgressUI) {
3131
private val basePath = parent.basePath
3232
private lazy val listener = parent.listener
33+
private lazy val sc = parent.sc
3334

3435
private def appName = parent.appName
3536

core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala

Lines changed: 23 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,11 @@ import org.apache.spark.ui.{WebUI, UIUtils}
2727
import org.apache.spark.util.Utils
2828

2929
/** Page showing list of all ongoing and recently finished stages */
30-
private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
30+
private[ui] class StageTable(
31+
stages: Seq[StageInfo],
32+
parent: JobProgressUI,
33+
killEnabled: Boolean = false) {
34+
3135
private val basePath = parent.basePath
3236
private lazy val listener = parent.listener
3337
private lazy val isFairScheduler = parent.isFairScheduler
@@ -71,15 +75,28 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
7175
</div>
7276
}
7377

74-
/** Render an HTML row that represents a stage */
75-
private def stageRow(s: StageInfo): Seq[Node] = {
76-
val poolName = listener.stageIdToPool.get(s.stageId)
78+
private def makeDescription(s: StageInfo): Seq[Node] = {
7779
val nameLink =
7880
<a href={"%s/stages/stage?id=%s".format(UIUtils.prependBaseUri(basePath), s.stageId)}>
7981
{s.name}
8082
</a>
83+
val killLink = if (killEnabled) {
84+
<div>[<a href=
85+
{"%s/stages?id=%s&terminate=true".format(UIUtils.prependBaseUri(basePath), s.stageId)}>
86+
Kill
87+
</a>]</div>
88+
89+
}
8190
val description = listener.stageIdToDescription.get(s.stageId)
82-
.map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
91+
.map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
92+
.getOrElse(<div>{nameLink} {killLink}</div>)
93+
94+
return description
95+
}
96+
97+
/** Render an HTML row that represents a stage */
98+
private def stageRow(s: StageInfo): Seq[Node] = {
99+
val poolName = listener.stageIdToPool.get(s.stageId)
83100
val submissionTime = s.submissionTime match {
84101
case Some(t) => WebUI.formatDate(new Date(t))
85102
case None => "Unknown"
@@ -118,7 +135,7 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
118135
</a>
119136
</td>
120137
}}
121-
<td>{description}</td>
138+
<td>{makeDescription(s)}</td>
122139
<td valign="middle">{submissionTime}</td>
123140
<td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
124141
<td class="progress-cell">

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -290,7 +290,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
290290
val rdd = makeRdd(1, Nil)
291291
val jobId = submit(rdd, Array(0))
292292
cancel(jobId)
293-
assert(failure.getMessage === s"Job $jobId cancelled")
293+
assert(failure.getMessage === s"Job $jobId cancelled ")
294294
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
295295
assert(sparkListener.failedStages.contains(0))
296296
assert(sparkListener.failedStages.size === 1)

docs/configuration.md

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -190,6 +190,13 @@ Apart from these, the following properties are also available, and may be useful
190190
user that started the Spark job has view access.
191191
</td>
192192
</tr>
193+
<tr>
194+
<td>spark.ui.killEnabled</td>
195+
<td>true</td>
196+
<td>
197+
Allows stages and corresponding jobs to be killed from the web UI.
198+
</td>
199+
</tr>
193200
<tr>
194201
<td>spark.shuffle.compress</td>
195202
<td>true</td>

0 commit comments

Comments
 (0)