Skip to content

Commit 5c7ce16

Browse files
author
Andrew Or
committed
Connect RDDs across stages + update style
This requires us to track incoming and outgoing edges in each stage on the backend, and render the connecting edges manually ourselves in d3.
1 parent ab91416 commit 5c7ce16

File tree

3 files changed

+186
-53
lines changed

3 files changed

+186
-53
lines changed

core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js

Lines changed: 149 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,14 @@
1515
* limitations under the License.
1616
*/
1717

18-
var VizDefaults = {
19-
stageVizOffset: 160
18+
var VizConstants = {
19+
stageVizOffset: 160,
20+
rddScopeColor: "#AADFFF",
21+
rddDotColor: "#444444",
22+
stageScopeColor: "#FFDDEE",
23+
scopeLabelColor: "#888888",
24+
edgeColor: "#444444",
25+
edgeWidth: "1.5px"
2026
};
2127

2228
/*
@@ -30,7 +36,7 @@ function toggleShowViz(forJob) {
3036
if (show) {
3137
var shouldRender = d3.select("#viz-graph svg").empty();
3238
if (shouldRender) {
33-
renderViz();
39+
renderViz(forJob);
3440
styleViz(forJob);
3541
}
3642
$("#viz-graph").show();
@@ -49,7 +55,7 @@ function toggleShowViz(forJob) {
4955
* http://github.com/andrewor14/dagre-d3/dist/dagre-d3.js. For more detail, please
5056
* track the changes in that project after it was forked.
5157
*/
52-
function renderViz() {
58+
function renderViz(forJob) {
5359

5460
// If there is not a dot file to render, report error
5561
if (d3.select("#viz-dot-files").empty()) {
@@ -61,18 +67,42 @@ function renderViz() {
6167

6268
var svg = d3.select("#viz-graph").append("svg");
6369

64-
// Each div in #viz-dot-files stores the content of one dot file
70+
// On the job page, the visualization for each stage will be rendered separately
71+
// Thus, we will need to render the edges that cross stages later on our own
72+
var crossStageEdges = [];
73+
74+
// Each div child in #viz-dot-files stores the content of one dot file
6575
// Each dot file is used to generate the visualization for a stage
66-
d3.selectAll("#viz-dot-files div").each(function(d, i) {
76+
d3.selectAll("#viz-dot-files > div").each(function(d, i) {
6777
var div = d3.select(this);
6878
var stageId = div.attr("name");
69-
var dot = div.text();
79+
var dot = div.select("div.dot-file").text();
7080
var container = svg.append("g").attr("id", "graph_" + stageId);
7181
// Move the container so it doesn't overlap with the existing ones
72-
container.attr("transform", "translate(" + VizDefaults.stageVizOffset * i + ", 0)");
82+
container.attr("transform", "translate(" + VizConstants.stageVizOffset * i + ", 0)");
7383
renderDot(dot, container);
84+
// If there are any incoming edges into this graph, keep track of them to
85+
// render them separately later (job page only). Note that we cannot draw
86+
// them now because we need to put these edges in a container that is on
87+
// top of all stage graphs.
88+
if (forJob) {
89+
div.selectAll("div.incoming-edge").each(function(v) {
90+
var edge = d3.select(this).text().split(","); // e.g. 3,4 => [3, 4]
91+
crossStageEdges.push(edge);
92+
});
93+
}
7494
});
7595

96+
// Time to draw cross stage edges (job page only)!
97+
if (crossStageEdges.length > 0 && forJob) {
98+
var container = svg.append("g").attr("id", "cross-stage-edges");
99+
for (var i = 0; i < crossStageEdges.length; i++) {
100+
var fromId = "node_" + crossStageEdges[i][0];
101+
var toId = "node_" + crossStageEdges[i][1];
102+
connect(fromId, toId, container);
103+
}
104+
}
105+
76106
// Set the appropriate SVG dimensions to ensure that all elements are displayed
77107
var svgMargin = 20;
78108
var boundingBox = svg.node().getBBox();
@@ -107,9 +137,7 @@ function renderViz() {
107137
svg.attr("viewBox", newViewBox);
108138
}
109139

110-
/*
111-
*
112-
*/
140+
/* Render the dot file as an SVG in the given container. */
113141
function renderDot(dot, container) {
114142
var escaped_dot = dot
115143
.replace(/&lt;/g, "<")
@@ -120,56 +148,134 @@ function renderDot(dot, container) {
120148
renderer(container, g);
121149
}
122150

123-
/*
124-
* Style the visualization we just rendered.
125-
* We apply a different style depending on whether this is a stage or a job.
126-
*/
151+
/* Style the visualization we just rendered. */
127152
function styleViz(forJob) {
153+
d3.selectAll("svg g.cluster rect")
154+
.style("fill", "none")
155+
.style("stroke", VizConstants.rddScopeColor)
156+
.style("stroke-width", "4px")
157+
.style("stroke-opacity", "0.5");
158+
d3.selectAll("svg g.cluster text")
159+
.attr("fill", VizConstants.scopeLabelColor)
160+
.attr("font-size", "11px")
161+
d3.selectAll("svg path")
162+
.style("stroke", VizConstants.edgeColor)
163+
.style("stroke-width", VizConstants.edgeWidth);
164+
styleEdgeArrow();
165+
166+
// Apply a different color to stage clusters
167+
d3.selectAll("svg g.cluster")
168+
.filter(function() {
169+
var name = d3.select(this).attr("name");
170+
return name && name.indexOf("Stage") > -1;
171+
})
172+
.select("rect")
173+
.style("stroke", VizConstants.stageScopeColor);
174+
175+
// Apply any job or stage specific styles
128176
if (forJob) {
129177
styleJobViz();
130178
} else {
131179
styleStageViz();
132180
}
133181
}
134182

183+
/*
184+
* Put an arrow at the end of every edge.
185+
* We need to do this because we manually render some edges ourselves through d3.
186+
* For these edges, we borrow the arrow marker generated by dagre-d3.
187+
*/
188+
function styleEdgeArrow() {
189+
var dagreD3Marker = d3.select("svg g.edgePaths marker").node()
190+
d3.select("svg")
191+
.append(function() { return dagreD3Marker.cloneNode(true); })
192+
.attr("id", "marker-arrow")
193+
.select("path")
194+
.attr("fill", VizConstants.edgeColor)
195+
.attr("strokeWidth", "0px");
196+
d3.selectAll("svg g > path").attr("marker-end", "url(#marker-arrow)")
197+
d3.selectAll("svg g.edgePaths def").remove(); // We no longer need these
198+
}
199+
200+
/* Apply job-page-specific style to the visualization. */
135201
function styleJobViz() {
136-
d3.selectAll("svg g.cluster rect")
137-
.style("fill", "none")
138-
.style("stroke", "#AADFFF")
139-
.style("stroke-width", "4px")
140-
.style("stroke-opacity", "0.5");
141-
d3.selectAll("svg g.node rect")
142-
.style("fill", "white")
143-
.style("stroke", "black")
144-
.style("stroke-width", "2px")
145-
.style("fill-opacity", "0.8")
146-
.style("stroke-opacity", "0.9");
147-
d3.selectAll("svg g.edgePath path")
148-
.style("stroke", "black")
149-
.style("stroke-width", "2px");
150-
d3.selectAll("svg g.cluster text")
151-
.attr("fill", "#AAAAAA")
152-
.attr("font-size", "11px")
202+
d3.selectAll("svg g.node circle")
203+
.style("fill", VizConstants.rddDotColor);
204+
d3.selectAll("svg g#cross-stage-edges path")
205+
.style("fill", "none");
153206
}
154207

208+
/* Apply stage-page-specific style to the visualization. */
155209
function styleStageViz() {
156-
d3.selectAll("svg g.cluster rect")
157-
.style("fill", "none")
158-
.style("stroke", "#AADFFF")
159-
.style("stroke-width", "4px")
160-
.style("stroke-opacity", "0.5");
161210
d3.selectAll("svg g.node rect")
162211
.style("fill", "white")
163212
.style("stroke", "black")
164213
.style("stroke-width", "2px")
165214
.style("fill-opacity", "0.8")
166215
.style("stroke-opacity", "0.9");
167-
d3.selectAll("svg g.edgePath path")
168-
.style("stroke", "black")
169-
.style("stroke-width", "2px");
170-
d3.selectAll("svg g.cluster text")
171-
.attr("fill", "#AAAAAA")
172-
.attr("font-size", "11px")
216+
}
217+
218+
/*
219+
* (Job page only) Return the absolute position of the
220+
* group element identified by the given element ID.
221+
*/
222+
function getAbsolutePosition(groupId) {
223+
var obj = d3.select("#" + groupId).filter("g");
224+
var _x = 0, _y = 0;
225+
while (!obj.empty()) {
226+
var transformText = obj.attr("transform");
227+
var translate = d3.transform(transformText).translate
228+
_x += translate[0];
229+
_y += translate[1];
230+
obj = d3.select(obj.node().parentNode).filter("g")
231+
}
232+
return { x: _x, y: _y };
233+
}
234+
235+
/*
236+
* (Job page only) Connect two group elements with a curved edge.
237+
* This assumes that the path will be styled later.
238+
*/
239+
function connect(fromNodeId, toNodeId, container) {
240+
var from = getAbsolutePosition(fromNodeId);
241+
var to = getAbsolutePosition(toNodeId);
242+
243+
// Account for node radius (this is highly-specific to the job page)
244+
// Otherwise the arrow heads will bleed into the circle itself
245+
var delta = toFloat(d3.select("svg g.nodes circle").attr("r"));
246+
var markerEndDelta = 2; // adjust for arrow stroke width
247+
if (from.x < to.x) {
248+
from.x = from.x + delta;
249+
to.x = to.x - delta - markerEndDelta;
250+
} else if (from.x > to.x) {
251+
from.x = from.x - delta;
252+
to.x = to.x + delta + markerEndDelta;
253+
}
254+
255+
if (from.y == to.y) {
256+
// If they are on the same rank, curve the middle part of the edge
257+
// upward a little to avoid interference with things in between
258+
var points = [
259+
[from.x, from.y],
260+
[from.x + (to.x - from.x) * 0.2, from.y],
261+
[from.x + (to.x - from.x) * 0.3, from.y - 20],
262+
[from.x + (to.x - from.x) * 0.7, from.y - 20],
263+
[from.x + (to.x - from.x) * 0.8, to.y],
264+
[to.x, to.y]
265+
];
266+
} else {
267+
var points = [
268+
[from.x, from.y],
269+
[from.x + (to.x - from.x) * 0.4, from.y],
270+
[from.x + (to.x - from.x) * 0.6, to.y],
271+
[to.x, to.y]
272+
];
273+
}
274+
275+
var line = d3.svg.line().interpolate("basis");
276+
container.append("path")
277+
.datum(points)
278+
.attr("d", line);
173279
}
174280

175281
/* Helper method to convert attributes to numeric values. */

core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ private[ui] class VisualizationListener(conf: SparkConf) extends SparkListener {
5858
{
5959
graphs.map { g =>
6060
<div name={g.rootScope.id} style="display:none">
61-
{VizGraph.makeDotFile(g, forJob)}
61+
<div class="dot-file">{VizGraph.makeDotFile(g, forJob)}</div>
62+
{ g.incomingEdges.map { e => <div class="incoming-edge">{e.fromId},{e.toId}</div> } }
63+
{ g.outgoingEdges.map { e => <div class="outgoing-edge">{e.fromId},{e.toId}</div> } }
6264
</div>
6365
}
6466
}

core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,23 @@ package org.apache.spark.ui.viz
2020
import scala.collection.mutable
2121
import scala.collection.mutable.ListBuffer
2222

23+
import org.apache.spark.Logging
2324
import org.apache.spark.rdd.RDDScope
2425
import org.apache.spark.scheduler.StageInfo
2526

2627
/**
2728
* A representation of a generic scoped graph used for storing visualization information.
2829
*
2930
* Each graph is defined with a set of edges and a root scope, which may contain children
30-
* nodes and children scopes.
31+
* nodes and children scopes. Additionally, a graph may also have edges that enter or exit
32+
* the graph from nodes that belong to adjacent graphs.
3133
*/
32-
private[ui] case class VizGraph(edges: Seq[VizEdge], rootScope: VizScope)
34+
private[ui] case class VizGraph(
35+
edges: Seq[VizEdge],
36+
outgoingEdges: Seq[VizEdge],
37+
incomingEdges: Seq[VizEdge],
38+
rootScope: VizScope)
39+
3340
private[ui] case class VizNode(id: Int, name: String)
3441
private[ui] case class VizEdge(fromId: Int, toId: Int)
3542

@@ -46,7 +53,7 @@ private[ui] class VizScope(val id: String, val name: String) {
4653
def attachChildScope(childScope: VizScope): Unit = { _childrenScopes += childScope }
4754
}
4855

49-
private[ui] object VizGraph {
56+
private[ui] object VizGraph extends Logging {
5057

5158
/**
5259
* Construct a VizGraph for a given stage.
@@ -56,7 +63,7 @@ private[ui] object VizGraph {
5663
* between two RDDs from the parent to the child.
5764
*/
5865
def makeVizGraph(stage: StageInfo): VizGraph = {
59-
val edges = new mutable.HashSet[VizEdge]
66+
val edges = new ListBuffer[VizEdge]
6067
val nodes = new mutable.HashMap[Int, VizNode]
6168
val scopes = new mutable.HashMap[String, VizScope] // scope ID -> viz scope
6269

@@ -66,7 +73,10 @@ private[ui] object VizGraph {
6673
{ if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
6774
val rootScope = new VizScope(stageScopeId, stageScopeName)
6875

69-
// Find nodes, edges, and children scopes
76+
// Find nodes, edges, and children scopes that belong to this stage. Each node is an RDD
77+
// that lives either directly in the root scope or in one of the children scopes. Each
78+
// child scope represents a level of the RDD scope and must contain at least one RDD.
79+
// Children scopes can be nested if one RDD operation calls another.
7080
stage.rddInfos.foreach { rdd =>
7181
edges ++= rdd.parentIds.map { parentId => VizEdge(parentId, rdd.id) }
7282
val node = nodes.getOrElseUpdate(rdd.id, VizNode(rdd.id, rdd.name))
@@ -94,10 +104,23 @@ private[ui] object VizGraph {
94104
}
95105
}
96106

97-
// Remove any edges with nodes belonging to other stages so we do not have orphaned nodes
98-
edges.retain { case VizEdge(f, t) => nodes.contains(f) && nodes.contains(t) }
107+
// Classify each edge as internal, outgoing or incoming
108+
val internalEdges = new ListBuffer[VizEdge]
109+
val outgoingEdges = new ListBuffer[VizEdge]
110+
val incomingEdges = new ListBuffer[VizEdge]
111+
edges.foreach { case e: VizEdge =>
112+
val fromThisGraph = nodes.contains(e.fromId)
113+
val toThisGraph = nodes.contains(e.toId)
114+
(fromThisGraph, toThisGraph) match {
115+
case (true, true) => internalEdges += e
116+
case (true, false) => outgoingEdges += e
117+
case (false, true) => incomingEdges += e
118+
// should never happen
119+
case _ => logWarning(s"Found an orphan edge in stage ${stage.stageId}: $e")
120+
}
121+
}
99122

100-
VizGraph(edges.toSeq, rootScope)
123+
VizGraph(internalEdges, outgoingEdges, incomingEdges, rootScope)
101124
}
102125

103126
/**
@@ -116,7 +139,9 @@ private[ui] object VizGraph {
116139
dotFile.append(s""" ${edge.fromId}->${edge.toId} [lineInterpolate="basis"];\n""")
117140
}
118141
dotFile.append("}")
119-
dotFile.toString()
142+
val result = dotFile.toString()
143+
logDebug(result)
144+
result
120145
}
121146

122147
/** Return the dot representation of a node. */

0 commit comments

Comments
 (0)