Skip to content

Commit 914b8ff

Browse files
committed
Moved utils functions to UIUtils.
1 parent 6de06b0 commit 914b8ff

File tree

6 files changed

+197
-155
lines changed

6 files changed

+197
-155
lines changed

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 99 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@
1818
package org.apache.spark.ui
1919

2020
import java.text.SimpleDateFormat
21-
import java.util.Date
21+
import java.util.{Locale, Date}
2222

2323
import scala.xml.Node
24+
import org.apache.spark.Logging
2425

2526
/** Utility functions for generating XML pages with spark content. */
26-
private[spark] object UIUtils {
27+
private[spark] object UIUtils extends Logging {
2728

2829
// SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
2930
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
@@ -49,6 +50,80 @@ private[spark] object UIUtils {
4950
"%.1f h".format(hours)
5051
}
5152

53+
/** Generate a verbose human-readable string representing a duration such as "5 second 35 ms" */
54+
def formatDurationVerbose(ms: Long): String = {
55+
try {
56+
val second = 1000L
57+
val minute = 60 * second
58+
val hour = 60 * minute
59+
val day = 24 * hour
60+
val week = 7 * day
61+
val year = 365 * day
62+
63+
def toString(num: Long, unit: String): String = {
64+
if (num == 0) {
65+
""
66+
} else if (num == 1) {
67+
s"$num $unit"
68+
} else {
69+
s"$num ${unit}s"
70+
}
71+
}
72+
73+
val millisecondsString = if (ms >= second && ms % second == 0) "" else s"${ms % second} ms"
74+
val secondString = toString((ms % minute) / second, "second")
75+
val minuteString = toString((ms % hour) / minute, "minute")
76+
val hourString = toString((ms % day) / hour, "hour")
77+
val dayString = toString((ms % week) / day, "day")
78+
val weekString = toString((ms % year) / week, "week")
79+
val yearString = toString(ms / year, "year")
80+
81+
Seq(
82+
second -> millisecondsString,
83+
minute -> s"$secondString $millisecondsString",
84+
hour -> s"$minuteString $secondString",
85+
day -> s"$hourString $minuteString $secondString",
86+
week -> s"$dayString $hourString $minuteString",
87+
year -> s"$weekString $dayString $hourString"
88+
).foreach { case (durationLimit, durationString) =>
89+
if (ms < durationLimit) {
90+
// if time is less than the limit (upto year)
91+
return durationString
92+
}
93+
}
94+
// if time is more than a year
95+
return s"$yearString $weekString $dayString"
96+
} catch {
97+
case e: Exception =>
98+
logError("Error converting time to string", e)
99+
// if there is some error, return blank string
100+
return ""
101+
}
102+
}
103+
104+
/** Generate a human-readable string representing a number (e.g. 100 K) */
105+
def formatNumber(records: Double): String = {
106+
val trillion = 1e12
107+
val billion = 1e9
108+
val million = 1e6
109+
val thousand = 1e3
110+
111+
val (value, unit) = {
112+
if (records >= 2*trillion) {
113+
(records / trillion, " T")
114+
} else if (records >= 2*billion) {
115+
(records / billion, " B")
116+
} else if (records >= 2*million) {
117+
(records / million, " M")
118+
} else if (records >= 2*thousand) {
119+
(records / thousand, " K")
120+
} else {
121+
(records, "")
122+
}
123+
}
124+
"%.1f%s".formatLocal(Locale.US, value, unit)
125+
}
126+
52127
// Yarn has to go through a proxy so the base uri is provided and has to be on all links
53128
val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("")
54129

@@ -146,21 +221,36 @@ private[spark] object UIUtils {
146221
/** Returns an HTML table constructed by generating a row for each object in a sequence. */
147222
def listingTable[T](
148223
headers: Seq[String],
149-
makeRow: T => Seq[Node],
150-
rows: Seq[T],
224+
generateDataRow: T => Seq[Node],
225+
data: Seq[T],
151226
fixedWidth: Boolean = false): Seq[Node] = {
152227

153-
val colWidth = 100.toDouble / headers.size
154-
val colWidthAttr = if (fixedWidth) colWidth + "%" else ""
155228
var tableClass = "table table-bordered table-striped table-condensed sortable"
156229
if (fixedWidth) {
157230
tableClass += " table-fixed"
158231
}
159-
232+
val colWidth = 100.toDouble / headers.size
233+
val colWidthAttr =if (fixedWidth) colWidth + "%" else ""
234+
val headerRow: Seq[Node] = {
235+
// if none of the headers have "\n" in them
236+
if (headers.forall(!_.contains("\n"))) {
237+
// represent header as simple text
238+
headers.map(h => <th width={colWidthAttr}>{h}</th>)
239+
} else {
240+
// represent header text as list while respecting "\n"
241+
headers.map { case h =>
242+
<th width={colWidthAttr}>
243+
<ul class ="unstyled">
244+
{ h.split("\n").map { case t => <li> {t} </li> } }
245+
</ul>
246+
</th>
247+
}
248+
}
249+
}
160250
<table class={tableClass}>
161-
<thead>{headers.map(h => <th width={colWidthAttr}>{h}</th>)}</thead>
251+
<thead>{headerRow}</thead>
162252
<tbody>
163-
{rows.map(r => makeRow(r))}
253+
{data.map(r => generateDataRow(r))}
164254
</tbody>
165255
</table>
166256
}

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,7 @@ class StreamingContext private[streaming] (
157157

158158
private[streaming] val waiter = new ContextWaiter
159159

160-
private[streaming] val ui = new StreamingTab(this)
161-
ui.start()
160+
private[streaming] val uiTab = new StreamingTab(this)
162161

163162
/** Enumeration to identify current state of the StreamingContext */
164163
private[streaming] object StreamingContextState extends Enumeration {

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingProgressListener.scala renamed to streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
2828
import org.apache.spark.util.Distribution
2929

3030

31-
private[ui] class StreamingProgressListener(ssc: StreamingContext) extends StreamingListener {
31+
private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends StreamingListener {
3232

3333
private val waitingBatchInfos = new HashMap[Time, BatchInfo]
3434
private val runningBatchInfos = new HashMap[Time, BatchInfo]

0 commit comments

Comments
 (0)