
Commit 17f3075

markhamstra authored and pwendell committed
[SPARK-1620] Handle uncaught exceptions in function run by Akka scheduler
If the intended behavior was that uncaught exceptions thrown in functions being run by the Akka scheduler would end up being handled by the default uncaught exception handler set in Executor, and if that behavior is, in fact, correct, then this is a way to accomplish that. I'm not certain, though, that we shouldn't be doing something different to handle uncaught exceptions from some of these scheduled functions. In any event, this PR covers all of the cases I comment on in [SPARK-1620](https://issues.apache.org/jira/browse/SPARK-1620).

Author: Mark Hamstra <[email protected]>

Closes apache#622 from markhamstra/SPARK-1620 and squashes the following commits:

071d193 [Mark Hamstra] refactored post-SPARK-1772
1a6a35e [Mark Hamstra] another style fix
d30eb94 [Mark Hamstra] scalastyle
3573ecd [Mark Hamstra] Use wrapped try/catch in Utils.tryOrExit
8fc0439 [Mark Hamstra] Make functions run by the Akka scheduler use Executor's UncaughtExceptionHandler
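To make the failure mode concrete, here is a minimal standalone sketch (not part of this patch; the object name is invented). A closure handed to Akka's scheduler runs on a dispatcher thread, and an exception it throws is reported through the dispatcher's own error handling rather than through the thread's default UncaughtExceptionHandler, so the handler installed by Executor never sees it:

    import scala.concurrent.duration._
    import akka.actor.ActorSystem

    object SwallowedException extends App {
      val system = ActorSystem("demo")
      import system.dispatcher  // implicit ExecutionContext for the scheduler

      Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
        def uncaughtException(t: Thread, e: Throwable) {
          // Not reached for the exception below; Akka reports it internally.
          System.err.println("default handler saw: " + e)
        }
      })

      system.scheduler.schedule(0.seconds, 1.second) {
        throw new IllegalStateException("lost to the dispatcher")
      }
    }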
1 parent d58cb33 commit 17f3075

File tree

5 files changed (+36 lines, -18 lines)


core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala

Lines changed: 10 additions & 8 deletions

@@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{Utils, AkkaUtils}

 /**
  * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
@@ -88,13 +88,15 @@ private[spark] class AppClient(
       var retries = 0
       registrationRetryTimer = Some {
         context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
-          retries += 1
-          if (registered) {
-            registrationRetryTimer.foreach(_.cancel())
-          } else if (retries >= REGISTRATION_RETRIES) {
-            markDead("All masters are unresponsive! Giving up.")
-          } else {
-            tryRegisterAllMasters()
+          Utils.tryOrExit {
+            retries += 1
+            if (registered) {
+              registrationRetryTimer.foreach(_.cancel())
+            } else if (retries >= REGISTRATION_RETRIES) {
+              markDead("All masters are unresponsive! Giving up.")
+            } else {
+              tryRegisterAllMasters()
+            }
           }
         }
       }

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 10 additions & 8 deletions

@@ -166,14 +166,16 @@ private[spark] class Worker(
     var retries = 0
     registrationRetryTimer = Some {
       context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
-        retries += 1
-        if (registered) {
-          registrationRetryTimer.foreach(_.cancel())
-        } else if (retries >= REGISTRATION_RETRIES) {
-          logError("All masters are unresponsive! Giving up.")
-          System.exit(1)
-        } else {
-          tryRegisterAllMasters()
+        Utils.tryOrExit {
+          retries += 1
+          if (registered) {
+            registrationRetryTimer.foreach(_.cancel())
+          } else if (retries >= REGISTRATION_RETRIES) {
+            logError("All masters are unresponsive! Giving up.")
+            System.exit(1)
+          } else {
+            tryRegisterAllMasters()
+          }
         }
       }
     }

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 2 additions & 1 deletion

@@ -31,6 +31,7 @@ import scala.util.Random
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.util.Utils

 /**
  * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
@@ -139,7 +140,7 @@ private[spark] class TaskSchedulerImpl(
       import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
             SPECULATION_INTERVAL milliseconds) {
-        checkSpeculatableTasks()
+        Utils.tryOrExit { checkSpeculatableTasks() }
       }
     }

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 1 deletion

@@ -155,7 +155,7 @@ private[spark] class BlockManager(
     BlockManagerWorker.startBlockManagerWorker(this)
     if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
       heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
-        heartBeat()
+        Utils.tryOrExit { heartBeat() }
       }
     }
   }

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 13 additions & 0 deletions

@@ -40,6 +40,7 @@ import tachyon.client.{TachyonFile,TachyonFS}

 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}

 /**
@@ -780,6 +781,18 @@ private[spark] object Utils extends Logging {
     output.toString
   }

+  /**
+   * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
+   * default UncaughtExceptionHandler
+   */
+  def tryOrExit(block: => Unit) {
+    try {
+      block
+    } catch {
+      case t: Throwable => ExecutorUncaughtExceptionHandler.uncaughtException(t)
+    }
+  }
+
   /**
    * A regular expression to match classes of the "core" Spark API that we want to skip when
    * finding the call site of a method.
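A usage sketch of the new helper, written as Spark-internal code would call it (the demo object and pollOnce are invented, not from the patch): wrapping a scheduled block in Utils.tryOrExit forwards any escaping Throwable to ExecutorUncaughtExceptionHandler, which is expected to log the error and exit the JVM, so a periodic task that blows up fails loudly instead of being silently swallowed by the dispatcher:

    import scala.concurrent.duration._
    import akka.actor.ActorSystem
    import org.apache.spark.util.Utils

    object TryOrExitDemo extends App {
      val actorSystem = ActorSystem("demo")
      import actorSystem.dispatcher

      // Stand-in for heartBeat(), checkSpeculatableTasks(), etc.
      def pollOnce(): Unit = throw new RuntimeException("poll failed")

      // The RuntimeException now reaches ExecutorUncaughtExceptionHandler,
      // which logs it and terminates the process with a nonzero status.
      actorSystem.scheduler.schedule(0.seconds, 1.second) {
        Utils.tryOrExit { pollOnce() }
      }
    }

This mirrors the BlockManager and TaskSchedulerImpl hunks above, where the wrapped block is a single call; AppClient and Worker wrap their whole retry body so even the bookkeeping (retries += 1, timer cancellation) falls under the handler.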
