
Commit 380c5b0

SPARK-2645: Fix for SparkContext stop behavior
1 parent b566b66 commit 380c5b0

File tree

1 file changed: +39 -37 lines changed

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 39 additions & 37 deletions
@@ -45,6 +45,8 @@ import org.apache.spark.storage._
 import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator}
 import org.apache.spark.util.{RpcUtils, Utils}
 
+import scala.util.control.NonFatal
+
 /**
  * :: DeveloperApi ::
  * Holds all the runtime environment objects for a running Spark instance (either master or worker),
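
The first hunk adds scala.util.control.NonFatal, which the patched stop() in the next hunk uses in place of a bare `case e: Exception`. As a minimal sketch outside of Spark (the CloseQuietly object and its closeQuietly helper are hypothetical, for illustration only): the NonFatal extractor matches ordinary exceptions but deliberately does not match fatal throwables such as OutOfMemoryError, InterruptedException, or ControlThrowable, so a best-effort cleanup block cannot swallow them.

    import scala.util.control.NonFatal

    // Hypothetical helper, not Spark code: best-effort close that logs ordinary
    // failures but lets fatal throwables propagate, because NonFatal(e) refuses
    // to match them (e.g. OutOfMemoryError, InterruptedException).
    object CloseQuietly {
      def closeQuietly(resource: AutoCloseable): Unit = {
        try {
          resource.close()
        } catch {
          case NonFatal(e) => println(s"Exception while closing resource: $e")
        }
      }
    }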
@@ -91,46 +93,46 @@ class SparkEnv (
 
   private[spark] def stop() {
 
-    if(isStopped) return
-
-    isStopped = true
-    try {
-      pythonWorkers.foreach { case (key, worker) => worker.stop()}
-      Option(httpFileServer).foreach(_.stop())
-      mapOutputTracker.stop()
-      shuffleManager.stop()
-      broadcastManager.stop()
-      blockManager.stop()
-      blockManager.master.stop()
-      metricsSystem.stop()
-      outputCommitCoordinator.stop()
-      rpcEnv.shutdown()
-    } catch {
-      case e: Exception =>
-        logInfo("Exception while SparkEnv stop", e)
-    }
+    if(!isStopped) {
+      isStopped = true
+      try {
+        pythonWorkers.foreach { case (key, worker) => worker.stop()}
+        Option(httpFileServer).foreach(_.stop())
+        mapOutputTracker.stop()
+        shuffleManager.stop()
+        broadcastManager.stop()
+        blockManager.stop()
+        blockManager.master.stop()
+        metricsSystem.stop()
+        outputCommitCoordinator.stop()
+        rpcEnv.shutdown()
+      } catch {
+        case NonFatal(e) =>
+          logInfo("Exception while SparkEnv stop", e)
+      }
 
-    // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
-    // down, but let's call it anyway in case it gets fixed in a later release
-    // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
-    // actorSystem.awaitTermination()
-
-    // Note that blockTransferService is stopped by BlockManager since it is started by it.
-
-    // If we only stop sc, but the driver process still run as a services then we need to delete
-    // the tmp dir, if not, it will create too many tmp dirs.
-    // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
-    // current working dir in executor which we do not need to delete.
-    driverTmpDirToDelete match {
-      case Some(path) => {
-        try {
-          Utils.deleteRecursively(new File(path))
-        } catch {
-          case e: Exception =>
-            logWarning(s"Exception while deleting Spark temp dir: $path", e)
+      // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
+      // down, but let's call it anyway in case it gets fixed in a later release
+      // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
+      // actorSystem.awaitTermination()
+
+      // Note that blockTransferService is stopped by BlockManager since it is started by it.
+
+      // If we only stop sc, but the driver process still run as a services then we need to delete
+      // the tmp dir, if not, it will create too many tmp dirs.
+      // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
+      // current working dir in executor which we do not need to delete.
+      driverTmpDirToDelete match {
+        case Some(path) => {
+          try {
+            Utils.deleteRecursively(new File(path))
+          } catch {
+            case e: Exception =>
+              logWarning(s"Exception while deleting Spark temp dir: $path", e)
+          }
         }
+        case None => // We just need to delete tmp dir created by driver, so do nothing on executor
       }
-      case None => // We just need to delete tmp dir created by driver, so do nothing on executor
     }
   }
 
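
The second hunk replaces the early `if(isStopped) return` with an `if(!isStopped) { ... }` guard around the whole body and switches the catch to the NonFatal extractor, so interrupts and other fatal throwables are no longer logged and discarded during shutdown. Below is a minimal, self-contained sketch of that stop-once shape; StoppableService, its field, and the demo object are assumed names for illustration, not Spark's actual classes.

    import scala.util.control.NonFatal

    // Illustrative only: a service whose stop() runs its cleanup at most once,
    // mirroring the guarded shape of the patched SparkEnv.stop().
    class StoppableService {
      @volatile private var isStopped = false

      def stop(): Unit = {
        if (!isStopped) {
          isStopped = true
          try {
            // release sockets, worker threads, temp files, ...
          } catch {
            case NonFatal(e) => println(s"Exception while stopping: $e")
          }
        }
      }
    }

    object StopTwiceDemo {
      def main(args: Array[String]): Unit = {
        val service = new StoppableService
        service.stop()
        service.stop() // the second call is a no-op rather than repeating cleanup
      }
    }

Note that the driver-only temp-dir cleanup (the driverTmpDirToDelete match that calls Utils.deleteRecursively) now also sits inside the guard, so it likewise runs at most once per SparkEnv.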
