
Commit 0d17060

Import, comments, and style fixes (minor)
1 parent c92e4d9 commit 0d17060

20 files changed: +42 −48 lines

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 5 additions & 6 deletions
```diff
@@ -112,8 +112,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
   }
 
   /**
-   * Called from executors to get the server URIs and
-   * output sizes of the map outputs of a given shuffle
+   * Called from executors to get the server URIs and output sizes of the map outputs of
+   * a given shuffle.
    */
   def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
     val statuses = mapStatuses.get(shuffleId).orNull
@@ -218,10 +218,9 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   private var cacheEpoch = epoch
 
   /**
-   * Timestamp based HashMap for storing mapStatuses and cached serialized statuses
-   * in the master, so that statuses are dropped only by explicit deregistering or
-   * by TTL-based cleaning (if set). Other than these two
-   * scenarios, nothing should be dropped from this HashMap.
+   * Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the master,
+   * so that statuses are dropped only by explicit deregistering or by TTL-based cleaning (if set).
+   * Other than these two scenarios, nothing should be dropped from this HashMap.
    */
   protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
   private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()
```
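
For context, the TimeStampedHashMap mentioned in the comment records an insertion time per entry so that a cleaner can drop entries older than a TTL. A minimal sketch of the idea, with an invented class name (not Spark's actual implementation, which is concurrent and lives in org.apache.spark.util):

```scala
import scala.collection.mutable

// Sketch only: each put records the current time alongside the value.
class SimpleTimeStampedMap[K, V] {
  private val underlying = mutable.Map.empty[K, (V, Long)]

  def put(key: K, value: V): Unit = {
    underlying(key) = (value, System.currentTimeMillis())
  }

  def get(key: K): Option[V] = underlying.get(key).map(_._1)

  // Explicit deregistering: one of the two legitimate ways entries leave the map.
  def remove(key: K): Unit = underlying -= key

  // TTL-based cleaning: keep only entries inserted at or after threshTime.
  def clearOldValues(threshTime: Long): Unit =
    underlying.retain((key, stamped) => stamped._2 >= threshTime)
}
```

In Spark, a MetadataCleaner periodically invokes the real map's clearOldValues when a TTL is configured (spark.cleaner.ttl); that is the "TTL-based cleaning (if set)" the comment refers to.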

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -230,6 +230,7 @@ class SparkContext(
 
   private[spark] val cleaner = new ContextCleaner(this)
   cleaner.start()
+
   postEnvironmentUpdate()
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
@@ -773,7 +774,7 @@ class SparkContext(
    * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
    */
   def addJar(path: String) {
-    if (path == null) {
+    if (path == null) {
       logWarning("null specified as parameter to addJar")
     } else {
       var key = ""
```

(In the addJar hunk the deleted and added lines carry the same text; the change is whitespace-only.)
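
For reference, a usage sketch of addJar covering the schemes its doc comment lists; the paths and the SparkContext instance sc are illustrative:

```scala
sc.addJar("/opt/jobs/extra-udfs.jar")               // file on a local or Hadoop-supported filesystem
sc.addJar("hdfs:///libs/extra-udfs.jar")            // file on a distributed filesystem
sc.addJar("http://repo.example.com/extra-udfs.jar") // HTTP, HTTPS or FTP URI
sc.addJar("local:/opt/jobs/extra-udfs.jar")         // file already present on every worker node
```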

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -185,6 +185,7 @@ object SparkEnv extends Logging {
     } else {
       new MapOutputTrackerWorker(conf)
     }
+
     // Have to assign trackerActor after initialization as MapOutputTrackerActor
     // requires the MapOutputTracker itself
     mapOutputTracker.trackerActor = registerOrLookup(
```

core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -27,8 +27,8 @@ import org.apache.spark.SparkConf
  * entire Spark job.
  */
 trait BroadcastFactory {
-  def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit
+  def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager)
   def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
   def unbroadcast(id: Long, removeFromDriver: Boolean)
-  def stop(): Unit
+  def stop()
 }
```
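
Dropping `: Unit` here is purely stylistic: a Scala method declared with procedure syntax (no `=` and no result type) has result type Unit, so the two forms are equivalent. An illustrative trait:

```scala
trait Service {
  def stop(): Unit // explicit result type
  def halt()       // procedure syntax: result type is also Unit
}
```

Procedure syntax matched the prevailing Spark style at the time; later Scala versions deprecate it in favor of the explicit `: Unit`.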

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 4 additions & 5 deletions
```diff
@@ -90,7 +90,7 @@ private[spark] object HttpBroadcast extends Logging {
   private var securityManager: SecurityManager = null
 
   // TODO: This shouldn't be a global variable so that multiple SparkContexts can coexist
-  val files = new TimeStampedHashSet[String]
+  private val files = new TimeStampedHashSet[String]
   private var cleaner: MetadataCleaner = null
 
   private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES).toInt
@@ -195,7 +195,7 @@ private[spark] object HttpBroadcast extends Logging {
   def unpersist(id: Long, removeFromDriver: Boolean) = synchronized {
     SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver)
     if (removeFromDriver) {
-      val file = new File(broadcastDir, BroadcastBlockId(id).name)
+      val file = getFile(id)
       files.remove(file.toString)
       deleteBroadcastFile(file)
     }
@@ -217,10 +217,9 @@ private[spark] object HttpBroadcast extends Logging {
     }
   }
 
-  /** Delete the given broadcast file. */
   private def deleteBroadcastFile(file: File) {
     try {
-      if (!file.exists()) {
+      if (!file.exists) {
         logWarning("Broadcast file to be deleted does not exist: %s".format(file))
       } else if (file.delete()) {
         logInfo("Deleted broadcast file: %s".format(file))
@@ -229,7 +228,7 @@ private[spark] object HttpBroadcast extends Logging {
       }
     } catch {
       case e: Exception =>
-        logWarning("Exception while deleting broadcast file: %s".format(file), e)
+        logError("Exception while deleting broadcast file: %s".format(file), e)
     }
   }
```
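
The new getFile(id) call presumably delegates to a small helper equivalent to the expression it replaces; its definition is not shown in this commit, so the following is a hypothetical reconstruction:

```scala
// Hypothetical helper inside object HttpBroadcast, matching the replaced expression.
private def getFile(id: Long) = new File(broadcastDir, BroadcastBlockId(id).name)
```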

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 3 additions & 4 deletions
```diff
@@ -72,7 +72,7 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo
   }
 
   /**
-   * Remove all persisted state associated with this HTTP broadcast.
+   * Remove all persisted state associated with this Torrent broadcast.
    * @param removeFromDriver Whether to remove state from the driver.
    */
   override def unpersist(removeFromDriver: Boolean) {
@@ -177,13 +177,12 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo
 }
 
 private[spark] object TorrentBroadcast extends Logging {
+  private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
   private var initialized = false
   private var conf: SparkConf = null
 
-  lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
-
   def initialize(_isDriver: Boolean, conf: SparkConf) {
-    TorrentBroadcast.conf = conf //TODO: we might have to fix it in tests
+    TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
     synchronized {
       if (!initialized) {
         initialized = true
```
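
Hoisting BLOCK_SIZE to the top of the object and marking it private is safe because lazy val defers evaluation: conf is still null when the object is constructed, and the first read of BLOCK_SIZE happens only after initialize has assigned conf. A minimal standalone sketch of that ordering, with illustrative names:

```scala
object LazyOrderingDemo {
  private var setting: String = null

  // Evaluated on first access rather than at object initialization,
  // so it may safely read `setting` as long as initialize() runs first.
  private lazy val derived = setting.toUpperCase

  def initialize(s: String): Unit = { setting = s }

  def main(args: Array[String]): Unit = {
    initialize("blocksize=4m")
    println(derived) // prints BLOCKSIZE=4M: `setting` was assigned before first access
  }
}
```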

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -158,7 +158,7 @@ abstract class RDD[T: ClassTag](
    */
   def unpersist(blocking: Boolean = true): RDD[T] = {
     logInfo("Removing RDD " + id + " from persistence list")
-    sc.unpersistRDD(this.id, blocking)
+    sc.unpersistRDD(id, blocking)
     storageLevel = StorageLevel.NONE
     this
   }
@@ -1128,4 +1128,5 @@ abstract class RDD[T: ClassTag](
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
   }
+
 }
```
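
A brief usage sketch of unpersist, assuming an existing SparkContext sc:

```scala
val data = sc.parallelize(1 to 1000000).cache()
data.count()                    // materializes the cached partitions
data.unpersist(blocking = true) // drops the cached blocks, waiting for deletion to finish
```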

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 0 additions & 1 deletion
```diff
@@ -1090,7 +1090,6 @@ class DAGScheduler(
       eventProcessActor ! StopDAGScheduler
     }
     taskScheduler.stop()
-    listenerBus.stop()
   }
 }
```

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -49,8 +49,8 @@ private[spark] class BlockManager(
     maxMemory: Long,
     val conf: SparkConf,
     securityManager: SecurityManager,
-    mapOutputTracker: MapOutputTracker
-  ) extends Logging {
+    mapOutputTracker: MapOutputTracker)
+  extends Logging {
 
   val shuffleBlockManager = new ShuffleBlockManager(this)
   val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
```

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -82,7 +82,7 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
 
   /**
    * Check if block manager master has a block. Note that this can be used to check for only
-   * those blocks that are expected to be reported to block manager master.
+   * those blocks that are reported to block manager master.
    */
   def contains(blockId: BlockId) = {
     !getLocations(blockId).isEmpty
```
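
A usage sketch, assuming a BlockManagerMaster instance master and an illustrative block ID:

```scala
import org.apache.spark.storage.RDDBlockId

// True only for blocks whose status has been reported to the master.
val isReported: Boolean = master.contains(RDDBlockId(rddId = 0, splitIndex = 0))
```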
