
Commit 8856076

zsxwing authored and pwendell committed
SPARK-1284: Fix improper use of SimpleDateFormat
`SimpleDateFormat` is not thread-safe. Some places use the same `SimpleDateFormat` object across multiple threads without any safeguard, which can cause the Web UI to display incorrect dates. This PR creates a new `SimpleDateFormat` every time one is needed. Another solution would be to use a `ThreadLocal` to store a `SimpleDateFormat` per thread; if this PR impacts performance, I can switch to that approach.

Author: zsxwing <[email protected]>

Closes #179 from zsxwing/SPARK-1278 and squashes the following commits:

21fabd3 [zsxwing] SPARK-1278: Fix improper use of SimpleDateFormat

Conflicts:
    core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
    core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
    core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
    core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
    core/src/main/scala/org/apache/spark/util/FileLogger.scala
1 parent d68549e commit 8856076
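
As a quick illustration of the two approaches mentioned in the commit message, here is a minimal Scala sketch in the style of the patch (the object name and main method are invented for the example; they are not part of the diff):

import java.text.SimpleDateFormat
import java.util.Date

object DateFormatPatterns {
  // Approach taken by this PR for Master/Worker: build a fresh formatter on
  // every call, so no SimpleDateFormat instance is ever shared across threads.
  def createDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")

  // Alternative used in JobLogger/WebUI: keep one formatter per thread.
  private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
    override def initialValue() = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
  }

  def formatDate(date: Date): String = dateFormat.get.format(date)

  def main(args: Array[String]): Unit = {
    println(createDateFormat.format(new Date()))  // new instance each time
    println(formatDate(new Date()))               // per-thread instance
  }
}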

File tree

9 files changed (+25, -22 lines)


core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 3 additions & 3 deletions
@@ -45,7 +45,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
 
   val conf = new SparkConf
 
-  val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
+  def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
   val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
   val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
   val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
@@ -621,7 +621,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
 
   /** Generate a new app ID given a app's submission date */
   def newApplicationId(submitDate: Date): String = {
-    val appId = "app-%s-%04d".format(DATE_FORMAT.format(submitDate), nextAppNumber)
+    val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
     nextAppNumber += 1
     appId
   }
@@ -644,7 +644,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   }
 
   def newDriverId(submitDate: Date): String = {
-    val appId = "driver-%s-%04d".format(DATE_FORMAT.format(submitDate), nextDriverNumber)
+    val appId = "driver-%s-%04d".format(createDateFormat.format(submitDate), nextDriverNumber)
     nextDriverNumber += 1
     appId
   }

core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala

Lines changed: 4 additions & 4 deletions
@@ -25,10 +25,10 @@ import akka.pattern.ask
 import javax.servlet.http.HttpServletRequest
 import net.liftweb.json.JsonAST.JValue
 
-import org.apache.spark.deploy.{DeployWebUI, JsonProtocol}
+import org.apache.spark.deploy.JsonProtocol
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUI, UIUtils}
 import org.apache.spark.util.Utils
 
 private[spark] class IndexPage(parent: MasterWebUI) {
@@ -164,10 +164,10 @@ private[spark] class IndexPage(parent: MasterWebUI) {
       <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
         {Utils.megabytesToString(app.desc.memoryPerSlave)}
       </td>
-      <td>{DeployWebUI.formatDate(app.submitDate)}</td>
+      <td>{WebUI.formatDate(app.submitDate)}</td>
      <td>{app.desc.user}</td>
      <td>{app.state.toString}</td>
-      <td>{DeployWebUI.formatDuration(app.duration)}</td>
+      <td>{WebUI.formatDuration(app.duration)}</td>
    </tr>
  }
 

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 2 additions & 2 deletions
@@ -56,7 +56,7 @@ private[spark] class Worker(
   Utils.checkHost(host, "Expected hostname")
   assert (port > 0)
 
-  val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
+  def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
 
   // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
   val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
@@ -309,7 +309,7 @@ private[spark] class Worker(
   }
 
   def generateWorkerId(): String = {
-    "worker-%s-%s-%d".format(DATE_FORMAT.format(new Date), host, port)
+    "worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port)
   }
 
   override def postStop() {

core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala

Lines changed: 4 additions & 1 deletion
@@ -53,6 +53,9 @@ class JobLogger(val user: String, val logDirName: String)
   private val jobIDToPrintWriter = new HashMap[Int, PrintWriter]
   private val stageIDToJobID = new HashMap[Int, Int]
   private val jobIDToStages = new HashMap[Int, ListBuffer[Stage]]
+  private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
+    override def initialValue() = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+  }
   private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
   private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]
 
@@ -116,7 +119,7 @@ class JobLogger(val user: String, val logDirName: String)
     var writeInfo = info
     if (withTime) {
       val date = new Date(System.currentTimeMillis())
-      writeInfo = DATE_FORMAT.format(date) + ": " +info
+      writeInfo = dateFormat.get.format(date) + ": " + info
     }
     jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
   }

core/src/main/scala/org/apache/spark/deploy/WebUI.scala renamed to core/src/main/scala/org/apache/spark/ui/WebUI.scala

Lines changed: 8 additions & 5 deletions
@@ -15,20 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.ui
 
 import java.text.SimpleDateFormat
 import java.util.Date
 
 /**
  * Utilities used throughout the web UI.
  */
-private[spark] object DeployWebUI {
-  val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+private[spark] object WebUI {
+  // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
+  private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
+    override def initialValue() = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+  }
 
-  def formatDate(date: Date): String = DATE_FORMAT.format(date)
+  def formatDate(date: Date): String = dateFormat.get.format(date)
 
-  def formatDate(timestamp: Long): String = DATE_FORMAT.format(new Date(timestamp))
+  def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))
 
   def formatDuration(milliseconds: Long): String = {
     val seconds = milliseconds.toDouble / 1000
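
After the rename, callers format dates through this shared helper instead of holding their own formatter. A hypothetical call site (the epoch timestamp below is made up for illustration) might look like:

import java.util.Date
import org.apache.spark.ui.WebUI

// Each call goes through the per-thread SimpleDateFormat held by the ThreadLocal.
val asOfDate: String = WebUI.formatDate(new Date())
val asOfMillis: String = WebUI.formatDate(1395360000000L)  // made-up timestamp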

core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala

Lines changed: 0 additions & 1 deletion
@@ -27,7 +27,6 @@ import scala.collection.mutable
 private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) {
 
   val listener = parent.listener
-  val dateFmt = parent.dateFmt
   val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
 
   def toNodeSeq(): Seq[Node] = {

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala

Lines changed: 0 additions & 1 deletion
@@ -40,7 +40,6 @@ import org.apache.spark.util.Utils
 private[spark] class JobProgressUI(val sc: SparkContext) {
   private var _listener: Option[JobProgressListener] = None
   def listener = _listener.get
-  val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
 
   private val indexPage = new IndexPage(this)
   private val stagePage = new StagePage(this)

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 2 additions & 2 deletions
@@ -26,14 +26,14 @@ import scala.xml.Node
 import org.apache.spark.{ExceptionFailure}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.ui.UIUtils._
+import org.apache.spark.ui.WebUI
 import org.apache.spark.ui.Page._
 import org.apache.spark.util.{Utils, Distribution}
 import org.apache.spark.scheduler.TaskInfo
 
 /** Page showing statistics and task list for a given stage */
 private[spark] class StagePage(parent: JobProgressUI) {
   def listener = parent.listener
-  val dateFmt = parent.dateFmt
 
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
@@ -248,7 +248,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
       <td>{info.status}</td>
       <td>{info.taskLocality}</td>
       <td>{info.host}</td>
-      <td>{dateFmt.format(new Date(info.launchTime))}</td>
+      <td>{WebUI.formatDate(new Date(info.launchTime))}</td>
       <td sorttable_customkey={duration.toString}>
         {formatDuration}
       </td>

core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala

Lines changed: 2 additions & 3 deletions
@@ -23,15 +23,14 @@ import scala.xml.Node
 import scala.collection.mutable.HashSet
 
 import org.apache.spark.scheduler.{SchedulingMode, StageInfo, TaskInfo}
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUI, UIUtils}
 import org.apache.spark.util.Utils
 
 
 /** Page showing list of all ongoing and recently finished stages */
 private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgressUI) {
 
   val listener = parent.listener
-  val dateFmt = parent.dateFmt
   val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
 
   def toNodeSeq(): Seq[Node] = {
@@ -75,7 +74,7 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
 
   private def stageRow(s: StageInfo): Seq[Node] = {
     val submissionTime = s.submissionTime match {
-      case Some(t) => dateFmt.format(new Date(t))
+      case Some(t) => WebUI.formatDate(new Date(t))
      case None => "Unknown"
    }
 

0 commit comments