Skip to content

Commit e275e23

Browse files
committed
Move time related methods to Streaming's UIUtils
1 parent d5d86f6 commit e275e23

File tree

4 files changed

+104
-65
lines changed

4 files changed

+104
-65
lines changed

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -35,43 +35,6 @@ private[spark] object UIUtils extends Logging {
3535
override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
3636
}
3737

38-
/**
39-
* Return the short string for a `TimeUnit`.
40-
*/
41-
def shortTimeUnitString(unit: TimeUnit): String = unit match {
42-
case TimeUnit.NANOSECONDS => "ns"
43-
case TimeUnit.MICROSECONDS => "us"
44-
case TimeUnit.MILLISECONDS => "ms"
45-
case TimeUnit.SECONDS => "sec"
46-
case TimeUnit.MINUTES => "min"
47-
case TimeUnit.HOURS => "hrs"
48-
case TimeUnit.DAYS => "days"
49-
}
50-
51-
/**
52-
* Find the best `TimeUnit` for converting milliseconds to a friendly string. Return the value
53-
* after converting, also with its TimeUnit.
54-
*/
55-
def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = {
56-
if (milliseconds < 1000) {
57-
return (milliseconds, TimeUnit.MILLISECONDS)
58-
}
59-
val seconds = milliseconds.toDouble / 1000
60-
if (seconds < 60) {
61-
return (seconds, TimeUnit.SECONDS)
62-
}
63-
val minutes = seconds / 60
64-
if (minutes < 60) {
65-
return (minutes, TimeUnit.MINUTES)
66-
}
67-
val hours = minutes / 60
68-
if (hours < 24) {
69-
return (hours, TimeUnit.HOURS)
70-
}
71-
val days = hours / 24
72-
(days, TimeUnit.DAYS)
73-
}
74-
7538
def formatDate(date: Date): String = dateFormat.get.format(date)
7639

7740
def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import scala.xml.{Node, Unparsed}
2727

2828
import org.apache.spark.Logging
2929
import org.apache.spark.ui._
30-
import org.apache.spark.ui.UIUtils._
30+
import org.apache.spark.ui.{UIUtils => SparkUIUtils}
3131

3232
/**
3333
* @param timelineDivId the timeline `id` used in the html `div` tag
@@ -103,13 +103,13 @@ private[ui] class MillisecondsStatUIData(data: Seq[(Long, Long)]) {
103103
* Converting the original data as per `unit`.
104104
*/
105105
def timelineData(unit: TimeUnit): Seq[(Long, Double)] =
106-
data.map(x => x._1 -> StreamingPage.convertToTimeUnit(x._2, unit))
106+
data.map(x => x._1 -> UIUtils.convertToTimeUnit(x._2, unit))
107107

108108
/**
109109
* Converting the original data as per `unit`.
110110
*/
111111
def histogramData(unit: TimeUnit): Seq[Double] =
112-
data.map(x => StreamingPage.convertToTimeUnit(x._2, unit))
112+
data.map(x => UIUtils.convertToTimeUnit(x._2, unit))
113113

114114
val avg: Option[Long] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size)
115115

@@ -149,17 +149,17 @@ private[ui] class StreamingPage(parent: StreamingTab)
149149
generateStatTable() ++
150150
generateBatchListTables()
151151
}
152-
UIUtils.headerSparkPage("Streaming Statistics", content, parent, Some(5000))
152+
SparkUIUtils.headerSparkPage("Streaming Statistics", content, parent, Some(5000))
153153
}
154154

155155
/**
156156
* Generate html that will load css/js files for StreamingPage
157157
*/
158158
private def generateLoadResources(): Seq[Node] = {
159159
// scalastyle:off
160-
<script src={UIUtils.prependBaseUri("/static/d3.min.js")}></script>
161-
<link rel="stylesheet" href={UIUtils.prependBaseUri("/static/streaming-page.css")} type="text/css"/>
162-
<script src={UIUtils.prependBaseUri("/static/streaming-page.js")}></script>
160+
<script src={SparkUIUtils.prependBaseUri("/static/d3.min.js")}></script>
161+
<link rel="stylesheet" href={SparkUIUtils.prependBaseUri("/static/streaming-page.css")} type="text/css"/>
162+
<script src={SparkUIUtils.prependBaseUri("/static/streaming-page.js")}></script>
163163
// scalastyle:on
164164
}
165165

