
Commit 3b6d48c

Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new

2 parents: 926bd6a + 2f63995

18 files changed: +267 -82 lines

.rat-excludes

Lines changed: 5 additions & 0 deletions
@@ -43,3 +43,8 @@ test.out/*
 .*iml
 service.properties
 db.lck
+build/*
+dist/*
+.*out
+.*ipr
+.*iws

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 6 additions & 0 deletions
@@ -66,6 +66,12 @@ class SparkContext(config: SparkConf) extends Logging {
   // contains a map from hostname to a list of input format splits on the host.
   private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()

+  /**
+   * Create a SparkContext that loads settings from system properties (for instance, when
+   * launching with ./bin/spark-submit).
+   */
+  def this() = this(new SparkConf())
+
   /**
    * :: DeveloperApi ::
    * Alternative constructor for setting preferred locations where Spark will create executors.
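
A quick usage sketch for the new zero-argument constructor. MyApp and its trivial count job are hypothetical; the spark.master, spark.app.name and other spark.* values would normally be injected as system properties by bin/spark-submit rather than set by hand:

import org.apache.spark.SparkContext

// Hypothetical driver: spark-submit sets spark.* system properties before
// invoking main, so the driver no longer needs to build a SparkConf itself.
object MyApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext()                 // settings come from system properties
    try {
      println(sc.parallelize(1 to 100).count()) // trivial job to exercise the context
    } finally {
      sc.stop()
    }
  }
}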

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 8 additions & 10 deletions
@@ -278,10 +278,11 @@ object SparkEnv extends Logging {
       addedJars: Seq[String],
       addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {

+    import Properties._
     val jvmInformation = Seq(
-      ("Java Version", "%s (%s)".format(Properties.javaVersion, Properties.javaVendor)),
-      ("Java Home", Properties.javaHome),
-      ("Scala Version", Properties.versionString)
+      ("Java Version", s"$javaVersion ($javaVendor)"),
+      ("Java Home", javaHome),
+      ("Scala Version", versionString)
     ).sorted

     // Spark properties
@@ -296,18 +297,15 @@

     // System properties that are not java classpaths
     val systemProperties = System.getProperties.iterator.toSeq
-    val otherProperties = systemProperties.filter { case (k, v) =>
+    val otherProperties = systemProperties.filter { case (k, _) =>
       k != "java.class.path" && !k.startsWith("spark.")
     }.sorted

     // Class paths including all added jars and files
-    val classPathProperty = systemProperties.find { case (k, v) =>
-      k == "java.class.path"
-    }.getOrElse(("", ""))
-    val classPathEntries = classPathProperty._2
+    val classPathEntries = javaClassPath
       .split(File.pathSeparator)
-      .filterNot(e => e.isEmpty)
-      .map(e => (e, "System Classpath"))
+      .filterNot(_.isEmpty)
+      .map((_, "System Classpath"))
     val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
     val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted
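
For context, the names brought in by import Properties._ (javaVersion, javaVendor, javaHome, versionString, javaClassPath) are standard accessors on scala.util.Properties, which is why the manual System.getProperties lookup for java.class.path can be dropped. A small standalone sketch of the accessors the new code assumes (example values only):

import java.io.File
import scala.util.Properties._

// Plain scala.util.Properties accessors, not Spark APIs.
val jvm       = s"$javaVersion ($javaVendor)"            // e.g. "1.7.0_55 (Oracle Corporation)"
val home      = javaHome                                 // the java.home system property
val scalaVer  = versionString                            // e.g. "version 2.10.4"
val classpath = javaClassPath.split(File.pathSeparator)  // the java.class.path entries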

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ object SparkSubmit {

   /**
    * @return a tuple containing the arguments for the child, a list of classpath
-   *         entries for the child, a list of system propertes, a list of env vars
+   *         entries for the child, a list of system properties, a list of env vars
    *         and the main class for the child
    */
   private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String],

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala

Lines changed: 10 additions & 8 deletions
@@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{Utils, AkkaUtils}

 /**
  * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
@@ -88,13 +88,15 @@ private[spark] class AppClient(
     var retries = 0
     registrationRetryTimer = Some {
       context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
-        retries += 1
-        if (registered) {
-          registrationRetryTimer.foreach(_.cancel())
-        } else if (retries >= REGISTRATION_RETRIES) {
-          markDead("All masters are unresponsive! Giving up.")
-        } else {
-          tryRegisterAllMasters()
+        Utils.tryOrExit {
+          retries += 1
+          if (registered) {
+            registrationRetryTimer.foreach(_.cancel())
+          } else if (retries >= REGISTRATION_RETRIES) {
+            markDead("All masters are unresponsive! Giving up.")
+          } else {
+            tryRegisterAllMasters()
+          }
         }
       }
     }

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 10 additions & 8 deletions
@@ -166,14 +166,16 @@ private[spark] class Worker(
     var retries = 0
     registrationRetryTimer = Some {
       context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
-        retries += 1
-        if (registered) {
-          registrationRetryTimer.foreach(_.cancel())
-        } else if (retries >= REGISTRATION_RETRIES) {
-          logError("All masters are unresponsive! Giving up.")
-          System.exit(1)
-        } else {
-          tryRegisterAllMasters()
+        Utils.tryOrExit {
+          retries += 1
+          if (registered) {
+            registrationRetryTimer.foreach(_.cancel())
+          } else if (retries >= REGISTRATION_RETRIES) {
+            logError("All masters are unresponsive! Giving up.")
+            System.exit(1)
+          } else {
+            tryRegisterAllMasters()
+          }
         }
       }
     }

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 2 additions & 1 deletion
@@ -31,6 +31,7 @@ import scala.util.Random
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.util.Utils

 /**
  * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
@@ -139,7 +140,7 @@ private[spark] class TaskSchedulerImpl(
       import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
             SPECULATION_INTERVAL milliseconds) {
-        checkSpeculatableTasks()
+        Utils.tryOrExit { checkSpeculatableTasks() }
       }
     }
   }

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 1 deletion
@@ -155,7 +155,7 @@ private[spark] class BlockManager(
     BlockManagerWorker.startBlockManagerWorker(this)
     if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
       heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
-        heartBeat()
+        Utils.tryOrExit { heartBeat() }
       }
     }
   }

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 6 additions & 0 deletions
@@ -36,7 +36,13 @@ private[spark] object UIUtils extends Logging {
   def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))

   def formatDuration(milliseconds: Long): String = {
+    if (milliseconds < 100) {
+      return "%d ms".format(milliseconds)
+    }
     val seconds = milliseconds.toDouble / 1000
+    if (seconds < 1) {
+      return "%.1f s".format(seconds)
+    }
     if (seconds < 60) {
       return "%.0f s".format(seconds)
     }
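
A few illustrative inputs for the patched thresholds (outputs assume only the branches shown above; the existing minute and hour branches further down are unchanged):

UIUtils.formatDuration(42)    // "42 ms"  -- new: durations under 100 ms stay in milliseconds
UIUtils.formatDuration(640)   // "0.6 s"  -- new: under one second, one decimal place
UIUtils.formatDuration(5400)  // "5 s"    -- existing behavior for durations under a minute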

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 13 additions & 0 deletions
@@ -40,6 +40,7 @@ import tachyon.client.{TachyonFile,TachyonFS}

 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}

 /**
@@ -780,6 +781,18 @@ private[spark] object Utils extends Logging {
     output.toString
   }

+  /**
+   * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
+   * default UncaughtExceptionHandler
+   */
+  def tryOrExit(block: => Unit) {
+    try {
+      block
+    } catch {
+      case t: Throwable => ExecutorUncaughtExceptionHandler.uncaughtException(t)
+    }
+  }
+
   /**
    * A regular expression to match classes of the "core" Spark API that we want to skip when
    * finding the call site of a method.
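
To illustrate why the scheduled blocks in AppClient, Worker, TaskSchedulerImpl and BlockManager are now wrapped: an exception thrown inside a scheduled task is normally swallowed by the scheduler, silently killing only that task. A self-contained sketch of the same pattern, using a plain ScheduledExecutorService and a simplified stand-in for tryOrExit (the real helper forwards to ExecutorUncaughtExceptionHandler, which terminates the process):

import java.util.concurrent.{Executors, TimeUnit}

object TryOrExitSketch {
  // Simplified stand-in: hand any Throwable to the thread's uncaught-exception
  // handler rather than letting the executor swallow it.
  def tryOrExit(block: => Unit): Unit =
    try block catch {
      case t: Throwable =>
        Thread.currentThread.getUncaughtExceptionHandler.uncaughtException(Thread.currentThread, t)
    }

  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = tryOrExit {
        sys.error("heartbeat failed")  // without the wrapper this would vanish silently
      }
    }, 0, 1, TimeUnit.SECONDS)
  }
}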
