Skip to content

Commit 2f684ea

Browse files
committed
Refactoring the BlockManager to replace the Either[Either[A,B]] usage. Now using trait 'Values'. Also modified BlockStore.putBytes call to return PutResult, so that it behaves like putValues.
1 parent f70d069 commit 2f684ea

File tree

4 files changed

+56
-45
lines changed

4 files changed

+56
-45
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 50 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ import org.apache.spark.network._
3535
import org.apache.spark.serializer.Serializer
3636
import org.apache.spark.util._
3737

38+
sealed trait Values
39+
40+
case class ByteBufferValues(buffer: ByteBuffer) extends Values
41+
case class IteratorValues(iterator: Iterator[Any]) extends Values
42+
case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends Values
43+
3844
private[spark] class BlockManager(
3945
executorId: String,
4046
actorSystem: ActorSystem,
@@ -455,7 +461,7 @@ private[spark] class BlockManager(
455461

456462
def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)
457463
: Long = {
458-
doPut(blockId, Left(Left(values)), level, tellMaster)
464+
doPut(blockId, IteratorValues(values), level, tellMaster)
459465
}
460466

461467
/**
@@ -477,7 +483,7 @@ private[spark] class BlockManager(
477483
def put(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel,
478484
tellMaster: Boolean = true) : Long = {
479485
require(values != null, "Values is null")
480-
doPut(blockId, Left(Right(values)), level, tellMaster)
486+
doPut(blockId, ArrayBufferValues(values), level, tellMaster)
481487
}
482488

483489
/**
@@ -486,11 +492,11 @@ private[spark] class BlockManager(
486492
def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel,
487493
tellMaster: Boolean = true) {
488494
require(bytes != null, "Bytes is null")
489-
doPut(blockId, Right(bytes), level, tellMaster)
495+
doPut(blockId, ByteBufferValues(bytes), level, tellMaster)
490496
}
491497

492498
private def doPut(blockId: BlockId,
493-
data: Either[Either[Iterator[Any],ArrayBuffer[Any]], ByteBuffer],
499+
data: Values,
494500
level: StorageLevel, tellMaster: Boolean = true): Long = {
495501
require(blockId != null, "BlockId is null")
496502
require(level != null && level.isValid, "StorageLevel is null or invalid")
@@ -533,8 +539,9 @@ private[spark] class BlockManager(
533539

534540
// If we're storing bytes, then initiate the replication before storing them locally.
535541
// This is faster as data is already serialized and ready to send.
536-
val replicationFuture = if (data.isRight && level.replication > 1) {
537-
val bufferView = data.right.get.duplicate() // Doesn't copy the bytes, just creates a wrapper
542+
val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) {
543+
//Duplicate doesn't copy the bytes, just creates a wrapper
544+
val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
538545
Future {
539546
replicate(blockId, bufferView, level)
540547
}
@@ -548,42 +555,43 @@ private[spark] class BlockManager(
548555

549556
var marked = false
550557
try {
551-
data match {
552-
case Left(values) => {
553-
if (level.useMemory) {
554-
// Save it just to memory first, even if it also has useDisk set to true; we will
555-
// drop it to disk later if the memory store can't hold it.
556-
val res = values match {
557-
case Left(values_i) => memoryStore.putValues(blockId, values_i, level, true)
558-
case Right(values_a) => memoryStore.putValues(blockId, values_a, level, true)
559-
}
560-
size = res.size
561-
res.data match {
562-
case Right(newBytes) => bytesAfterPut = newBytes
563-
case Left(newIterator) => valuesAfterPut = newIterator
564-
}
565-
} else {
566-
// Save directly to disk.
567-
// Don't get back the bytes unless we replicate them.
568-
val askForBytes = level.replication > 1
569-
570-
val res = values match {
571-
case Left(values_i) => diskStore.putValues(blockId, values_i, level, askForBytes)
572-
case Right(values_a) => diskStore.putValues(blockId, values_a, level, askForBytes)
573-
}
574-
575-
size = res.size
576-
res.data match {
577-
case Right(newBytes) => bytesAfterPut = newBytes
578-
case _ =>
579-
}
558+
if (level.useMemory) {
559+
// Save it just to memory first, even if it also has useDisk set to true; we will
560+
// drop it to disk later if the memory store can't hold it.
561+
val res = data match {
562+
case IteratorValues(values_i) =>
563+
memoryStore.putValues(blockId, values_i, level, true)
564+
case ArrayBufferValues(values_a) =>
565+
memoryStore.putValues(blockId, values_a, level, true)
566+
case ByteBufferValues(value_bytes) => {
567+
value_bytes.rewind();
568+
memoryStore.putBytes(blockId, value_bytes, level)
569+
}
570+
}
571+
size = res.size
572+
res.data match {
573+
case Right(newBytes) => bytesAfterPut = newBytes
574+
case Left(newIterator) => valuesAfterPut = newIterator
575+
}
576+
} else {
577+
// Save directly to disk.
578+
// Don't get back the bytes unless we replicate them.
579+
val askForBytes = level.replication > 1
580+
581+
val res = data match {
582+
case IteratorValues(values_i) =>
583+
diskStore.putValues(blockId, values_i, level, askForBytes)
584+
case ArrayBufferValues(values_a) =>
585+
diskStore.putValues(blockId, values_a, level, askForBytes)
586+
case ByteBufferValues(value_bytes) => {
587+
value_bytes.rewind();
588+
diskStore.putBytes(blockId, value_bytes, level)
580589
}
581590
}
582-
case Right(bytes) => {
583-
bytes.rewind()
584-
// Store it only in memory at first, even if useDisk is also set to true
585-
(if (level.useMemory) memoryStore else diskStore).putBytes(blockId, bytes, level)
586-
size = bytes.limit
591+
size = res.size
592+
res.data match {
593+
case Right(newBytes) => bytesAfterPut = newBytes
594+
case _ =>
587595
}
588596
}
589597

@@ -612,8 +620,8 @@ private[spark] class BlockManager(
612620
// values and need to serialize and replicate them now:
613621
if (level.replication > 1) {
614622
data match {
615-
case Right(bytes) => Await.ready(replicationFuture, Duration.Inf)
616-
case Left(values) => {
623+
case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf)
624+
case _ => {
617625
val remoteStartTime = System.currentTimeMillis
618626
// Serialize the block if not already done
619627
if (bytesAfterPut == null) {

core/src/main/scala/org/apache/spark/storage/BlockStore.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.spark.Logging
2828
*/
2929
private[spark]
3030
abstract class BlockStore(val blockManager: BlockManager) extends Logging {
31-
def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel)
31+
def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel) : PutResult
3232

