Skip to content

Commit cff023c

Browse files
committed
Fixed issues based on Andrew's comments.
1 parent 4d05314 commit cff023c

File tree

7 files changed

+24
-29
lines changed

7 files changed

+24
-29
lines changed

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,8 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
4343
val blockId = BroadcastBlockId(id)
4444

4545
/*
46-
* Broadcasted data is also stored in the BlockManager of the driver.
47-
* The BlockManagerMaster
48-
* does not need to be told about this block as not only
49-
* need to know about this data block.
46+
* Broadcasted data is also stored in the BlockManager of the driver. The BlockManagerMaster
47+
* does not need to be told about this block as not only need to know about this data block.
5048
*/
5149
HttpBroadcast.synchronized {
5250
SparkEnv.get.blockManager.putSingle(

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
193193
* updated block statuses. This is useful when the master is not informed of the given block
194194
* by all block managers.
195195
*/
196-
def getMatcinghBlockIds(
196+
def getMatchinghBlockIds(
197197
filter: BlockId => Boolean,
198198
askSlaves: Boolean): Seq[BlockId] = {
199199
val msg = GetMatchingBlockIds(filter, askSlaves)

core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,26 +39,26 @@ class BlockManagerSlaveActor(
3939
// Operations that involve removing blocks may be slow and should be done asynchronously
4040
override def receive = {
4141
case RemoveBlock(blockId) =>
42-
doAsync[Boolean]("removing block", sender) {
42+
doAsync[Boolean]("removing block " + blockId, sender) {
4343
blockManager.removeBlock(blockId)
4444
true
4545
}
4646

4747
case RemoveRdd(rddId) =>
48-
doAsync[Int]("removing RDD", sender) {
48+
doAsync[Int]("removing RDD " + rddId, sender) {
4949
blockManager.removeRdd(rddId)
5050
}
5151

5252
case RemoveShuffle(shuffleId) =>
53-
doAsync[Boolean]("removing shuffle", sender) {
53+
doAsync[Boolean]("removing shuffle " + shuffleId, sender) {
5454
if (mapOutputTracker != null) {
5555
mapOutputTracker.unregisterShuffle(shuffleId)
5656
}
5757
blockManager.shuffleBlockManager.removeShuffle(shuffleId)
5858
}
5959

6060
case RemoveBroadcast(broadcastId, tellMaster) =>
61-
doAsync[Int]("removing RDD", sender) {
61+
doAsync[Int]("removing broadcast " + broadcastId, sender) {
6262
blockManager.removeBroadcast(broadcastId, tellMaster)
6363
}
6464

@@ -72,8 +72,7 @@ class BlockManagerSlaveActor(
7272
private def doAsync[T](actionMessage: String, responseActor: ActorRef)(body: => T) {
7373
val future = Future {
7474
logDebug(actionMessage)
75-
val response = body
76-
response
75+
body
7776
}
7877
future.onSuccess { case response =>
7978
logDebug("Done " + actionMessage + ", response is " + response)

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
4747
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
4848
private var shuffleSender : ShuffleSender = null
4949

50-
5150
addShutdownHook()
5251

5352
/**

core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
166166
sc.stop()
167167

168168
val conf2 = new SparkConf()
169-
.setMaster("local[4]")
170-
//.setMaster("local-cluster[2, 1, 512]")
169+
.setMaster("local-cluster[2, 1, 512]")
171170
.setAppName("ContextCleanerSuite")
172171
.set("spark.cleaner.referenceTracking.blocking", "true")
173172
sc = new SparkContext(conf2)
@@ -180,7 +179,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
180179
val shuffleIds = 0 until sc.newShuffleId
181180
val broadcastIds = 0L until numBroadcasts
182181

183-
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
182+
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
184183
runGC()
185184
intercept[Exception] {
186185
preGCTester.assertCleanup()(timeout(1000 millis))
@@ -391,22 +390,22 @@ class CleanerTester(
391390
toBeCleanedBroadcstIds.isEmpty
392391

393392
private def getRDDBlocks(rddId: Int): Seq[BlockId] = {
394-
blockManager.master.getMatcinghBlockIds( _ match {
395-
case RDDBlockId(rddId, _) => true
393+
blockManager.master.getMatchinghBlockIds( _ match {
394+
case RDDBlockId(`rddId`, _) => true
396395
case _ => false
397396
}, askSlaves = true)
398397
}
399398

400399
private def getShuffleBlocks(shuffleId: Int): Seq[BlockId] = {
401-
blockManager.master.getMatcinghBlockIds( _ match {
402-
case ShuffleBlockId(shuffleId, _, _) => true
400+
blockManager.master.getMatchinghBlockIds( _ match {
401+
case ShuffleBlockId(`shuffleId`, _, _) => true
403402
case _ => false
404403
}, askSlaves = true)
405404
}
406405

407406
private def getBroadcastBlocks(broadcastId: Long): Seq[BlockId] = {
408-
blockManager.master.getMatcinghBlockIds( _ match {
409-
case BroadcastBlockId(broadcastId, _) => true
407+
blockManager.master.getMatchinghBlockIds( _ match {
408+
case BroadcastBlockId(`broadcastId`, _) => true
410409
case _ => false
411410
}, askSlaves = true)
412411
}

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -862,29 +862,29 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
862862
securityMgr, mapOutputTracker)
863863
val list = List.fill(2)(new Array[Byte](10))
864864

865-
// Tell master. By LRU, only list2 and list3 remains.
865+
// insert some blocks
866866
store.put("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
867867
store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
868868
store.put("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
869869

870870
// getLocations and getBlockStatus should yield the same locations
871-
assert(store.master.getMatcinghBlockIds(_.toString.contains("list"), askSlaves = false).size === 3)
872-
assert(store.master.getMatcinghBlockIds(_.toString.contains("list1"), askSlaves = false).size === 1)
871+
assert(store.master.getMatchinghBlockIds(_.toString.contains("list"), askSlaves = false).size === 3)
872+
assert(store.master.getMatchinghBlockIds(_.toString.contains("list1"), askSlaves = false).size === 1)
873873

874-
// Tell master. By LRU, only list2 and list3 remains.
874+
// insert some more blocks
875875
store.put("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
876876
store.put("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
877877
store.put("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
878878

879879
// getLocations and getBlockStatus should yield the same locations
880-
assert(store.master.getMatcinghBlockIds(_.toString.contains("newlist"), askSlaves = false).size === 1)
881-
assert(store.master.getMatcinghBlockIds(_.toString.contains("newlist"), askSlaves = true).size === 3)
880+
assert(store.master.getMatchinghBlockIds(_.toString.contains("newlist"), askSlaves = false).size === 1)
881+
assert(store.master.getMatchinghBlockIds(_.toString.contains("newlist"), askSlaves = true).size === 3)
882882

883883
val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
884884
blockIds.foreach { blockId =>
885885
store.put(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
886886
}
887-
val matchedBlockIds = store.master.getMatcinghBlockIds(_ match {
887+
val matchedBlockIds = store.master.getMatchinghBlockIds(_ match {
888888
case RDDBlockId(1, _) => true
889889
case _ => false
890890
}, askSlaves = true)

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ class JsonProtocolSuite extends FunSuite {
108108
// BlockId
109109
testBlockId(RDDBlockId(1, 2))
110110
testBlockId(ShuffleBlockId(1, 2, 3))
111-
testBlockId(BroadcastBlockId(1L, "<Insert words of wisdom here>"))
111+
testBlockId(BroadcastBlockId(1L, "insert_words_of_wisdom_here"))
112112
testBlockId(TaskResultBlockId(1L))
113113
testBlockId(StreamBlockId(1, 2L))
114114
}

0 commit comments

Comments
 (0)