@@ -234,107 +234,100 @@ private[spark] class MemoryStore(
234
234
235
235
val ccbst = new CsdCacheBlockSizeTracker (csdCacheBlockSizeLimit)
236
236
// Unroll this block safely, checking whether we have exceeded our threshold periodically
237
- try {
238
- var currentSize = 0L
239
- while (values.hasNext && keepUnrolling && ccbst.shouldCache) {
240
- vector += values.next()
241
- if (elementsUnrolled % memoryCheckPeriod == 0 ) {
242
- // If our vector's size has exceeded the threshold, request more memory
243
- currentSize = vector.estimateSize()
244
- if (ccbst.shouldTurnOffCache(currentSize)) {
245
- ccbst.turnOffCache()
246
- } else if (currentSize >= memoryThreshold) {
247
- val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
248
- keepUnrolling =
249
- reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode .ON_HEAP )
250
- if (keepUnrolling) {
251
- unrollMemoryUsedByThisBlock += amountToRequest
252
- }
253
- // New threshold is currentSize * memoryGrowthFactor
254
- memoryThreshold += amountToRequest
237
+ var currentSize = 0L
238
+ while (values.hasNext && keepUnrolling && ccbst.shouldCache) {
239
+ vector += values.next()
240
+ if (elementsUnrolled % memoryCheckPeriod == 0 ) {
241
+ // If our vector's size has exceeded the threshold, request more memory
242
+ currentSize = vector.estimateSize()
243
+ if (ccbst.shouldTurnOffCache(currentSize)) {
244
+ ccbst.turnOffCache()
245
+ } else if (currentSize >= memoryThreshold) {
246
+ val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
247
+ keepUnrolling =
248
+ reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode .ON_HEAP )
249
+ if (keepUnrolling) {
250
+ unrollMemoryUsedByThisBlock += amountToRequest
255
251
}
252
+ // New threshold is currentSize * memoryGrowthFactor
253
+ memoryThreshold += amountToRequest
256
254
}
257
- elementsUnrolled += 1
258
255
}
256
+ elementsUnrolled += 1
257
+ }
259
258
260
- if (keepUnrolling && ccbst.shouldCache) {
261
- // We successfully unrolled the entirety of this block
262
- // and the block size is within csdCacheBlockSizeLimit
263
- val arrayValues = vector.toArray
264
- vector = null
265
- val entry =
266
- new DeserializedMemoryEntry [T ](arrayValues, SizeEstimator .estimate(arrayValues), classTag)
267
- val size = entry.size
268
-
269
- def transferUnrollToStorage (amount : Long ): Unit = {
270
- // Synchronize so that transfer is atomic
271
- memoryManager.synchronized {
272
- releaseUnrollMemoryForThisTask(MemoryMode .ON_HEAP , amount)
273
- val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode .ON_HEAP )
274
- assert(success, " transferring unroll memory to storage memory failed" )
275
- }
259
+ if (keepUnrolling && ccbst.shouldCache) {
260
+ // We successfully unrolled the entirety of this block
261
+ // and the block size is within csdCacheBlockSizeLimit
262
+ val arrayValues = vector.toArray
263
+ vector = null
264
+ val entry =
265
+ new DeserializedMemoryEntry [T ](arrayValues, SizeEstimator .estimate(arrayValues), classTag)
266
+ val size = entry.size
267
+ def transferUnrollToStorage (amount : Long ): Unit = {
268
+ // Synchronize so that transfer is atomic
269
+ memoryManager.synchronized {
270
+ releaseUnrollMemoryForThisTask(MemoryMode .ON_HEAP , amount)
271
+ val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode .ON_HEAP )
272
+ assert(success, " transferring unroll memory to storage memory failed" )
276
273
}
277
-
278
- // Acquire storage memory if necessary to store this block in memory.
279
- val enoughStorageMemory = {
280
- if (unrollMemoryUsedByThisBlock <= size) {
281
- val acquiredExtra =
282
- memoryManager.acquireStorageMemory(
283
- blockId, size - unrollMemoryUsedByThisBlock, MemoryMode .ON_HEAP )
284
- if (acquiredExtra) {
285
- transferUnrollToStorage(unrollMemoryUsedByThisBlock)
286
- }
287
- acquiredExtra
288
- } else { // unrollMemoryUsedByThisBlock > size
289
- // If this task attempt already owns more unroll memory than is necessary to store the
290
- // block, then release the extra memory that will not be used.
291
- val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
292
- releaseUnrollMemoryForThisTask(MemoryMode .ON_HEAP , excessUnrollMemory)
293
- transferUnrollToStorage(size)
294
- true
274
+ }
275
+ // Acquire storage memory if necessary to store this block in memory.
276
+ val enoughStorageMemory = {
277
+ if (unrollMemoryUsedByThisBlock <= size) {
278
+ val acquiredExtra =
279
+ memoryManager.acquireStorageMemory(
280
+ blockId, size - unrollMemoryUsedByThisBlock, MemoryMode .ON_HEAP )
281
+ if (acquiredExtra) {
282
+ transferUnrollToStorage(unrollMemoryUsedByThisBlock)
295
283
}
284
+ acquiredExtra
285
+ } else { // unrollMemoryUsedByThisBlock > size
286
+ // If this task attempt already owns more unroll memory than is necessary to store the
287
+ // block, then release the extra memory that will not be used.
288
+ val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
289
+ releaseUnrollMemoryForThisTask(MemoryMode .ON_HEAP , excessUnrollMemory)
290
+ transferUnrollToStorage(size)
291
+ true
296
292
}
297
- if (enoughStorageMemory) {
298
- entries.synchronized {
299
- entries.put(blockId, entry)
300
- }
301
- logInfo(" Block %s stored as values in memory (estimated size %s, free %s)" .format(
302
- blockId, Utils .bytesToString(size), Utils .bytesToString(maxMemory - blocksMemoryUsed)))
303
- Right (size)
304
- } else {
305
- assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
306
- " released too much unroll memory" )
307
- Left (
308
- (
309
- new PartiallyUnrolledIterator (
310
- this ,
311
- MemoryMode .ON_HEAP ,
312
- unrollMemoryUsedByThisBlock,
313
- unrolled = arrayValues.toIterator,
314
- rest = Iterator .empty),
315
- true
316
- )
317
- )
293
+ }
294
+ if (enoughStorageMemory) {
295
+ entries.synchronized {
296
+ entries.put(blockId, entry)
318
297
}
298
+ logInfo(" Block %s stored as values in memory (estimated size %s, free %s)" .format(
299
+ blockId, Utils .bytesToString(size), Utils .bytesToString(maxMemory - blocksMemoryUsed)))
300
+ Right (size)
319
301
} else {
320
- val iter = new PartiallyUnrolledIterator (
321
- this ,
322
- MemoryMode .ON_HEAP ,
323
- unrollMemoryUsedByThisBlock,
324
- unrolled = vector.iterator,
325
- rest = values
302
+ assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
303
+ " released too much unroll memory" )
304
+ Left (
305
+ (
306
+ new PartiallyUnrolledIterator (
307
+ this ,
308
+ MemoryMode .ON_HEAP ,
309
+ unrollMemoryUsedByThisBlock,
310
+ unrolled = arrayValues.toIterator,
311
+ rest = Iterator .empty),
312
+ true
313
+ )
326
314
)
327
- // We ran out of space while unrolling the values for this block
328
- if (! ccbst.shouldCache) {
329
- logBlockCacheSizeLimitMessage(blockId, currentSize)
330
- Left ((iter, false ))
331
- } else {
332
- logUnrollFailureMessage(blockId, vector.estimateSize())
333
- Left ((iter, true ))
334
- }
335
315
}
336
- } finally {
337
-
316
+ } else {
317
+ val iter = new PartiallyUnrolledIterator (
318
+ this ,
319
+ MemoryMode .ON_HEAP ,
320
+ unrollMemoryUsedByThisBlock,
321
+ unrolled = vector.iterator,
322
+ rest = values)
323
+ // We ran out of space while unrolling the values for this block
324
+ if (! ccbst.shouldCache) {
325
+ logBlockCacheSizeLimitMessage(blockId, currentSize)
326
+ Left ((iter, false ))
327
+ } else {
328
+ logUnrollFailureMessage(blockId, vector.estimateSize())
329
+ Left ((iter, true ))
330
+ }
338
331
}
339
332
}
340
333
/**
0 commit comments