diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index e03c6f9c184c9..a7790ce03d619 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -218,7 +218,6 @@ private[spark] object PythonRDD { } } catch { case eof: EOFException => {} - case e: Throwable => throw e } JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism)) } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index f291266fcf17c..f8c6312bad884 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -58,13 +58,11 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String try { new Socket(daemonHost, daemonPort) } catch { - case exc: SocketException => { + case exc: SocketException => logWarning("Python daemon unexpectedly quit, attempting to restart") stopDaemon() startDaemon() new Socket(daemonHost, daemonPort) - } - case e: Throwable => throw e } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index f31dd4eba87c0..3086206ce79ea 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -89,7 +89,7 @@ object SparkHadoopUtil { .newInstance() .asInstanceOf[SparkHadoopUtil] } catch { - case th: Throwable => throw new SparkException("Unable to load YARN support", th) + case e: Exception => throw new SparkException("Unable to load YARN support", e) } } else { new SparkHadoopUtil diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 7e98c7f8ff99b..ed03f45a46c47 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -31,6 +31,8 @@ import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master import org.apache.spark.util.AkkaUtils +import org.apache.spark.util.Utils + /** * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL, @@ -88,14 +90,16 @@ private[spark] class AppClient( var retries = 0 registrationRetryTimer = Some { context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { - retries += 1 - if (registered) { - registrationRetryTimer.foreach(_.cancel()) - } else if (retries >= REGISTRATION_RETRIES) { - logError("All masters are unresponsive! Giving up.") - markDead() - } else { - tryRegisterAllMasters() + Utils.tryOrExit { + retries += 1 + if (registered) { + registrationRetryTimer.foreach(_.cancel()) + } else if (retries >= REGISTRATION_RETRIES) { + logError("All masters are unresponsive! Giving up.") + markDead() + } else { + tryRegisterAllMasters() + } } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index bc7dd7dc8a199..4b41ded80abda 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -163,14 +163,16 @@ private[spark] class Worker( var retries = 0 registrationRetryTimer = Some { context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { - retries += 1 - if (registered) { - registrationRetryTimer.foreach(_.cancel()) - } else if (retries >= REGISTRATION_RETRIES) { - logError("All masters are unresponsive! Giving up.") - System.exit(1) - } else { - tryRegisterAllMasters() + Utils.tryOrExit { + retries += 1 + if (registered) { + registrationRetryTimer.foreach(_.cancel()) + } else if (retries >= REGISTRATION_RETRIES) { + logError("All masters are unresponsive! Giving up.") + System.exit(1) + } else { + tryRegisterAllMasters() + } } } } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 07411bfa1c172..356c04fed8a2a 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -78,28 +78,7 @@ private[spark] class Executor( // Setup an uncaught exception handler for non-local mode. // Make any thread terminations due to uncaught exceptions kill the entire // executor process to avoid surprising stalls. - Thread.setDefaultUncaughtExceptionHandler( - new Thread.UncaughtExceptionHandler { - override def uncaughtException(thread: Thread, exception: Throwable) { - try { - logError("Uncaught exception in thread " + thread, exception) - - // We may have been called from a shutdown hook. If so, we must not call System.exit(). - // (If we do, we will deadlock.) - if (!Utils.inShutdown()) { - if (exception.isInstanceOf[OutOfMemoryError]) { - System.exit(ExecutorExitCode.OOM) - } else { - System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION) - } - } - } catch { - case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM) - case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE) - } - } - } - ) + Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler) } val executorSource = new ExecutorSource(this, executorId) @@ -257,6 +236,11 @@ private[spark] class Executor( } case t: Throwable => { + // Attempt to exit cleanly by informing the driver of our failure. + // If anything goes wrong (or this was a fatal exception), we will delegate to + // the default uncaught exception handler, which will terminate the Executor. + logError("Exception in task ID " + taskId, t) + val serviceTime = (System.currentTimeMillis() - taskStart).toInt val metrics = attemptedTask.flatMap(t => t.metrics) for (m <- metrics) { @@ -266,11 +250,11 @@ private[spark] class Executor( val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics) execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) - // TODO: Should we exit the whole executor here? On the one hand, the failed task may - // have left some weird state around depending on when the exception was thrown, but on - // the other hand, maybe we could detect that when future tasks fail and exit then. - logError("Exception in task ID " + taskId, t) - //System.exit(1) + // Don't forcibly exit unless the exception was inherently fatal, to avoid + // stopping other tasks unnecessarily. + if (Utils.isFatalError(t)) { + ExecutorUncaughtExceptionHandler.uncaughtException(t) + } } } finally { // TODO: Unregister shuffle memory only for ResultTask diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala new file mode 100644 index 0000000000000..b0e984c03964c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import org.apache.spark.Logging +import org.apache.spark.util.Utils + +/** + * The default uncaught exception handler for Executors terminates the whole process, to avoid + * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better + * to fail fast when things go wrong. + */ +private[spark] object ExecutorUncaughtExceptionHandler + extends Thread.UncaughtExceptionHandler with Logging { + + override def uncaughtException(thread: Thread, exception: Throwable) { + try { + logError("Uncaught exception in thread " + thread, exception) + + // We may have been called from a shutdown hook. If so, we must not call System.exit(). + // (If we do, we will deadlock.) + if (!Utils.inShutdown()) { + if (exception.isInstanceOf[OutOfMemoryError]) { + System.exit(ExecutorExitCode.OOM) + } else { + System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION) + } + } + } catch { + case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM) + case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE) + } + } + + def uncaughtException(exception: Throwable) { + uncaughtException(Thread.currentThread(), exception) + } +} diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 41cca8dffcee7..b4ce5848ae8a9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -133,7 +133,7 @@ private[spark] class BlockManager( BlockManagerWorker.startBlockManagerWorker(this) if (!BlockManager.getDisableHeartBeatsForTesting(conf)) { heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) { - heartBeat() + Utils.tryOrExit { heartBeat() } } } } @@ -842,8 +842,26 @@ private[spark] class BlockManager( bytes: ByteBuffer, serializer: Serializer = defaultSerializer): Iterator[Any] = { bytes.rewind() - val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true)) - serializer.newInstance().deserializeStream(stream).asIterator + + def getIterator = { + val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true)) + serializer.newInstance().deserializeStream(stream).asIterator + } + + if (blockId.isShuffle) { + // Reducer may need to read many local shuffle blocks and will wrap them into Iterators + // at the beginning. The wrapping will cost some memory (compression instance + // initialization, etc.). Reducer read shuffle blocks one by one so we could do the + // wrapping lazily to save memory. + class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] { + lazy val proxy = f + override def hasNext: Boolean = proxy.hasNext + override def next(): Any = proxy.next() + } + new LazyProxyIterator(getIterator) + } else { + getIterator + } } def stop() { diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index f3e1c38744d78..e15de53b37117 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -134,14 +134,14 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD private def addShutdownHook() { localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir)) Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") { - override def run() { + override def run(): Unit = Utils.logUncaughtExceptions { logDebug("Shutdown hook called") localDirs.foreach { localDir => try { if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir) } catch { - case t: Throwable => - logError("Exception while deleting local spark dir: " + localDir, t) + case e: Exception => + logError("Exception while deleting local spark dir: " + localDir, e) } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index ec6c1ed857a05..863e76aa8fd05 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -19,6 +19,7 @@ package org.apache.spark.util import java.io._ import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address} +import java.nio.ByteBuffer import java.util.{Locale, Random, UUID} import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor} @@ -27,19 +28,16 @@ import scala.collection.Map import scala.collection.mutable.ArrayBuffer import scala.io.Source import scala.reflect.ClassTag +import scala.util.control.{ControlThrowable, NonFatal} import com.google.common.io.Files import com.google.common.util.concurrent.ThreadFactoryBuilder -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{Path, FileSystem, FileUtil} -import org.apache.hadoop.io._ - -import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} +import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} +import org.apache.spark.{Logging, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil -import java.nio.ByteBuffer -import org.apache.spark.{SparkConf, SparkException, Logging} - +import org.apache.spark.executor.ExecutorUncaughtExceptionHandler +import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} /** * Various utility methods used by Spark. @@ -621,6 +619,18 @@ private[spark] object Utils extends Logging { output.toString } + /** + * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the + * default UncaughtExceptionHandler + */ + def tryOrExit(block: => Unit) { + try { + block + } catch { + case t: Throwable => ExecutorUncaughtExceptionHandler.uncaughtException(t) + } + } + /** * A regular expression to match classes of the "core" Spark API that we want to skip when * finding the call site of a method. @@ -833,4 +843,28 @@ private[spark] object Utils extends Logging { System.currentTimeMillis - start } + /** + * Executes the given block, printing and re-throwing any uncaught exceptions. + * This is particularly useful for wrapping code that runs in a thread, to ensure + * that exceptions are printed, and to avoid having to catch Throwable. + */ + def logUncaughtExceptions[T](f: => T): T = { + try { + f + } catch { + case t: Throwable => + logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) + throw t + } + } + + /** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. */ + def isFatalError(e: Throwable): Boolean = { + e match { + case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable => + false + case _ => + true + } + } } diff --git a/ec2/spark-ec2 b/ec2/spark-ec2 index 454057aa0d279..31f9771223e51 100755 --- a/ec2/spark-ec2 +++ b/ec2/spark-ec2 @@ -19,4 +19,4 @@ # cd "`dirname $0`" -PYTHONPATH="./third_party/boto-2.4.1.zip/boto-2.4.1:$PYTHONPATH" python ./spark_ec2.py $@ +PYTHONPATH="./third_party/boto-2.4.1.zip/boto-2.4.1:$PYTHONPATH" python ./spark_ec2.py "$@" diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index cfc61b20100c6..6d06a5db53625 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -629,9 +629,23 @@ def ssh(host, opts, command): time.sleep(30) tries = tries + 1 +# Backported from Python 2.7 for compatiblity with 2.6 (See SPARK-1990) +def _check_output(*popenargs, **kwargs): + if 'stdout' in kwargs: + raise ValueError('stdout argument not allowed, it will be overridden.') + process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs) + output, unused_err = process.communicate() + retcode = process.poll() + if retcode: + cmd = kwargs.get("args") + if cmd is None: + cmd = popenargs[0] + raise subprocess.CalledProcessError(retcode, cmd, output=output) + return output + def ssh_read(host, opts, command): - return subprocess.check_output( + return _check_output( ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)])