Commit b27f8e8

Merge pull request #3 from andrewor14/cleanup
Patrick's comments
2 parents a2cc8bc + cd72d19; commit b27f8e8

11 files changed (+79, -62 lines)

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 0 additions & 3 deletions
@@ -80,7 +80,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   /** Stop the cleaner. */
   def stop() {
     stopped = true
-    cleaningThread.interrupt()
   }
 
   /** Register a RDD for cleanup when it is garbage collected. */
@@ -119,8 +118,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
         }
       }
     } catch {
-      case ie: InterruptedException =>
-        if (!stopped) logWarning("Cleaning thread interrupted")
       case t: Throwable => logError("Error in cleaning thread", t)
     }
   }
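
With this hunk, stop() only flips the stopped flag and never interrupts the cleaning thread, so the InterruptedException case in the loop becomes dead code. A minimal sketch of that flag-based shutdown pattern, with stand-in names (FlagStoppedWorker is illustrative, not Spark's ContextCleaner):

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Illustrative sketch of flag-based shutdown: the loop polls with a timeout
// so it can observe `stopped` without ever being interrupted.
class FlagStoppedWorker {
  @volatile private var stopped = false
  private val tasks = new LinkedBlockingQueue[Runnable]()

  private val workerThread = new Thread("cleanup-worker") {
    override def run() {
      while (!stopped) {
        val task = tasks.poll(100, TimeUnit.MILLISECONDS)
        if (task != null) task.run()
      }
    }
  }
  workerThread.setDaemon(true)

  def start() { workerThread.start() }

  def submit(task: Runnable) { tasks.put(task) }

  /** Stop by flag alone; the loop exits on its next poll timeout. */
  def stop() { stopped = true }
}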

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 17 additions & 13 deletions
@@ -71,13 +71,18 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
  * (driver and worker) use different HashMap to store its metadata.
  */
 private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging {
-
   private val timeout = AkkaUtils.askTimeout(conf)
 
-  /** Set to the MapOutputTrackerActor living on the driver */
+  /** Set to the MapOutputTrackerActor living on the driver. */
   var trackerActor: ActorRef = _
 
-  /** This HashMap needs to have different storage behavior for driver and worker */
+  /**
+   * This HashMap has different behavior for the master and the workers.
+   *
+   * On the master, it serves as the source of map outputs recorded from ShuffleMapTasks.
+   * On the workers, it simply serves as a cache, in which a miss triggers a fetch from the
+   * master's corresponding HashMap.
+   */
   protected val mapStatuses: Map[Int, Array[MapStatus]]
 
   /**
@@ -87,7 +92,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
   protected var epoch: Long = 0
   protected val epochLock = new AnyRef
 
-  /** Remembers which map output locations are currently being fetched on a worker */
+  /** Remembers which map output locations are currently being fetched on a worker. */
   private val fetching = new HashSet[Int]
 
   /**
@@ -173,7 +178,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
     }
   }
 
-  /** Called to get current epoch number */
+  /** Called to get current epoch number. */
   def getEpoch: Long = {
     epochLock.synchronized {
       return epoch
@@ -195,16 +200,13 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
     }
   }
 
-  /** Unregister shuffle data */
+  /** Unregister shuffle data. */
   def unregisterShuffle(shuffleId: Int) {
     mapStatuses.remove(shuffleId)
   }
 
-  def stop() {
-    sendTracker(StopMapOutputTracker)
-    mapStatuses.clear()
-    trackerActor = null
-  }
+  /** Stop the tracker. */
+  def stop() { }
 }
 
 /**
@@ -219,7 +221,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
 
   /**
    * Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the master,
-   * so that statuses are dropped only by explicit deregistering or by TTL-based cleaning (if set).
+   * so that statuses are dropped only by explicit de-registering or by TTL-based cleaning (if set).
    * Other than these two scenarios, nothing should be dropped from this HashMap.
    */
   protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
@@ -314,7 +316,9 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   }
 
   override def stop() {
-    super.stop()
+    sendTracker(StopMapOutputTracker)
+    mapStatuses.clear()
+    trackerActor = null
     metadataCleaner.cancel()
     cachedSerializedStatuses.clear()
   }
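
The new mapStatuses doc comment above describes a source-of-truth/cache split: the master records map outputs directly, while workers treat the same map as a cache and fetch from the master on a miss. A hedged sketch of that fetch-on-miss shape (StatusCache and fetchFromMaster are hypothetical stand-ins for the tracker's ask to the master actor):

import scala.collection.mutable

// Illustrative sketch: the master writes entries directly; a worker's miss
// falls back to a remote fetch from the master's corresponding map.
abstract class StatusCache[K, V] {
  private val cache = new mutable.HashMap[K, V]

  /** Hypothetical hook standing in for the worker's remote ask to the master. */
  protected def fetchFromMaster(key: K): V

  /** Worker-side read: a miss triggers a fetch from the master. */
  def get(key: K): V = cache.synchronized {
    cache.getOrElseUpdate(key, fetchFromMaster(key))
  }

  /** Master-side write: entries are recorded as map outputs are registered. */
  def put(key: K, value: V) {
    cache.synchronized { cache.put(key, value) }
  }
}

The real tracker additionally records in-flight keys in its fetching set, so concurrent misses for the same shuffle wait on one fetch instead of each asking the master.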

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 10 additions & 5 deletions
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.mesos.MesosNativeLibrary
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -227,8 +228,12 @@ class SparkContext(
   @volatile private[spark] var dagScheduler = new DAGScheduler(this)
   dagScheduler.start()
 
-  private[spark] val cleaner = new ContextCleaner(this)
-  cleaner.start()
+  private[spark] val cleaner: Option[ContextCleaner] =
+    if (conf.getBoolean("spark.cleaner.automatic", true)) {
+      Some(new ContextCleaner(this))
+    } else None
+
+  cleaner.foreach(_.start())
 
   postEnvironmentUpdate()
 
@@ -643,9 +648,9 @@ class SparkContext(
    * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
    * The variable will be sent to each cluster only once.
    */
-  def broadcast[T](value: T) = {
+  def broadcast[T](value: T): Broadcast[T] = {
     val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
-    cleaner.registerBroadcastForCleanup(bc)
+    cleaner.foreach(_.registerBroadcastForCleanup(bc))
     bc
   }
 
@@ -840,7 +845,7 @@ class SparkContext(
     dagScheduler = null
     if (dagSchedulerCopy != null) {
       metadataCleaner.cancel()
-      cleaner.stop()
+      cleaner.foreach(_.stop())
       dagSchedulerCopy.stop()
      listenerBus.stop()
      taskScheduler = null
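
These hunks gate the cleaner behind a config flag by making it an Option, so every lifecycle call becomes a foreach and there is never a null to check. A minimal standalone sketch of the pattern (Cleaner and Context are stand-ins, not Spark classes):

// Illustrative sketch of an Option-gated optional component.
class Cleaner {
  def start() { println("cleaner started") }
  def registerForCleanup(ref: AnyRef) { println("registered " + ref) }
  def stop() { println("cleaner stopped") }
}

class Context(conf: Map[String, String]) {
  private val cleanerEnabled =
    conf.getOrElse("spark.cleaner.automatic", "true").toBoolean

  // None when disabled; call sites stay uniform either way.
  val cleaner: Option[Cleaner] = if (cleanerEnabled) Some(new Cleaner) else None
  cleaner.foreach(_.start())

  def broadcast[T <: AnyRef](value: T): T = {
    cleaner.foreach(_.registerForCleanup(value)) // silently a no-op when disabled
    value
  }

  def stop() { cleaner.foreach(_.stop()) }
}

With this shape, new Context(Map("spark.cleaner.automatic" -> "false")) simply runs with no cleaner, and none of the call sites change.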

core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala

Lines changed: 10 additions & 5 deletions
@@ -62,16 +62,21 @@ abstract class Broadcast[T](val id: Long) extends Serializable {
   def value: T
 
   /**
-   * Remove all persisted state associated with this broadcast on the executors. The next use
-   * of this broadcast on the executors will trigger a remote fetch.
+   * Delete cached copies of this broadcast on the executors. If the broadcast is used after
+   * this is called, it will need to be re-sent to each executor.
    */
   def unpersist()
 
   /**
-   * Remove all persisted state associated with this broadcast on both the executors and the
-   * driver. Overriding implementations should set isValid to false.
+   * Remove all persisted state associated with this broadcast on both the executors and
+   * the driver.
    */
-  private[spark] def destroy()
+  private[spark] def destroy() {
+    _isValid = false
+    onDestroy()
+  }
+
+  protected def onDestroy()
 
   /**
    * If this broadcast is no longer valid, throw an exception.
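
This hunk is a template-method refactor: destroy() in the base class invalidates the broadcast exactly once, and subclasses supply only onDestroy(). A simplified sketch of the shape, outside Spark (Destroyable and FileBackedValue are illustrative names):

// Simplified shape of the destroy()/onDestroy() template-method pattern.
abstract class Destroyable {
  @volatile protected var _isValid = true

  /** Base-class wrapper: invalidate first, then delegate subclass teardown. */
  def destroy() {
    _isValid = false
    onDestroy()
  }

  /** Subclasses implement only their own cleanup. */
  protected def onDestroy()

  def assertValid() {
    if (!_isValid) throw new IllegalStateException("already destroyed")
  }
}

class FileBackedValue(path: String) extends Destroyable {
  protected def onDestroy() {
    // e.g. remove persisted copies everywhere, as the broadcast subclasses do below
    println("removing persisted state at " + path)
  }
}

This keeps the invariant (_isValid flipped before any teardown) in one place rather than in every subclass, which is exactly the duplication the HttpBroadcast and TorrentBroadcast hunks below delete.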

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 2 additions & 10 deletions
@@ -54,12 +54,7 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
     HttpBroadcast.unpersist(id, removeFromDriver = false)
   }
 
-  /**
-   * Remove all persisted state associated with this HTTP Broadcast on both the executors
-   * and the driver.
-   */
-  private[spark] def destroy() {
-    _isValid = false
+  protected def onDestroy() {
     HttpBroadcast.unpersist(id, removeFromDriver = true)
   }
 
@@ -91,7 +86,6 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
 
 private[spark] object HttpBroadcast extends Logging {
   private var initialized = false
-
   private var broadcastDir: File = null
   private var compress: Boolean = false
   private var bufferSize: Int = 65536
@@ -101,11 +95,9 @@ private[spark] object HttpBroadcast extends Logging {
 
   // TODO: This shouldn't be a global variable so that multiple SparkContexts can coexist
   private val files = new TimeStampedHashSet[String]
-  private var cleaner: MetadataCleaner = null
-
   private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES).toInt
-
   private var compressionCodec: CompressionCodec = null
+  private var cleaner: MetadataCleaner = null
 
   def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
     synchronized {

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 1 addition & 6 deletions
@@ -57,12 +57,7 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo
     TorrentBroadcast.unpersist(id, removeFromDriver = false)
   }
 
-  /**
-   * Remove all persisted state associated with this Torrent broadcast on both the executors
-   * and the driver.
-   */
-  private[spark] def destroy() {
-    _isValid = false
+  protected def onDestroy() {
     TorrentBroadcast.unpersist(id, removeFromDriver = true)
   }
 

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 1 addition & 1 deletion
@@ -42,7 +42,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
  *
  * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
  * threads, so it needs locks in public API methods to maintain its state. In addition, some
- * SchedulerBackends sycnchronize on themselves when they want to send events here, and then
+ * SchedulerBackends synchronize on themselves when they want to send events here, and then
  * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
  * we are holding a lock on ourselves.
  */

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 2 additions & 2 deletions
@@ -829,12 +829,12 @@ private[spark] class BlockManager(
   /**
    * Remove all blocks belonging to the given broadcast.
    */
-  def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean) {
+  def removeBroadcast(broadcastId: Long, tellMaster: Boolean) {
     logInfo("Removing broadcast " + broadcastId)
     val blocksToRemove = blockInfo.keys.collect {
       case bid @ BroadcastBlockId(`broadcastId`, _) => bid
     }
-    blocksToRemove.foreach { blockId => removeBlock(blockId, removeFromDriver) }
+    blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
   }
 
   /**

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 5 additions & 4 deletions
@@ -109,20 +109,20 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
   /** Remove all blocks belonging to the given RDD. */
   def removeRdd(rddId: Int, blocking: Boolean) {
     val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
-    future onFailure {
+    future.onFailure {
       case e: Throwable => logError("Failed to remove RDD " + rddId, e)
     }
     if (blocking) {
       Await.result(future, timeout)
     }
   }
 
-  /** Remove all blocks belonging to the given shuffle. */
+  /** Remove all blocks belonging to the given shuffle asynchronously. */
   def removeShuffle(shuffleId: Int) {
     askDriverWithReply(RemoveShuffle(shuffleId))
   }
 
-  /** Remove all blocks belonging to the given broadcast. */
+  /** Remove all blocks belonging to the given broadcast asynchronously. */
   def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean) {
     askDriverWithReply(RemoveBroadcast(broadcastId, removeFromMaster))
   }
@@ -142,7 +142,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
   }
 
   /**
-   * Return the block's status on all block managers, if any.
+   * Return the block's status on all block managers, if any. This can potentially be an
+   * expensive operation and is used mainly for testing.
    *
    * If askSlaves is true, this invokes the master to query each block manager for the most
    * updated block statuses. This is useful when the master is not informed of the given block
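
The removeRdd hunk above attaches a failure callback so the asynchronous removal still logs errors when no caller blocks on the result. A small standalone sketch of that fire-and-forget shape (using onComplete, the non-deprecated equivalent of onFailure in current Scala; the Future body here is a placeholder for the ask to the driver actor):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object RemoveRddSketch extends App {
  def removeRdd(rddId: Int, blocking: Boolean) {
    val future: Future[Int] = Future { /* ask the driver actor here */ 42 }
    future.onComplete {
      case Failure(e) => println("Failed to remove RDD " + rddId + ": " + e) // logged even if nobody waits
      case Success(_) => ()
    }
    if (blocking) {
      Await.result(future, 10.seconds) // optionally surface the failure to the caller too
    }
  }

  removeRdd(0, blocking = true)
}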

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

Lines changed: 3 additions & 2 deletions
@@ -168,7 +168,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
    */
   private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean) {
     // TODO: Consolidate usages of <driver>
-    val removeMsg = RemoveBroadcast(broadcastId)
+    val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
     blockManagerInfo.values
       .filter { info => removeFromDriver || info.blockManagerId.executorId != "<driver>" }
       .foreach { bm => bm.slaveActor ! removeMsg }
@@ -255,7 +255,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
   }
 
   /**
-   * Return the block's status for all block managers, if any.
+   * Return the block's status for all block managers, if any. This can potentially be an
+   * expensive operation and is used mainly for testing.
    *
    * If askSlaves is true, the master queries each block manager for the most updated block
    * statuses. This is useful when the master is not informed of the given block by all block
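
The removeBroadcast hunk above fans a single RemoveBroadcast message out to every block manager, skipping the driver's own block manager unless removeFromDriver is set. A sketch of the same filter-then-send shape, with a plain function standing in for the Akka send bm.slaveActor ! removeMsg (BlockManagerStub is an illustrative stand-in, not Spark's BlockManagerInfo):

// Illustrative fan-out sketch; `send` stands in for an Akka actor send.
object RemoveBroadcastSketch {
  case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean)
  case class BlockManagerStub(executorId: String, send: Any => Unit)

  def removeBroadcast(managers: Iterable[BlockManagerStub],
                      broadcastId: Long,
                      removeFromDriver: Boolean) {
    val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
    managers
      .filter { m => removeFromDriver || m.executorId != "<driver>" } // skip the driver unless asked
      .foreach { m => m.send(removeMsg) }
  }
}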