@@ -168,15 +168,15 @@ private[ui] class StreamingPage(parent: StreamingTab)
168168
val timeSinceStart = System.currentTimeMillis() - startTime
169169
<div>Running batches of
170170
<strong>
171-
{formatDurationVerbose(listener.batchDuration)}
171+
{SparkUIUtils.formatDurationVerbose(listener.batchDuration)}
172172
</strong>
173173
for
174174
<strong>
175-
{formatDurationVerbose(timeSinceStart)}
175+
{SparkUIUtils.formatDurationVerbose(timeSinceStart)}
176176
</strong>
177177
since
178178
<strong>
179-
{UIUtils.formatDate(startTime)}
179+
{SparkUIUtils.formatDate(startTime)}
180180
</strong>
181181
</div>
182182
<br />
@@ -247,7 +247,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
247247
| document.title, window.location.pathname + '?show-streams-detail=' + status);"""
248248
.stripMargin.replaceAll("\\n", "") // it must be only one single line
249249

250-
val batchInterval = StreamingPage.convertToTimeUnit(listener.batchDuration, normalizedUnit)
250+
val batchInterval = UIUtils.convertToTimeUnit(listener.batchDuration, normalizedUnit)
251251

252252
val jsCollector = new JsCollector
253253

@@ -423,7 +423,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
423423
if (msg.size > 100) msg.take(97) + "..." else msg
424424
}.getOrElse(emptyCell)
425425
val receiverLastErrorTime = receiverInfo.map {
426-
r => if (r.lastErrorTime < 0) "-" else UIUtils.formatDate(r.lastErrorTime)
426+
r => if (r.lastErrorTime < 0) "-" else SparkUIUtils.formatDate(r.lastErrorTime)
427427
}.getOrElse(emptyCell)
428428
val receivedRecords = new EventRateUIData(eventRates)
429429

@@ -491,22 +491,9 @@ private[ui] object StreamingPage {
491491
* Returns a human-readable string representing a duration such as "5 second 35 ms"
492492
*/
493493
def formatDurationOption(msOption: Option[Long]): String = {
494-
msOption.map(formatDurationVerbose).getOrElse(emptyCell)
494+
msOption.map(SparkUIUtils.formatDurationVerbose).getOrElse(emptyCell)
495495
}
496496

497-
/**
498-
* Convert `milliseconds` to the specified `unit`. We cannot use `TimeUnit.convert` because it
499-
* will discard the fractional part.
500-
*/
501-
def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match {
502-
case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000
503-
case TimeUnit.MICROSECONDS => milliseconds * 1000
504-
case TimeUnit.MILLISECONDS => milliseconds
505-
case TimeUnit.SECONDS => milliseconds / 1000.0
506-
case TimeUnit.MINUTES => milliseconds / 1000.0 / 60.0
507-
case TimeUnit.HOURS => milliseconds / 1000.0 / 60.0 / 60.0
508-
case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0
509-
}
510497
}
511498

512499
/**
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streaming.ui
19+
20+
import java.util.concurrent.TimeUnit
21+
22+
object UIUtils {
23+
24+
/**
25+
* Return the short string for a `TimeUnit`.
26+
*/
27+
def shortTimeUnitString(unit: TimeUnit): String = unit match {
28+
case TimeUnit.NANOSECONDS => "ns"
29+
case TimeUnit.MICROSECONDS => "us"
30+
case TimeUnit.MILLISECONDS => "ms"
31+
case TimeUnit.SECONDS => "sec"
32+
case TimeUnit.MINUTES => "min"
33+
case TimeUnit.HOURS => "hrs"
34+
case TimeUnit.DAYS => "days"
35+
}
36+
37+
/**
38+
* Find the best `TimeUnit` for converting milliseconds to a friendly string. Return the value
39+
* after converting, also with its TimeUnit.
40+
*/
41+
def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = {
42+
if (milliseconds < 1000) {
43+
return (milliseconds, TimeUnit.MILLISECONDS)
44+
}
45+
val seconds = milliseconds.toDouble / 1000
46+
if (seconds < 60) {
47+
return (seconds, TimeUnit.SECONDS)
48+
}
49+
val minutes = seconds / 60
50+
if (minutes < 60) {
51+
return (minutes, TimeUnit.MINUTES)
52+
}
53+
val hours = minutes / 60
54+
if (hours < 24) {
55+
return (hours, TimeUnit.HOURS)
56+
}
57+
val days = hours / 24
58+
(days, TimeUnit.DAYS)
59+
}
60+
61+
/**
62+
* Convert `milliseconds` to the specified `unit`. We cannot use `TimeUnit.convert` because it
63+
* will discard the fractional part.
64+
*/
65+
def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match {
66+
case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000
67+
case TimeUnit.MICROSECONDS => milliseconds * 1000
68+
case TimeUnit.MILLISECONDS => milliseconds
69+
case TimeUnit.SECONDS => milliseconds / 1000.0
70+
case TimeUnit.MINUTES => milliseconds / 1000.0 / 60.0
71+
case TimeUnit.HOURS => milliseconds / 1000.0 / 60.0 / 60.0
72+
case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0
73+
}
74+
}

core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala renamed to streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
19-
package org.apache.spark.ui
18+
package org.apache.spark.streaming.ui
2019

2120
import java.util.concurrent.TimeUnit
2221

@@ -49,4 +48,20 @@ class UIUtilsSuite extends FunSuite with Matchers{
4948
time should be (expectedTime +- 1E-6)
5049
unit should be (expectedUnit)
5150
}
51+
52+
test("convertToTimeUnit") {
53+
verifyConvertToTimeUnit(60.0 * 1000 * 1000 * 1000, 60 * 1000, TimeUnit.NANOSECONDS)
54+
verifyConvertToTimeUnit(60.0 * 1000 * 1000, 60 * 1000, TimeUnit.MICROSECONDS)
55+
verifyConvertToTimeUnit(60 * 1000, 60 * 1000, TimeUnit.MILLISECONDS)
56+
verifyConvertToTimeUnit(60, 60 * 1000, TimeUnit.SECONDS)
57+
verifyConvertToTimeUnit(1, 60 * 1000, TimeUnit.MINUTES)
58+
verifyConvertToTimeUnit(1.0 / 60, 60 * 1000, TimeUnit.HOURS)
59+
verifyConvertToTimeUnit(1.0 / 60 / 24, 60 * 1000, TimeUnit.DAYS)
60+
}
61+
62+
private def verifyConvertToTimeUnit(
63+
expectedTime: Double, milliseconds: Long, unit: TimeUnit): Unit = {
64+
val convertedTime = UIUtils.convertToTimeUnit(milliseconds, unit)
65+
convertedTime should be (expectedTime +- 1E-6)
66+
}
5267
}

0 commit comments

Comments
 (0)