Skip to content

Commit 2c0aa22

Browse files
zsxwing authored and pwendell committed
SPARK-1279: Fix improper use of SimpleDateFormat
`SimpleDateFormat` is not thread-safe. Several places share a single `SimpleDateFormat` object across multiple threads without any safeguard, which can cause the Web UI to display incorrect dates. This PR creates a new `SimpleDateFormat` each time one is needed. An alternative is to use a `ThreadLocal` to hold one `SimpleDateFormat` per thread. If this PR hurts performance, I can switch to the latter approach. Author: zsxwing <[email protected]> Closes #179 from zsxwing/SPARK-1278 and squashes the following commits: 21fabd3 [zsxwing] SPARK-1278: Fix improper use of SimpleDateFormat
1 parent 7e17fe6 commit 2c0aa22

File tree

9 files changed

+30
-26
lines changed

9 files changed

+30
-26
lines changed

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ private[spark] class Master(
5151

5252
val conf = new SparkConf
5353

54-
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
54+
def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
5555
val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
5656
val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
5757
val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
@@ -682,7 +682,7 @@ private[spark] class Master(
682682

683683
/** Generate a new app ID given an app's submission date */
684684
def newApplicationId(submitDate: Date): String = {
685-
val appId = "app-%s-%04d".format(DATE_FORMAT.format(submitDate), nextAppNumber)
685+
val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
686686
nextAppNumber += 1
687687
appId
688688
}
@@ -706,7 +706,7 @@ private[spark] class Master(
706706
}
707707

708708
def newDriverId(submitDate: Date): String = {
709-
val appId = "driver-%s-%04d".format(DATE_FORMAT.format(submitDate), nextDriverNumber)
709+
val appId = "driver-%s-%04d".format(createDateFormat.format(submitDate), nextDriverNumber)
710710
nextDriverNumber += 1
711711
appId
712712
}

core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ import scala.xml.Node
2525
import akka.pattern.ask
2626
import org.json4s.JValue
2727

28-
import org.apache.spark.deploy.{DeployWebUI, JsonProtocol}
28+
import org.apache.spark.deploy.{JsonProtocol}
2929
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
3030
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
31-
import org.apache.spark.ui.UIUtils
31+
import org.apache.spark.ui.{WebUI, UIUtils}
3232
import org.apache.spark.util.Utils
3333

3434
private[spark] class IndexPage(parent: MasterWebUI) {
@@ -169,10 +169,10 @@ private[spark] class IndexPage(parent: MasterWebUI) {
169169
<td sorttable_customkey={app.desc.memoryPerSlave.toString}>
170170
{Utils.megabytesToString(app.desc.memoryPerSlave)}
171171
</td>
172-
<td>{DeployWebUI.formatDate(app.submitDate)}</td>
172+
<td>{WebUI.formatDate(app.submitDate)}</td>
173173
<td>{app.desc.user}</td>
174174
<td>{app.state.toString}</td>
175-
<td>{DeployWebUI.formatDuration(app.duration)}</td>
175+
<td>{WebUI.formatDuration(app.duration)}</td>
176176
</tr>
177177
}
178178

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ private[spark] class Worker(
5656
Utils.checkHost(host, "Expected hostname")
5757
assert (port > 0)
5858

59-
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
59+
def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
6060

6161
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
6262
val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
@@ -319,7 +319,7 @@ private[spark] class Worker(
319319
}
320320

321321
def generateWorkerId(): String = {
322-
"worker-%s-%s-%d".format(DATE_FORMAT.format(new Date), host, port)
322+
"worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port)
323323
}
324324

325325
override def postStop() {

core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
5555
private val jobIdToPrintWriter = new HashMap[Int, PrintWriter]
5656
private val stageIdToJobId = new HashMap[Int, Int]
5757
private val jobIdToStageIds = new HashMap[Int, Seq[Int]]
58-
private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
58+
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
59+
override def initialValue() = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
60+
}
5961
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]
6062

6163
createLogDir()
@@ -128,7 +130,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
128130
var writeInfo = info
129131
if (withTime) {
130132
val date = new Date(System.currentTimeMillis())
131-
writeInfo = DATE_FORMAT.format(date) + ": " + info
133+
writeInfo = dateFormat.get.format(date) + ": " + info
132134
}
133135
jobIdToPrintWriter.get(jobId).foreach(_.println(writeInfo))
134136
}

core/src/main/scala/org/apache/spark/deploy/WebUI.scala renamed to core/src/main/scala/org/apache/spark/ui/WebUI.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,23 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.spark.deploy
18+
package org.apache.spark.ui
1919

2020
import java.text.SimpleDateFormat
2121
import java.util.Date
2222

2323
/**
2424
* Utilities used throughout the web UI.
2525
*/
26-
private[spark] object DeployWebUI {
27-
val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
26+
private[spark] object WebUI {
27+
// SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
28+
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
29+
override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
30+
}
2831

29-
def formatDate(date: Date): String = DATE_FORMAT.format(date)
32+
def formatDate(date: Date): String = dateFormat.get.format(date)
3033

31-
def formatDate(timestamp: Long): String = DATE_FORMAT.format(new Date(timestamp))
34+
def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))
3235

3336
def formatDuration(milliseconds: Long): String = {
3437
val seconds = milliseconds.toDouble / 1000

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.spark.ui.jobs
1919

20-
import java.text.SimpleDateFormat
2120
import javax.servlet.http.HttpServletRequest
2221

2322
import org.eclipse.jetty.servlet.ServletContextHandler
@@ -32,7 +31,6 @@ import org.apache.spark.util.Utils
3231
private[ui] class JobProgressUI(parent: SparkUI) {
3332
val appName = parent.appName
3433
val basePath = parent.basePath
35-
val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
3634
val live = parent.live
3735
val sc = parent.sc
3836

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,13 @@ import javax.servlet.http.HttpServletRequest
2323
import scala.xml.Node
2424

2525
import org.apache.spark.ui.Page._
26-
import org.apache.spark.ui.UIUtils
26+
import org.apache.spark.ui.{WebUI, UIUtils}
2727
import org.apache.spark.util.{Utils, Distribution}
2828

2929
/** Page showing statistics and task list for a given stage */
3030
private[ui] class StagePage(parent: JobProgressUI) {
3131
private val appName = parent.appName
3232
private val basePath = parent.basePath
33-
private val dateFmt = parent.dateFmt
3433
private lazy val listener = parent.listener
3534

3635
def render(request: HttpServletRequest): Seq[Node] = {
@@ -253,7 +252,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
253252
<td>{info.status}</td>
254253
<td>{info.taskLocality}</td>
255254
<td>{info.host}</td>
256-
<td>{dateFmt.format(new Date(info.launchTime))}</td>
255+
<td>{WebUI.formatDate(new Date(info.launchTime))}</td>
257256
<td sorttable_customkey={duration.toString}>
258257
{formatDuration}
259258
</td>

core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,12 @@ import scala.collection.mutable.HashMap
2323
import scala.xml.Node
2424

2525
import org.apache.spark.scheduler.{StageInfo, TaskInfo}
26-
import org.apache.spark.ui.UIUtils
26+
import org.apache.spark.ui.{WebUI, UIUtils}
2727
import org.apache.spark.util.Utils
2828

2929
/** Page showing list of all ongoing and recently finished stages */
3030
private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
3131
private val basePath = parent.basePath
32-
private val dateFmt = parent.dateFmt
3332
private lazy val listener = parent.listener
3433
private lazy val isFairScheduler = parent.isFairScheduler
3534

@@ -82,7 +81,7 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
8281
val description = listener.stageIdToDescription.get(s.stageId)
8382
.map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
8483
val submissionTime = s.submissionTime match {
85-
case Some(t) => dateFmt.format(new Date(t))
84+
case Some(t) => WebUI.formatDate(new Date(t))
8685
case None => "Unknown"
8786
}
8887
val finishTime = s.completionTime.getOrElse(System.currentTimeMillis)

core/src/main/scala/org/apache/spark/util/FileLogger.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ class FileLogger(
4444
overwrite: Boolean = true)
4545
extends Logging {
4646

47-
private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
47+
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
48+
override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
49+
}
50+
4851
private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
4952
private var fileIndex = 0
5053

@@ -111,7 +114,7 @@ class FileLogger(
111114
def log(msg: String, withTime: Boolean = false) {
112115
val writeInfo = if (!withTime) msg else {
113116
val date = new Date(System.currentTimeMillis())
114-
DATE_FORMAT.format(date) + ": " + msg
117+
dateFormat.get.format(date) + ": " + msg
115118
}
116119
writer.foreach(_.print(writeInfo))
117120
}

0 commit comments

Comments
 (0)