
Commit 95794cc

SPY-1394

1 parent 97dea98 commit 95794cc

File tree: 4 files changed, +116 -20 lines

  core/src/main/scala/org/apache/spark/CacheManager.scala
  core/src/main/scala/org/apache/spark/storage/BlockManager.scala
  core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

core/src/main/scala/org/apache/spark/CacheManager.scala
Lines changed: 10 additions & 4 deletions

@@ -17,12 +17,12 @@
 
 package org.apache.spark
 
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
 
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
 /**
  * Spark class responsible for passing RDDs partition contents to the BlockManager and making
  * sure a node doesn't load two copies of an RDD at once.
@@ -174,7 +174,13 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           updatedBlocks ++=
             blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
           arr.iterator.asInstanceOf[Iterator[T]]
-        case Right(it) =>
+
+        case Right((it, false)) =>
+          // big block detected when unrolling
+          val returnValues = it.asInstanceOf[Iterator[T]]
+          returnValues
+
+        case Right((it, true)) =>
          // There is not enough space to cache this partition in memory
           val returnValues = it.asInstanceOf[Iterator[T]]
           if (putLevel.useDisk) {
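For orientation, here is a minimal, self-contained sketch (illustrative names only, not the Spark source) of the three-way contract the caller above now handles: Left means the block was fully unrolled, Right((it, true)) means memory ran out but dropping to disk is still allowed, and Right((it, false)) means the block was judged over-sized and should not be cached at all.

// Sketch of the new unrollSafely result contract (hypothetical helper, not Spark code).
object UnrollContractSketch {
  def describe[T](result: Either[Array[T], (Iterator[T], Boolean)]): String =
    result match {
      case Left(arr)         => s"cached in memory: ${arr.length} elements"
      case Right((_, true))  => "out of memory: caller may still drop to disk"
      case Right((_, false)) => "over the CSD size limit: do not cache at all"
    }

  def main(args: Array[String]): Unit = {
    println(describe(Left(Array(1, 2, 3))))
    println(describe[Int](Right((Iterator(4, 5), false))))
  }
}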

core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Lines changed: 8 additions & 1 deletion

@@ -509,7 +509,14 @@ private[spark] class BlockManager(
       logDebug(s"Getting block $blockId from disk")
       val bytes: ByteBuffer = if (diskStore.contains(blockId)) {
         // DiskStore.getBytes() always returns Some, so this .get() is guaranteed to be safe
-        diskStore.getBytes(blockId).get
+        try {
+          diskStore.getBytes(blockId).get
+        } catch {
+          case t: Throwable =>
+            logError(s"diskStore.getBytes($blockId).get failed")
+            throw t
+        }
+
       } else {
         // Remove the missing block so that its unavailability is reported to the driver
         removeBlock(blockId)
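The intent of this hunk is narrow: behavior is unchanged, but a failing disk read now logs which block failed before rethrowing. As a hedged aside, the same log-and-rethrow shape in isolation (illustrative names, not the BlockManager API):

// Generic log-and-rethrow sketch: annotate the failure with context, then propagate it.
object LogAndRethrowSketch {
  private def logError(msg: String): Unit = System.err.println(s"ERROR: $msg")

  def readOrLog[A](blockId: String)(read: => A): A =
    try {
      read
    } catch {
      case t: Throwable =>
        logError(s"read($blockId) failed") // record the failing block before rethrowing
        throw t                            // keep the original exception intact
    }

  def main(args: Array[String]): Unit = {
    println(readOrLog("rdd_0_0")(42)) // prints 42; a throwing body would log, then throw
  }
}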

core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Lines changed: 96 additions & 13 deletions

@@ -20,13 +20,13 @@ package org.apache.spark.storage
 import java.nio.ByteBuffer
 import java.util.LinkedHashMap
 
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.TaskContext
 import org.apache.spark.memory.MemoryManager
-import org.apache.spark.util.{SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
+import org.apache.spark.util.{SizeEstimator, Utils}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
 
@@ -59,6 +59,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager)
   private val unrollMemoryThreshold: Long =
     conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
 
+  // CSD flag controlling whether to apply CSD's caching block size policy
+  private val csdCacheBlockSizeLimit: Long =
+    conf.getLong("spark.storage.MemoryStore.csdCacheBlockSizeLimit", Integer.MAX_VALUE.toLong)
+  assert(csdCacheBlockSizeLimit <= Integer.MAX_VALUE)
+
   /** Total amount of memory available for storage, in bytes. */
   private def maxMemory: Long = memoryManager.maxStorageMemory
 
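The new limit is read through the standard conf.getLong, so it should be settable like any other Spark property. A hedged usage sketch: the key comes from the diff, while the 512 MB value and app name are made up for illustration. Note that a value of 0 or less switches the policy off, per the csdCacheBlockSizeLimit <= 0 checks in the hunks below.

import org.apache.spark.SparkConf

object CsdLimitConfExample {
  // Illustrative only: cap the estimated size of any memory-cached block at 512 MB.
  // A value of 0 or less disables the policy.
  val conf: SparkConf = new SparkConf()
    .setAppName("csd-block-limit-example") // hypothetical app name
    .set("spark.storage.MemoryStore.csdCacheBlockSizeLimit", (512L * 1024 * 1024).toString)
}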

@@ -173,11 +178,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager)
           val res = putArray(blockId, arrayValues, level, returnValues)
           droppedBlocks ++= res.droppedBlocks
           PutResult(res.size, res.data, droppedBlocks)
-        case Right(iteratorValues) =>
+        case Right((iteratorValues, false)) =>
+          // big block detected when unrolling
+          PutResult(0, Left(iteratorValues), droppedBlocks)
+        case Right((iteratorValues, true)) =>
           // Not enough space to unroll this block; drop to disk if applicable
           if (level.useDisk && allowPersistToDisk) {
             logWarning(s"Persisting block $blockId to disk instead.")
-            val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
+            val res =
+              blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
             PutResult(res.size, res.data, droppedBlocks)
           } else {
             PutResult(0, Left(iteratorValues), droppedBlocks)
@@ -234,6 +243,46 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager)
     logInfo("MemoryStore cleared")
   }
 
+  /**
+   * This API is used by CSD as a post-processing step of [[unrollSafely]] to detect
+   * partitions larger than 2G in estimated object size when there is not enough memory
+   * for unrolling.
+   * It keeps fetching until we "see" 2G of objects in size or the values are exhausted,
+   * under the assumption that the amount of memory given by the sizeLimit parameter is
+   * available in Spark's user memory space, per thread and per operator/RDD compute.
+   *
+   * Since sizeLimit is at most as large as csdCacheBlockSizeLimit (which is itself bounded
+   * by Integer.MAX_VALUE), the user memory space is guaranteed at least 2G per thread in
+   * the worst case.
+   */
+  private[this] def fetchUntilCsdBlockSizeLimit[T](
+      blockId: BlockId,
+      inputValues: Iterator[T],
+      valuesSeen: SizeTrackingVector[Any]): Boolean = {
+    // if the switch is off, do nothing
+    if (csdCacheBlockSizeLimit <= 0) {
+      true
+    } else {
+      val start = System.currentTimeMillis
+      var currentEstimatedSize = valuesSeen.estimateSize()
+      try {
+        var elementsExamined = 0L
+        val memoryCheckPeriod = 16
+        while (inputValues.hasNext && currentEstimatedSize <= csdCacheBlockSizeLimit) {
+          valuesSeen += inputValues.next()
+          elementsExamined += 1
+          if (elementsExamined % memoryCheckPeriod == 0) {
+            currentEstimatedSize = valuesSeen.estimateSize()
+          }
+        }
+        (currentEstimatedSize <= csdCacheBlockSizeLimit)
+      } finally {
+        logWarning(s"fetchUntilCsdBlockSizeLimit($blockId) duration: " +
+          s"${Utils.msDurationToString(System.currentTimeMillis - start)}")
+      }
+    }
+  }
+
   /**
    * Unroll the given block in memory safely.
    *
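An aside on the helper added above: it re-estimates the size only every memoryCheckPeriod = 16 elements, because SizeEstimator-style estimation walks the object graph and would be expensive to run per element. The same sampling pattern in a self-contained sketch, with a trivial sizeOf standing in for the real estimator:

// Sketch of periodic size estimation while draining an iterator (illustrative).
import scala.collection.mutable.ArrayBuffer

object PeriodicEstimateSketch {
  def stillUnderLimit(limit: Long, values: Iterator[Array[Byte]]): Boolean = {
    val seen = ArrayBuffer.empty[Array[Byte]]
    def sizeOf: Long = seen.iterator.map(_.length.toLong).sum // stand-in estimator
    val memoryCheckPeriod = 16
    var examined = 0L
    var estimate = 0L
    while (values.hasNext && estimate <= limit) {
      seen += values.next()
      examined += 1
      if (examined % memoryCheckPeriod == 0) { // amortize the estimation cost
        estimate = sizeOf
      }
    }
    estimate <= limit
  }

  def main(args: Array[String]): Unit = {
    println(stillUnderLimit(1024L, Iterator.fill(64)(new Array[Byte](100))))  // false
    println(stillUnderLimit(1 << 20, Iterator.fill(8)(new Array[Byte](100)))) // true
  }
}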
@@ -245,12 +294,19 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager)
    *
    * This method returns either an array with the contents of the entire block or an iterator
    * containing the values of the block (if the array would have exceeded available memory).
+   *
+   * SPY-1394: CSD modified this API in the following way:
+   * 1. When short of memory, it returns a tuple (iterator, boolean).
+   *    The boolean indicates whether the caller should cache to disk, based on
+   *    detection of an over-sized block.
+   * 2. When an over-sized block is detected, it terminates the unroll and tells the
+   *    caller not to cache at all.
    */
   def unrollSafely(
       blockId: BlockId,
       values: Iterator[Any],
       droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
-    : Either[Array[Any], Iterator[Any]] = {
+    : Either[Array[Any], (Iterator[Any], Boolean)] = {
 
     // Number of elements unrolled so far
     var elementsUnrolled = 0
@@ -281,11 +337,18 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager)
 
     // Unroll this block safely, checking whether we have exceeded our threshold periodically
     try {
-      while (values.hasNext && keepUnrolling) {
+      var currentSize = 0L
+      var shouldCache = true
+      while (values.hasNext && keepUnrolling && (csdCacheBlockSizeLimit <= 0 || shouldCache)) {
         vector += values.next()
         if (elementsUnrolled % memoryCheckPeriod == 0) {
           // If our vector's size has exceeded the threshold, request more memory
-          val currentSize = vector.estimateSize()
+          currentSize = vector.estimateSize()
+
+          if (csdCacheBlockSizeLimit > 0 && shouldCache && currentSize > csdCacheBlockSizeLimit) {
+            shouldCache = false
+          }
+
           if (currentSize >= memoryThreshold) {
             val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
             keepUnrolling = reserveUnrollMemoryForThisTask(
@@ -300,13 +363,25 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager)
         elementsUnrolled += 1
       }
 
-      if (keepUnrolling) {
+      if (keepUnrolling && shouldCache) {
         // We successfully unrolled the entirety of this block
         Left(vector.toArray)
       } else {
-        // We ran out of space while unrolling the values for this block
-        logUnrollFailureMessage(blockId, vector.estimateSize())
-        Right(vector.iterator ++ values)
+        if (!shouldCache) {
+          logBlockSizeLimitMessage(blockId, currentSize)
+          Right(vector.iterator ++ values, shouldCache)
+        } else {
+          // could be a false positive because we have not seen enough of the values;
+          // continue fetching using user memory
+          shouldCache = fetchUntilCsdBlockSizeLimit(blockId, values, vector)
+          if (!shouldCache) {
+            logBlockSizeLimitMessage(blockId, vector.estimateSize())
+          } else {
+            // We ran out of space while unrolling the values for this block
+            logUnrollFailureMessage(blockId, vector.estimateSize())
+          }
+          Right(vector.iterator ++ values, shouldCache)
+        }
       }
 
     } finally {
@@ -583,4 +658,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager)
     )
     logMemoryUsage()
   }
+
+  private def logBlockSizeLimitMessage(blockId: BlockId, currentSize: Long): Unit = {
+    logWarning(
+      s"Block size limit reached: $blockId! " +
+      s"(computed ${Utils.bytesToString(currentSize)} so far)"
+    )
+    logMemoryUsage()
+  }
 }
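One subtlety worth noting in both Right(...) branches above: vector.iterator ++ values hands back the already-unrolled prefix concatenated with the unconsumed remainder, so no elements are lost when caching is abandoned. A minimal sketch of that property:

// Sketch: partially drained input can be resumed losslessly via prefix ++ remainder.
import scala.collection.mutable.ArrayBuffer

object ResumeIteratorSketch {
  def main(args: Array[String]): Unit = {
    val values = Iterator(1, 2, 3, 4, 5)
    val prefix = ArrayBuffer.empty[Int]
    while (values.hasNext && prefix.size < 2) { // "unroll" the first two elements
      prefix += values.next()
    }
    val resumed = prefix.iterator ++ values // same shape as vector.iterator ++ values
    println(resumed.toList) // List(1, 2, 3, 4, 5)
  }
}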

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
Lines changed: 2 additions & 2 deletions

@@ -1146,13 +1146,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
    */
   private def verifyUnroll(
       expected: Iterator[Any],
-      result: Either[Array[Any], Iterator[Any]],
+      result: Either[Array[Any], (Iterator[Any], Boolean)],
       shouldBeArray: Boolean): Unit = {
     val actual: Iterator[Any] = result match {
       case Left(arr: Array[Any]) =>
         assert(shouldBeArray, "expected iterator from unroll!")
         arr.iterator
-      case Right(it: Iterator[Any]) =>
+      case Right((it: Iterator[Any], shouldCache: Boolean)) =>
         assert(!shouldBeArray, "expected array from unroll!")
         it
       case _ =>
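A hedged sketch of how a test body might exercise the updated helper; the fragment below is illustrative, not a quote from the suite, and assumes the suite's existing implicit String-to-BlockId conversion plus an in-scope memoryStore:

// Illustrative test fragment (assumed context: BlockManagerSuite helpers in scope).
val smallList = List.fill(40)(new Array[Byte](100))
val droppedBlocks = ArrayBuffer.empty[(BlockId, BlockStatus)]
// unrollSafely now returns Either[Array[Any], (Iterator[Any], Boolean)]
val result = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
verifyUnroll(smallList.iterator, result, shouldBeArray = true)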
