Skip to content

Commit 7dda084

Browse files
rekhajoshm and Andrew Or
authored and committed
[SPARK-2645] [CORE] Allow SparkEnv.stop() to be called multiple times without side effects.
Fix for SparkContext stop behavior - Allow sc.stop() to be called multiple times without side effects. Author: Joshi <[email protected]> Author: Rekha Joshi <[email protected]> Closes apache#6973 from rekhajoshm/SPARK-2645 and squashes the following commits: 277043e [Joshi] Fix for SparkContext stop behavior 446b0a4 [Joshi] Fix for SparkContext stop behavior 2ce5760 [Joshi] Fix for SparkContext stop behavior c97839a [Joshi] Fix for SparkContext stop behavior 1aff39c [Joshi] Fix for SparkContext stop behavior 12f66b5 [Joshi] Fix for SparkContext stop behavior 72bb484 [Joshi] Fix for SparkContext stop behavior a5a7d7f [Joshi] Fix for SparkContext stop behavior 9193a0c [Joshi] Fix for SparkContext stop behavior 58dba70 [Joshi] SPARK-2645: Fix for SparkContext stop behavior 380c5b0 [Joshi] SPARK-2645: Fix for SparkContext stop behavior b566b66 [Joshi] SPARK-2645: Fix for SparkContext stop behavior 0be142d [Rekha Joshi] Merge pull request #3 from apache/master 106fd8e [Rekha Joshi] Merge pull request #2 from apache/master e3677c9 [Rekha Joshi] Merge pull request #1 from apache/master
1 parent 79f0b37 commit 7dda084

File tree

2 files changed

+47
-32
lines changed

2 files changed

+47
-32
lines changed

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import java.net.Socket
2222

2323
import akka.actor.ActorSystem
2424

25-
import scala.collection.JavaConversions._
2625
import scala.collection.mutable
2726
import scala.util.Properties
2827

@@ -90,39 +89,42 @@ class SparkEnv (
9089
private var driverTmpDirToDelete: Option[String] = None
9190

9291
private[spark] def stop() {
93-
isStopped = true
94-
pythonWorkers.foreach { case(key, worker) => worker.stop() }
95-
Option(httpFileServer).foreach(_.stop())
96-
mapOutputTracker.stop()
97-
shuffleManager.stop()
98-
broadcastManager.stop()
99-
blockManager.stop()
100-
blockManager.master.stop()
101-
metricsSystem.stop()
102-
outputCommitCoordinator.stop()
103-
rpcEnv.shutdown()
104-
105-
// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
106-
// down, but let's call it anyway in case it gets fixed in a later release
107-
// UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
108-
// actorSystem.awaitTermination()
109-
110-
// Note that blockTransferService is stopped by BlockManager since it is started by it.
111-
112-
// If we only stop sc, but the driver process still run as a services then we need to delete
113-
// the tmp dir, if not, it will create too many tmp dirs.
114-
// We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
115-
// current working dir in executor which we do not need to delete.
116-
driverTmpDirToDelete match {
117-
case Some(path) => {
118-
try {
119-
Utils.deleteRecursively(new File(path))
120-
} catch {
121-
case e: Exception =>
122-
logWarning(s"Exception while deleting Spark temp dir: $path", e)
92+
93+
if (!isStopped) {
94+
isStopped = true
95+
pythonWorkers.values.foreach(_.stop())
96+
Option(httpFileServer).foreach(_.stop())
97+
mapOutputTracker.stop()
98+
shuffleManager.stop()
99+
broadcastManager.stop()
100+
blockManager.stop()
101+
blockManager.master.stop()
102+
metricsSystem.stop()
103+
outputCommitCoordinator.stop()
104+
rpcEnv.shutdown()
105+
106+
// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
107+
// down, but let's call it anyway in case it gets fixed in a later release
108+
// UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
109+
// actorSystem.awaitTermination()
110+
111+
// Note that blockTransferService is stopped by BlockManager since it is started by it.
112+
113+
// If we only stop sc, but the driver process still run as a services then we need to delete
114+
// the tmp dir, if not, it will create too many tmp dirs.
115+
// We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
116+
// current working dir in executor which we do not need to delete.
117+
driverTmpDirToDelete match {
118+
case Some(path) => {
119+
try {
120+
Utils.deleteRecursively(new File(path))
121+
} catch {
122+
case e: Exception =>
123+
logWarning(s"Exception while deleting Spark temp dir: $path", e)
124+
}
123125
}
126+
case None => // We just need to delete tmp dir created by driver, so do nothing on executor
124127
}
125-
case None => // We just need to delete tmp dir created by driver, so do nothing on executor
126128
}
127129
}
128130

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.apache.spark.util.Utils
3030

3131
import scala.concurrent.Await
3232
import scala.concurrent.duration.Duration
33+
import org.scalatest.Matchers._
3334

3435
class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
3536

@@ -272,4 +273,16 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
272273
sc.stop()
273274
}
274275
}
276+
277+
test("calling multiple sc.stop() must not throw any exception") {
278+
noException should be thrownBy {
279+
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
280+
val cnt = sc.parallelize(1 to 4).count()
281+
sc.cancelAllJobs()
282+
sc.stop()
283+
// call stop second time
284+
sc.stop()
285+
}
286+
}
287+
275288
}

0 commit comments

Comments
 (0)