
[SPARK-5931][CORE] Use consistent naming for time properties #5236

Closed
wants to merge 60 commits into from
Commits (60)
59bf9e1
[SPARK-5931] Updated Utils and JavaUtils classes to add helper method…
Mar 27, 2015
404f8c3
Updated usage of spark.core.connection.ack.wait.timeout
Mar 27, 2015
7db6d2a
Updated usage of spark.akka.timeout
Mar 27, 2015
4933fda
Updated usage of spark.storage.blockManagerSlaveTimeout
Mar 27, 2015
c9f5cad
Updated spark.shuffle.io.retryWait
Mar 27, 2015
21ef3dd
updated spark.shuffle.sasl.timeout
Mar 27, 2015
064ebd6
Updated usage of spark.cleaner.ttl
Mar 27, 2015
7320c87
updated spark.akka.heartbeat.interval
Mar 27, 2015
272c215
Updated spark.locality.wait
Mar 27, 2015
3352d34
Updated spark.scheduler.maxRegisteredResourcesWaitingTime
Mar 27, 2015
3f1cfc8
Updated spark.scheduler.revive.interval
Mar 27, 2015
6d1518e
Upated spark.speculation.interval
Mar 27, 2015
2fcc91c
Updated spark.dynamicAllocation.executorIdleTimeout
Mar 27, 2015
5181597
Updated spark.dynamicAllocation.schedulerBacklogTimeout
Mar 27, 2015
c6a0095
Updated spark.core.connection.auth.wait.timeout
Mar 27, 2015
cde9bff
Updated spark.streaming.blockInterval
Mar 27, 2015
42477aa
Updated configuration doc with note on specifying time properties
Mar 27, 2015
9a29d8d
Fixed misuse of time in streaming context test
Mar 27, 2015
34f87c2
Update Utils.scala
ilganeli Mar 28, 2015
8f741e1
Update JavaUtils.java
ilganeli Mar 28, 2015
9e2547c
Reverting doc changes
Mar 30, 2015
499bdf0
Merge branch 'SPARK-5931' of github.com:ilganeli/spark into SPARK-5931
Mar 30, 2015
5232a36
[SPARK-5931] Changed default behavior of time string conversion.
Mar 30, 2015
3a12dd8
Updated host revceiver
Mar 30, 2015
68f4e93
Updated more files to clean up usage of default time strings
Mar 30, 2015
70ac213
Fixed remaining usages to be consistent. Updated Java-side time conve…
Mar 30, 2015
647b5ac
Udpated time conversion to use map iterator instead of if fall through
Mar 31, 2015
1c0c07c
Updated Java code to add day, minutes, and hours
Mar 31, 2015
8613631
Whitespace
Mar 31, 2015
bac9edf
More whitespace
Mar 31, 2015
1858197
Fixed bug where all time was being converted to us instead of the app…
Mar 31, 2015
3b126e1
Fixed conversion to US from seconds
Mar 31, 2015
39164f9
[SPARK-5931] Updated Java conversion to be similar to scala conversio…
Mar 31, 2015
b2fc965
replaced get or default since it's not present in this version of java
Mar 31, 2015
dd0a680
Updated scala code to call into java
Mar 31, 2015
bf779b0
Special handling of overlapping usffixes for java
Apr 1, 2015
76cfa27
[SPARK-5931] Minor nit fixes'
Apr 1, 2015
5193d5f
Resolved merge conflicts
Apr 6, 2015
6387772
Updated suffix handling to handle overlap of units more gracefully
Apr 6, 2015
19c31af
Added cleaner computation of time conversions in tests
Apr 6, 2015
ff40bfe
Updated tests to fix small bugs
Apr 6, 2015
28187bf
Convert straight to seconds
Apr 6, 2015
1465390
Nit
Apr 6, 2015
cbf41db
Got rid of thrown exceptions
Apr 7, 2015
d4efd26
Added time conversion for yarn.scheduler.heartbeat.interval-ms
Apr 8, 2015
4e48679
Fixed priority order and mixed up conversions in a couple spots
Apr 8, 2015
1a1122c
Formatting fixes and added m for use as minute formatter
Apr 8, 2015
cbd2ca6
Formatting error
Apr 8, 2015
6f651a8
Now using regexes to simplify code in parseTimeString. Introduces get…
Apr 8, 2015
7d19cdd
Added fix for possible NPE
Apr 8, 2015
dc7bd08
Fixed error in exception handling
Apr 8, 2015
69fedcc
Added test for zero
Apr 8, 2015
8927e66
Fixed handling of -1
Apr 9, 2015
642a06d
Fixed logic for invalid suffixes and addid matching test
Apr 9, 2015
25d3f52
Minor nit fixes
Apr 11, 2015
bc04e05
Minor fixes and doc updates
Apr 13, 2015
951ca2d
Made the most recent round of changes
Apr 13, 2015
f5fafcd
Doc updates
Apr 13, 2015
de3bff9
Fixing style errors
Apr 13, 2015
4526c81
Update configuration.md
Apr 13, 2015
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -80,16 +80,16 @@ private[spark] class ExecutorAllocationManager(
Integer.MAX_VALUE)

// How long there must be backlogged tasks for before an addition is triggered (seconds)
private val schedulerBacklogTimeout = conf.getLong(
"spark.dynamicAllocation.schedulerBacklogTimeout", 5)
private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.schedulerBacklogTimeout", "5s")

// Same as above, but used only after `schedulerBacklogTimeout` is exceeded
private val sustainedSchedulerBacklogTimeout = conf.getLong(
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)
// Same as above, but used only after `schedulerBacklogTimeoutS` is exceeded
private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", s"${schedulerBacklogTimeoutS}s")

// How long an executor must be idle for before it is removed (seconds)
private val executorIdleTimeout = conf.getLong(
"spark.dynamicAllocation.executorIdleTimeout", 600)
private val executorIdleTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.executorIdleTimeout", "600s")

// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
@@ -150,14 +150,14 @@ private[spark] class ExecutorAllocationManager(
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
}
if (schedulerBacklogTimeout <= 0) {
if (schedulerBacklogTimeoutS <= 0) {
throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
}
if (sustainedSchedulerBacklogTimeout <= 0) {
if (sustainedSchedulerBacklogTimeoutS <= 0) {
throw new SparkException(
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
}
if (executorIdleTimeout <= 0) {
if (executorIdleTimeoutS <= 0) {
throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!")
}
// Require external shuffle service for dynamic allocation
@@ -262,8 +262,8 @@ private[spark] class ExecutorAllocationManager(
} else if (addTime != NOT_SET && now >= addTime) {
val delta = addExecutors(maxNeeded)
logDebug(s"Starting timer to add more executors (to " +
s"expire in $sustainedSchedulerBacklogTimeout seconds)")
addTime += sustainedSchedulerBacklogTimeout * 1000
s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
addTime += sustainedSchedulerBacklogTimeoutS * 1000
delta
} else {
0
@@ -351,7 +351,7 @@ private[spark] class ExecutorAllocationManager(
val removeRequestAcknowledged = testing || client.killExecutor(executorId)
if (removeRequestAcknowledged) {
logInfo(s"Removing executor $executorId because it has been idle for " +
s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")
s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})")
executorsPendingToRemove.add(executorId)
true
} else {
@@ -407,8 +407,8 @@ private[spark] class ExecutorAllocationManager(
private def onSchedulerBacklogged(): Unit = synchronized {
if (addTime == NOT_SET) {
logDebug(s"Starting timer to add executors because pending tasks " +
s"are building up (to expire in $schedulerBacklogTimeout seconds)")
addTime = clock.getTimeMillis + schedulerBacklogTimeout * 1000
s"are building up (to expire in $schedulerBacklogTimeoutS seconds)")
addTime = clock.getTimeMillis + schedulerBacklogTimeoutS * 1000
}
}

@@ -431,8 +431,8 @@ private[spark] class ExecutorAllocationManager(
if (executorIds.contains(executorId)) {
if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
s"scheduled to run on the executor (to expire in $executorIdleTimeoutS seconds)")
removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeoutS * 1000
}
} else {
logWarning(s"Attempted to mark unknown executor $executorId idle")
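With this change the dynamic-allocation timeouts accept suffixed durations, while bare numbers keep their old meaning of seconds. A hypothetical configuration sketch (the keys are real; the values, and the "min" suffix added later in this PR, are only illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "5s")    // previously a bare "5"
  .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "30s")
  .set("spark.dynamicAllocation.executorIdleTimeout", "10min")     // a bare "600" still parses as seconds
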
15 changes: 9 additions & 6 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -62,14 +62,17 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)

// "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
// "milliseconds"
private val executorTimeoutMs = sc.conf.getOption("spark.network.timeout").map(_.toLong * 1000).
getOrElse(sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120000))

private val slaveTimeoutMs =
sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", "120s")
private val executorTimeoutMs =
sc.conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms") * 1000

// "spark.network.timeoutInterval" uses "seconds", while
// "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds"
private val checkTimeoutIntervalMs =
sc.conf.getOption("spark.network.timeoutInterval").map(_.toLong * 1000).
getOrElse(sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000))
private val timeoutIntervalMs =
sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s")
private val checkTimeoutIntervalMs =
sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000

private var timeoutCheckingTask: ScheduledFuture[_] = null

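The suffix on the chained default string is what keeps the mixed units straight here. A minimal sketch tracing the defaults, assuming this patch is applied and neither property is set (values shown in the comments are the computed results):

import org.apache.spark.SparkConf

val conf = new SparkConf(false)
// Falls back to "120s", read in milliseconds.
val slaveTimeoutMs = conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", "120s")            // 120000
// The default string "120000ms" parses back to 120 seconds, so the result is 120 * 1000.
val executorTimeoutMs = conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms") * 1000 // 120000
// If spark.network.timeout were set to, say, "300s", this would be 300000 instead.
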
36 changes: 36 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -174,6 +174,42 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
getOption(key).getOrElse(defaultValue)
}

/**
* Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
* suffix is provided then seconds are assumed.
* @throws NoSuchElementException
*/
def getTimeAsSeconds(key: String): Long = {
Utils.timeStringAsSeconds(get(key))
}

/**
* Get a time parameter as seconds, falling back to a default if not set. If no
* suffix is provided then seconds are assumed.
*
*/
def getTimeAsSeconds(key: String, defaultValue: String): Long = {
Utils.timeStringAsSeconds(get(key, defaultValue))
}

/**
* Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
* suffix is provided then milliseconds are assumed.
* @throws NoSuchElementException
*/
def getTimeAsMs(key: String): Long = {
Utils.timeStringAsMs(get(key))
}

/**
* Get a time parameter as milliseconds, falling back to a default if not set. If no
* suffix is provided then milliseconds are assumed.
*/
def getTimeAsMs(key: String, defaultValue: String): Long = {
Utils.timeStringAsMs(get(key, defaultValue))
}


/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
Option(settings.get(key))
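A rough usage sketch of the new accessors (not part of the diff; the keys and values are illustrative, and the suffix set is the one parsed by JavaUtils in this PR, roughly us/ms/s/m/min/h/d):

import org.apache.spark.SparkConf

val conf = new SparkConf(false)
  .set("spark.network.timeout", "2m")         // suffixed: two minutes
  .set("spark.shuffle.io.retryWait", "5000")  // bare number: unit comes from the accessor

conf.getTimeAsSeconds("spark.network.timeout")       // 120
conf.getTimeAsMs("spark.network.timeout")            // 120000
conf.getTimeAsMs("spark.shuffle.io.retryWait")       // 5000 (no suffix, assumed ms)
conf.getTimeAsSeconds("spark.akka.timeout", "100s")  // 100, taken from the default
// conf.getTimeAsSeconds("spark.some.unset.key")     // would throw NoSuchElementException
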
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -436,14 +436,14 @@ private[spark] class Executor(
* This thread stops running when the executor is stopped.
*/
private def startDriverHeartbeater(): Unit = {
val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)
val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
val thread = new Thread() {
override def run() {
// Sleep a random interval so the heartbeats don't end up in sync
Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])
Thread.sleep(intervalMs + (math.random * intervalMs).asInstanceOf[Int])
while (!isStopped) {
reportHeartBeat()
Thread.sleep(interval)
Thread.sleep(intervalMs)
}
}
}
core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -82,7 +82,8 @@ private[nio] class ConnectionManager(
new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor"))

private val ackTimeout =
conf.getInt("spark.core.connection.ack.wait.timeout", conf.getInt("spark.network.timeout", 120))
conf.getTimeAsSeconds("spark.core.connection.ack.wait.timeout",
conf.get("spark.network.timeout", "120s"))

// Get the thread counts from the Spark Configuration.
//
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -62,10 +62,10 @@ private[spark] class TaskSchedulerImpl(
val conf = sc.conf

// How often to check for speculative tasks
val SPECULATION_INTERVAL = conf.getLong("spark.speculation.interval", 100)
val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")

// Threshold above which we warn user initial TaskSet may be starved
val STARVATION_TIMEOUT = conf.getLong("spark.starvation.timeout", 15000)
val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")

// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
@@ -143,8 +143,8 @@ private[spark] class TaskSchedulerImpl(
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
import sc.env.actorSystem.dispatcher
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
SPECULATION_INTERVAL milliseconds) {
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL_MS milliseconds,
SPECULATION_INTERVAL_MS milliseconds) {
Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
}
}
@@ -173,7 +173,7 @@ private[spark] class TaskSchedulerImpl(
this.cancel()
}
}
}, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -848,15 +848,18 @@
}

private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
val defaultWait = conf.get("spark.locality.wait", "3000")
level match {
case TaskLocality.PROCESS_LOCAL =>
conf.get("spark.locality.wait.process", defaultWait).toLong
case TaskLocality.NODE_LOCAL =>
conf.get("spark.locality.wait.node", defaultWait).toLong
case TaskLocality.RACK_LOCAL =>
conf.get("spark.locality.wait.rack", defaultWait).toLong
case _ => 0L
val defaultWait = conf.get("spark.locality.wait", "3s")
val localityWaitKey = level match {
case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
case _ => null
}

if (localityWaitKey != null) {
conf.getTimeAsMs(localityWaitKey, defaultWait)
} else {
0L
}
Contributor comment:
just an idea, maybe we can rewrite this as:

val defaultWait = conf.get("spark.locality.wait", "3s")
val localityWaitKey =
  key match {
    case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
    case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
    case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
  }
Utils.timeStringAsMs(conf.get(localityWaitKey, defaultWait))

Looks nicer IMO, less duplicate code

}

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -52,8 +52,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
// Submit tasks after maxRegisteredWaitingTime milliseconds
// if minRegisteredRatio has not yet been reached
val maxRegisteredWaitingTime =
conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
val maxRegisteredWaitingTimeMs =
conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
val createTime = System.currentTimeMillis()

private val executorDataMap = new HashMap[String, ExecutorData]
@@ -77,12 +77,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

override def onStart() {
// Periodically revive offers to allow delay scheduling to work
val reviveInterval = conf.getLong("spark.scheduler.revive.interval", 1000)
val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

reviveThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ReviveOffers))
}
}, 0, reviveInterval, TimeUnit.MILLISECONDS)
}, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}

override def receive: PartialFunction[Any, Unit] = {
@@ -301,9 +301,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
s"reached minRegisteredResourcesRatio: $minRegisteredRatio")
return true
}
if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTimeMs) {
logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTime(ms)")
s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTimeMs(ms)")
return true
}
false
14 changes: 7 additions & 7 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -20,7 +20,6 @@ package org.apache.spark.util
import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.Await
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.Try

import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
import akka.pattern.ask
@@ -66,7 +65,8 @@ private[spark] object AkkaUtils extends Logging {

val akkaThreads = conf.getInt("spark.akka.threads", 4)
val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
val akkaTimeout = conf.getInt("spark.akka.timeout", conf.getInt("spark.network.timeout", 120))
val akkaTimeoutS = conf.getTimeAsSeconds("spark.akka.timeout",
conf.get("spark.network.timeout", "120s"))
val akkaFrameSize = maxFrameSizeBytes(conf)
val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
@@ -78,8 +78,8 @@

val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"

val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 6000)
val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000)
val akkaHeartBeatPausesS = conf.getTimeAsSeconds("spark.akka.heartbeat.pauses", "6000s")
val akkaHeartBeatIntervalS = conf.getTimeAsSeconds("spark.akka.heartbeat.interval", "1000s")

val secretKey = securityManager.getSecretKey()
val isAuthOn = securityManager.isAuthenticationEnabled()
@@ -102,14 +102,14 @@
|akka.jvm-exit-on-fatal-error = off
|akka.remote.require-cookie = "$requireCookie"
|akka.remote.secure-cookie = "$secureCookie"
|akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
|akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
|akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatIntervalS s
|akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPausesS s
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = $port
|akka.remote.netty.tcp.tcp-nodelay = on
|akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
|akka.remote.netty.tcp.connection-timeout = $akkaTimeoutS s
|akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
|akka.remote.netty.tcp.execution-pool-size = $akkaThreads
|akka.actor.default-dispatcher.throughput = $akkaBatchSize
core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -76,7 +76,7 @@ private[spark] object MetadataCleanerType extends Enumeration {
// initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
private[spark] object MetadataCleaner {
def getDelaySeconds(conf: SparkConf): Int = {
conf.getInt("spark.cleaner.ttl", -1)
conf.getTimeAsSeconds("spark.cleaner.ttl", "-1").toInt
}

def getDelaySeconds(
26 changes: 22 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -22,7 +22,7 @@ import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
import java.util.{Properties, Locale, Random, UUID}
import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
import java.util.concurrent._
import javax.net.ssl.HttpsURLConnection

import scala.collection.JavaConversions._
@@ -46,6 +46,7 @@ import tachyon.client.{TachyonFS, TachyonFile}

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}

/** CallSite represents a place in user code. It can have a short and a long form. */
@@ -611,9 +612,10 @@ private[spark] object Utils extends Logging {
}
Utils.setupSecureURLConnection(uc, securityMgr)

val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000
uc.setConnectTimeout(timeout)
uc.setReadTimeout(timeout)
val timeoutMs =
conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000
uc.setConnectTimeout(timeoutMs)
uc.setReadTimeout(timeoutMs)
uc.connect()
val in = uc.getInputStream()
downloadFile(url, in, targetFile, fileOverwrite)
@@ -1010,6 +1012,22 @@
)
}

/**
* Convert a time parameter such as (50s, 100ms, or 250us) to milliseconds for internal use. If
* no suffix is provided, the passed number is assumed to be in ms.
*/
def timeStringAsMs(str: String): Long = {
JavaUtils.timeStringAsMs(str)
}

/**
* Convert a time parameter such as (50s, 100ms, or 250us) to seconds for internal use. If
* no suffix is provided, the passed number is assumed to be in seconds.
*/
def timeStringAsSeconds(str: String): Long = {
JavaUtils.timeStringAsSec(str)
}

/**
* Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
*/
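As a quick sanity check of the intended semantics of these wrappers, a sketch (not taken from the PR's test suite; Utils is private[spark], so the calls only compile from Spark-internal code):

import org.apache.spark.util.Utils

Utils.timeStringAsMs("100ms")     // 100
Utils.timeStringAsMs("50s")       // 50000
Utils.timeStringAsMs("250us")     // 0 (truncates toward zero, as TimeUnit conversions do)
Utils.timeStringAsMs("120")       // 120, no suffix: assumed milliseconds
Utils.timeStringAsSeconds("50s")  // 50
Utils.timeStringAsSeconds("120")  // 120, no suffix: assumed seconds
Utils.timeStringAsSeconds("-1")   // -1, e.g. the spark.cleaner.ttl "disabled" value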