
Commit 61b8d6e

Fixed issue with Tachyon + new BlockManager methods.
1 parent f489fdc commit 61b8d6e

8 files changed (+26, -36 lines)

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 5 additions & 16 deletions
@@ -65,7 +65,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   /**
    * Whether the cleaning thread will block on cleanup tasks.
-   * This is set to true only for tests. */
+   * This is set to true only for tests.
+   */
   private val blockOnCleanupTasks = sc.conf.getBoolean(
     "spark.cleaner.referenceTracking.blocking", false)
 
@@ -133,7 +134,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /** Perform RDD cleanup. */
-  private def doCleanupRDD(rddId: Int, blocking: Boolean) {
+  def doCleanupRDD(rddId: Int, blocking: Boolean) {
     try {
       logDebug("Cleaning RDD " + rddId)
       sc.unpersistRDD(rddId, blocking)
@@ -145,7 +146,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /** Perform shuffle cleanup, asynchronously. */
-  private def doCleanupShuffle(shuffleId: Int, blocking: Boolean) {
+  def doCleanupShuffle(shuffleId: Int, blocking: Boolean) {
     try {
       logDebug("Cleaning shuffle " + shuffleId)
       mapOutputTrackerMaster.unregisterShuffle(shuffleId)
@@ -158,7 +159,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /** Perform broadcast cleanup. */
-  private def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
+  def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
     try {
       logDebug("Cleaning broadcast " + broadcastId)
       broadcastManager.unbroadcast(broadcastId, true, blocking)
@@ -175,18 +176,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   // Used for testing. These methods explicitly block until cleanup is completed
   // to ensure more reliable testing.
-
-  def cleanupRDD(rdd: RDD[_]) {
-    doCleanupRDD(rdd.id, blocking = true)
-  }
-
-  def cleanupShuffle(shuffleDependency: ShuffleDependency[_, _]) {
-    doCleanupShuffle(shuffleDependency.shuffleId, blocking = true)
-  }
-
-  def cleanupBroadcast[T](broadcast: Broadcast[T]) {
-    doCleanupBroadcast(broadcast.id, blocking = true)
-  }
 }
 
 private object ContextCleaner {
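
Note: the test-only wrappers (cleanupRDD, cleanupShuffle, cleanupBroadcast) are removed; the doCleanup* methods are now public and take an explicit blocking flag. A minimal usage sketch, assuming Spark-internal code where sc, rdd, dep and bc already exist:

  // Hedged sketch: drive cleanup synchronously from Spark-internal code.
  val cleaner: ContextCleaner = sc.cleaner.get             // defined when reference tracking is on
  cleaner.doCleanupRDD(rdd.id, blocking = true)            // blocks until the RDD is unpersisted
  cleaner.doCleanupShuffle(dep.shuffleId, blocking = true) // blocks until shuffle state is removed
  cleaner.doCleanupBroadcast(bc.id, blocking = true)       // blocks until the broadcast is removed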

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 3 deletions
@@ -233,11 +233,13 @@ class SparkContext(
   @volatile private[spark] var dagScheduler = new DAGScheduler(this)
   dagScheduler.start()
 
-  private[spark] val cleaner: Option[ContextCleaner] =
+  private[spark] val cleaner: Option[ContextCleaner] = {
     if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
       Some(new ContextCleaner(this))
-    } else None
-
+    } else {
+      None
+    }
+  }
   cleaner.foreach(_.start())
 
   postEnvironmentUpdate()
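
Note: the cleaner is only created when spark.cleaner.referenceTracking is true (the default). A hedged sketch of turning it off, using only the configuration key shown in the diff above:

  // Disable reference-tracking cleanup entirely; sc.cleaner (visible only to
  // Spark-internal code, since it is private[spark]) will then be None.
  val conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName("no-cleaner")
    .set("spark.cleaner.referenceTracking", "false")
  val sc = new SparkContext(conf)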

core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala

Lines changed: 3 additions & 3 deletions
@@ -27,8 +27,8 @@ import org.apache.spark.SparkConf
  * entire Spark job.
  */
 trait BroadcastFactory {
-  def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager)
+  def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit
   def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
-  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean)
-  def stop()
+  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit
+  def stop(): Unit
 }
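
Note: spelling out ": Unit" on the abstract methods makes the trait's contract explicit instead of relying on Scala's procedure syntax, which silently infers Unit and is easy to misread. A hypothetical implementor skeleton (not part of this commit) now has to match:

  // Hypothetical skeleton showing the shape an implementation must match.
  class MyBroadcastFactory extends BroadcastFactory {
    def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit = ()
    def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T] = ???
    def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = ()
    def stop(): Unit = ()
  }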

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 3 additions & 5 deletions
@@ -86,11 +86,9 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean
         val start = System.nanoTime
         value_ = HttpBroadcast.read[T](id)
         /*
-         * Storing the broadcast data in BlockManager so that all
-         * so that all subsequent tasks using the broadcast variable
-         * does not need to fetch it again. The BlockManagerMaster
-         * does not need to be told about this block as no one
-         * needs to know about this data block.
+         * We cache broadcast data in the BlockManager so that subsequent tasks using it
+         * do not need to re-fetch. This data is only used locally and no other node
+         * needs to fetch this block, so we don't notify the master.
          */
         SparkEnv.get.blockManager.putSingle(
           blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
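
Note: tellMaster = false keeps this block out of the BlockManagerMaster's bookkeeping, since the cached value is only ever read locally. A hedged sketch of the same pattern; BroadcastBlockId(42L) and fetchValueSomehow() are assumed purely for illustration:

  // Cache a locally obtained value in the local BlockManager without
  // registering the block with the master.
  val blockId = BroadcastBlockId(42L)
  val value = fetchValueSomehow()
  SparkEnv.get.blockManager.putSingle(
    blockId, value, StorageLevel.MEMORY_AND_DISK, tellMaster = false)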

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 6 additions & 3 deletions
@@ -220,13 +220,16 @@ private[spark] class BlockManager(
     }
   }
 
-  /** Get the BlockStatus for the block identified by the given ID, if it exists. */
+  /**
+   * Get the BlockStatus for the block identified by the given ID, if it exists.
+   * NOTE: This is mainly for testing, and it doesn't fetch information from Tachyon.
+   */
   def getStatus(blockId: BlockId): Option[BlockStatus] = {
     blockInfo.get(blockId).map { info =>
       val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
       val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
-      val tachyonSize = if (tachyonStore.contains(blockId)) tachyonStore.getSize(blockId) else 0L
-      BlockStatus(info.level, memSize, diskSize, tachyonSize)
+      // Assume that block is not in Tachyon
+      BlockStatus(info.level, memSize, diskSize, 0L)
     }
   }
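
Note: this is the Tachyon part of the fix named in the commit message: getStatus no longer queries tachyonStore and, per the added comment, simply reports a Tachyon size of 0. A hedged, test-oriented usage sketch of this Spark-internal API:

  // Inspect where a cached block ended up on this executor.
  SparkEnv.get.blockManager.getStatus(RDDBlockId(0, 0)) match {
    case Some(s) => println(s"mem=${s.memSize} disk=${s.diskSize} tachyon=${s.tachyonSize}")
    case None    => println("block not stored on this executor")
  }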

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

Lines changed: 1 addition & 1 deletion
@@ -95,7 +95,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootDirs
     getBlockLocation(blockId).file.exists()
   }
 
-  /** List all the blocks currently stored in disk by the disk manager. */
+  /** List all the blocks currently stored on disk by the disk manager. */
   def getAllBlocks(): Seq[BlockId] = {
     // Get all the files inside the array of array of directories
     subDirs.flatten.filter(_ != null).flatMap { dir =>
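
Note: subDirs is a two-level array whose slots are created lazily, so flattening it produces nulls that must be filtered out before listing files. A standalone sketch of that traversal pattern (sizes and helpers assumed, not Spark code):

  import java.io.File
  // Lazily populated two-level directory table; unused slots stay null.
  val subDirs: Array[Array[File]] = Array.ofDim[File](4, 4)
  val allFiles: Seq[File] =
    subDirs.flatten.filter(_ != null).flatMap(dir => Option(dir.listFiles()).toSeq.flatten)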

core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala

Lines changed: 0 additions & 2 deletions
@@ -108,7 +108,6 @@ private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false)
     }
   }
 
-  // Should we return previous value directly or as Option?
   def putIfAbsent(key: A, value: B): Option[B] = {
     val prev = internalMap.putIfAbsent(key, TimeStampedValue(value, currentTime))
     Option(prev).map(_.value)
@@ -148,5 +147,4 @@ private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false)
   def getTimestamp(key: A): Option[Long] = {
     getTimeStampedValue(key).map(_.timestamp)
   }
-
 }
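
Note: the deleted question is already answered by the signature: putIfAbsent wraps the possibly-null previous value in an Option. A small sketch, assuming the Spark-internal TimeStampedHashMap:

  val m = new TimeStampedHashMap[String, Int]()
  assert(m.putIfAbsent("k", 1).isEmpty)     // no previous value, so None
  assert(m.putIfAbsent("k", 2) == Some(1))  // key present: old value returned, not replaced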

core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -57,7 +57,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext
     val tester = new CleanerTester(sc, rddIds = Seq(rdd.id))
 
     // Explicit cleanup
-    cleaner.cleanupRDD(rdd)
+    cleaner.doCleanupRDD(rdd.id, blocking = true)
     tester.assertCleanup()
 
     // Verify that RDDs can be re-executed after cleaning up
@@ -70,7 +70,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext
     val tester = new CleanerTester(sc, shuffleIds = shuffleDeps.map(_.shuffleId))
 
     // Explicit cleanup
-    shuffleDeps.foreach(s => cleaner.cleanupShuffle(s))
+    shuffleDeps.foreach(s => cleaner.doCleanupShuffle(s.shuffleId, blocking = true))
     tester.assertCleanup()
 
     // Verify that shuffles can be re-executed after cleaning up
@@ -82,7 +82,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext
     val tester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
 
     // Explicit cleanup
-    cleaner.cleanupBroadcast(broadcast)
+    cleaner.doCleanupBroadcast(broadcast.id, blocking = true)
     tester.assertCleanup()
   }
 