Commit cdea0a1

Merge pull request #7 from markhamstra/master-csd
SPY-302 backporting of SPARK-1620, SPARK-1685, SPARK-1686, SPARK-1772
2 parents 49621ec + 76dc266 commit cdea0a1

File tree

12 files changed: +163 -70 lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 6 additions & 6 deletions
@@ -218,8 +218,8 @@ class SparkContext(
      } catch {
        // TODO: Enumerate the exact reasons why it can fail
        // But irrespective of it, it means we cannot proceed !
-       case th: Throwable => {
-         throw new SparkException("YARN mode not available ?", th)
+       case e: Exception => {
+         throw new SparkException("YARN mode not available ?", e)
        }
      }
      val backend = new CoarseGrainedSchedulerBackend(scheduler, this.env.actorSystem)
@@ -233,8 +233,8 @@ class SparkContext(
        cons.newInstance(this).asInstanceOf[ClusterScheduler]

      } catch {
-       case th: Throwable => {
-         throw new SparkException("YARN mode not available ?", th)
+       case e: Exception => {
+         throw new SparkException("YARN mode not available ?", e)
        }
      }

@@ -243,8 +243,8 @@ class SparkContext(
        val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext])
        cons.newInstance(scheduler, this).asInstanceOf[CoarseGrainedSchedulerBackend]
      } catch {
-       case th: Throwable => {
-         throw new SparkException("YARN mode not available ?", th)
+       case e: Exception => {
+         throw new SparkException("YARN mode not available ?", e)
        }
      }
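
Note on the hunks above: narrowing "case th: Throwable" to "case e: Exception" means fatal JVM errors (OutOfMemoryError, LinkageError) and Scala control throwables are no longer swallowed and re-wrapped in a SparkException; they now propagate unchanged. A minimal sketch of the same idea using scala.util.control.NonFatal follows; the helper name is hypothetical and not part of this commit:

import scala.util.control.NonFatal

// Hypothetical illustration only: wrap recoverable failures, let fatal errors escape.
def loadYarnClass[T](load: => T): T =
  try {
    load
  } catch {
    // NonFatal does not match OutOfMemoryError, LinkageError, ControlThrowable, etc.,
    // which is the same spirit as replacing Throwable with Exception above.
    case NonFatal(e) => throw new RuntimeException("YARN mode not available ?", e)
  }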

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 0 additions & 1 deletion
@@ -265,7 +265,6 @@ private[spark] object PythonRDD {
      }
    } catch {
      case eof: EOFException => {}
-     case e => throw e
    }
    JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
  }
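
The deleted "case e => throw e" arm was dead weight: an untyped pattern in a Scala catch block matches any Throwable, and rethrowing it reproduces exactly what would happen if the case were absent. A small self-contained sketch of that equivalence; readLoop is a made-up stand-in for the surrounding read loop:

import java.io.EOFException

// Illustration only: this form ignores EOF and lets every other throwable propagate;
// appending "case e => throw e" would change nothing.
def readLoop(read: () => Unit): Unit =
  try {
    read()
  } catch {
    case eof: EOFException => ()  // expected end of stream: stop quietly
  }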

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

Lines changed: 1 addition & 3 deletions
@@ -58,13 +58,11 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
      try {
        new Socket(daemonHost, daemonPort)
      } catch {
-       case exc: SocketException => {
+       case exc: SocketException =>
          logWarning("Python daemon unexpectedly quit, attempting to restart")
          stopDaemon()
          startDaemon()
          new Socket(daemonHost, daemonPort)
-       }
-       case e => throw e
      }
    }
  }

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 1 addition & 1 deletion
@@ -73,7 +73,7 @@ object SparkHadoopUtil {
      try {
        Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
      } catch {
-       case th: Throwable => throw new SparkException("Unable to load YARN support", th)
+       case e: Exception => throw new SparkException("Unable to load YARN support", e)
      }
    } else {
      new SparkHadoopUtil

core/src/main/scala/org/apache/spark/deploy/client/Client.scala

Lines changed: 18 additions & 10 deletions
@@ -33,6 +33,7 @@ import org.apache.spark.Logging
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
+import org.apache.spark.util.Utils


 /**
@@ -61,6 +62,7 @@ private[spark] class Client(
   var masterAddress: Address = null
   var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
   var alreadyDead = false  // To avoid calling listener.dead() multiple times
+  var registrationRetryTimer: Option[Cancellable] = None

   override def preStart() {
     try {
@@ -85,19 +87,21 @@ private[spark] class Client(
      tryRegisterAllMasters()

      var retries = 0
-     lazy val retryTimer: Cancellable =
+     registrationRetryTimer = Some {
        context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
-         retries += 1
-         if (registered) {
-           retryTimer.cancel()
-         } else if (retries >= REGISTRATION_RETRIES) {
-           logError("All masters are unresponsive! Giving up.")
-           markDead()
-         } else {
-           tryRegisterAllMasters()
+         Utils.tryOrExit {
+           retries += 1
+           if (registered) {
+             registrationRetryTimer.foreach(_.cancel())
+           } else if (retries >= REGISTRATION_RETRIES) {
+             logError("All masters are unresponsive! Giving up.")
+             markDead()
+           } else {
+             tryRegisterAllMasters()
+           }
          }
        }
-     retryTimer // start timer
+     }
    }

  def changeMaster(url: String) {
@@ -174,6 +178,10 @@ private[spark] class Client(
      alreadyDead = true
    }
  }
+
+  override def postStop() {
+    registrationRetryTimer.foreach(_.cancel())
+  }
 }

  def start() {
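
Client.scala here, and Worker.scala and BlockManager.scala below, wrap their timer bodies in Utils.tryOrExit, whose definition is not included in this diff. A sketch of the likely shape, assuming it simply forwards to the process-level handler introduced later in this commit: without such a wrapper, an exception thrown inside an Akka scheduler callback kills only that run of the timer, and the daemon keeps running without its retry or heartbeat loop.

// Sketch only -- the real org.apache.spark.util.Utils.tryOrExit is not shown in this diff.
// Any throwable escaping the block is handed to the default uncaught exception handler,
// which terminates the process instead of leaving it alive with a dead timer.
def tryOrExit(block: => Unit) {
  try {
    block
  } catch {
    case t: Throwable => ExecutorUncaughtExceptionHandler.uncaughtException(t)
  }
}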

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 12 additions & 3 deletions
@@ -85,6 +85,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act

   var leaderElectionAgent: ActorRef = _

+  private var recoveryCompletionTask: Cancellable = _
+
   // As a temporary workaround before better ways of configuring memory, we allow users to set
   // a flag that will perform round-robin scheduling across the nodes (spreading out each app
   // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
@@ -128,6 +130,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   }

   override def postStop() {
+    // prevent the CompleteRecovery message sending to restarted master
+    if (recoveryCompletionTask != null) {
+      recoveryCompletionTask.cancel()
+    }
     webUi.stop()
     masterMetricsSystem.stop()
     applicationMetricsSystem.stop()
@@ -147,10 +153,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act

      if (state == RecoveryState.RECOVERING) {
        beginRecovery(storedApps, storedWorkers)
-       context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
+       recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
+         CompleteRecovery)
      }
    }

+   case CompleteRecovery => completeRecovery()
+
    case RevokedLeadership => {
      logError("Leadership has been revoked -- master shutting down.")
      System.exit(0)
@@ -350,15 +359,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
-  def schedule() {
+  private def schedule() {
    if (state != RecoveryState.ALIVE) { return }
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    if (spreadOutApps) {
      // Try to spread out each app among all the nodes, until it has all its cores
      for (app <- waitingApps if app.coresLeft > 0) {
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
-         .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+           .filter(canUse(app, _)).sortBy(_.coresFree).reverse
        val numUsable = usableWorkers.length
        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 15 additions & 11 deletions
@@ -85,6 +85,8 @@ private[spark] class Worker(
   val metricsSystem = MetricsSystem.createMetricsSystem("worker")
   val workerSource = new WorkerSource(this)

+  var registrationRetryTimer: Option[Cancellable] = None
+
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed

@@ -144,21 +146,22 @@ private[spark] class Worker(

  def registerWithMaster() {
    tryRegisterAllMasters()
-
    var retries = 0
-   lazy val retryTimer: Cancellable =
+   registrationRetryTimer = Some {
      context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
-       retries += 1
-       if (registered) {
-         retryTimer.cancel()
-       } else if (retries >= REGISTRATION_RETRIES) {
-         logError("All masters are unresponsive! Giving up.")
-         System.exit(1)
-       } else {
-         tryRegisterAllMasters()
+       Utils.tryOrExit {
+         retries += 1
+         if (registered) {
+           registrationRetryTimer.foreach(_.cancel())
+         } else if (retries >= REGISTRATION_RETRIES) {
+           logError("All masters are unresponsive! Giving up.")
+           System.exit(1)
+         } else {
+           tryRegisterAllMasters()
+         }
        }
      }
-   retryTimer // start timer
+   }
  }

  override def receive = {
@@ -260,6 +263,7 @@ private[spark] class Worker(
  }

  override def postStop() {
+   registrationRetryTimer.foreach(_.cancel())
    executors.values.foreach(_.kill())
    webUi.stop()
    metricsSystem.stop()

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 11 additions & 27 deletions
@@ -79,28 +79,7 @@ private[spark] class Executor(
    // Setup an uncaught exception handler for non-local mode.
    // Make any thread terminations due to uncaught exceptions kill the entire
    // executor process to avoid surprising stalls.
-   Thread.setDefaultUncaughtExceptionHandler(
-     new Thread.UncaughtExceptionHandler {
-       override def uncaughtException(thread: Thread, exception: Throwable) {
-         try {
-           logError("Uncaught exception in thread " + thread, exception)
-
-           // We may have been called from a shutdown hook. If so, we must not call System.exit().
-           // (If we do, we will deadlock.)
-           if (!Utils.inShutdown()) {
-             if (exception.isInstanceOf[OutOfMemoryError]) {
-               System.exit(ExecutorExitCode.OOM)
-             } else {
-               System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
-             }
-           }
-         } catch {
-           case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-           case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
-         }
-       }
-     }
-   )
+   Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler)
  }

  val executorSource = new ExecutorSource(this, executorId)
@@ -258,6 +237,11 @@ private[spark] class Executor(
      }

      case t: Throwable => {
+       // Attempt to exit cleanly by informing the driver of our failure.
+       // If anything goes wrong (or this was a fatal exception), we will delegate to
+       // the default uncaught exception handler, which will terminate the Executor.
+       logError("Exception in task ID " + taskId, t)
+
        val serviceTime = (System.currentTimeMillis() - taskStart).toInt
        val metrics = attemptedTask.flatMap(t => t.metrics)
        for (m <- metrics) {
@@ -267,11 +251,11 @@ private[spark] class Executor(
        val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
        execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

-       // TODO: Should we exit the whole executor here? On the one hand, the failed task may
-       // have left some weird state around depending on when the exception was thrown, but on
-       // the other hand, maybe we could detect that when future tasks fail and exit then.
-       logError("Exception in task ID " + taskId, t)
-       //System.exit(1)
+       // Don't forcibly exit unless the exception was inherently fatal, to avoid
+       // stopping other tasks unnecessarily.
+       if (Utils.isFatalError(t)) {
+         ExecutorUncaughtExceptionHandler.uncaughtException(t)
+       }
      }
    } finally {
      runningTasks.remove(taskId)
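
The new failure path above calls Utils.isFatalError, which is also not shown in this diff. A plausible sketch, assuming it follows scala.util.control.NonFatal: only errors the JVM cannot recover from (OutOfMemoryError and other VirtualMachineErrors, LinkageError, ThreadDeath) count as fatal and are escalated to the uncaught exception handler; ordinary task exceptions are reported to the driver and the executor keeps running.

import scala.util.control.{ControlThrowable, NonFatal}

// Sketch only -- the real org.apache.spark.util.Utils.isFatalError is not part of this diff.
def isFatalError(e: Throwable): Boolean = e match {
  case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
    false
  case _ =>
    true
}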

core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala (new file)

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * The default uncaught exception handler for Executors terminates the whole process, to avoid
+ * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
+ * to fail fast when things go wrong.
+ */
+private[spark] object ExecutorUncaughtExceptionHandler
+  extends Thread.UncaughtExceptionHandler with Logging {
+
+  override def uncaughtException(thread: Thread, exception: Throwable) {
+    try {
+      logError("Uncaught exception in thread " + thread, exception)
+
+      // We may have been called from a shutdown hook. If so, we must not call System.exit().
+      // (If we do, we will deadlock.)
+      if (!Utils.inShutdown()) {
+        if (exception.isInstanceOf[OutOfMemoryError]) {
+          System.exit(ExecutorExitCode.OOM)
+        } else {
+          System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+        }
+      }
+    } catch {
+      case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+      case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
+    }
+  }
+
+  def uncaughtException(exception: Throwable) {
+    uncaughtException(Thread.currentThread(), exception)
+  }
+}

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 1 deletion
@@ -128,7 +128,7 @@ private[spark] class BlockManager(
    BlockManagerWorker.startBlockManagerWorker(this)
    if (!BlockManager.getDisableHeartBeatsForTesting) {
      heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
-       heartBeat()
+       Utils.tryOrExit { heartBeat() }
      }
    }
  }