3333
/**
3434
* Put in a block and, possibly, also return its content as either bytes or another Iterator.

core/src/main/scala/org/apache/spark/storage/DiskStore.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
3737
diskManager.getBlockLocation(blockId).length
3838
}
3939

40-
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) {
40+
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = {
4141
// So that we do not modify the input offsets !
4242
// duplicate does not copy buffer, so inexpensive
4343
val bytes = _bytes.duplicate()
@@ -52,6 +52,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
5252
val finishTime = System.currentTimeMillis
5353
logDebug("Block %s stored as %s file on disk in %d ms".format(
5454
file.getName, Utils.bytesToString(bytes.limit), (finishTime - startTime)))
55+
return PutResult(bytes.limit(), Right(bytes.duplicate()))
5556
}
5657

5758
override def putValues(

core/src/main/scala/org/apache/spark/storage/MemoryStore.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
4949
}
5050
}
5151

52-
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) {
52+
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = {
5353
// Work on a duplicate - since the original input might be used elsewhere.
5454
val bytes = _bytes.duplicate()
5555
bytes.rewind()
@@ -59,8 +59,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
5959
elements ++= values
6060
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
6161
tryToPut(blockId, elements, sizeEstimate, true)
62+
PutResult(sizeEstimate, Left(values.toIterator))
6263
} else {
6364
tryToPut(blockId, bytes, bytes.limit, false)
65+
PutResult(bytes.limit(), Right(bytes.duplicate()))
6466
}
6567
}
6668

0 commit comments

Comments
 (0)