Sort shuffle read improvements #9

Merged
merged 2 commits into from Nov 18, 2014
@@ -18,15 +18,15 @@
package org.apache.spark.shuffle.sort

import java.io.FileOutputStream
import java.nio.ByteBuffer
import java.util.Comparator

import org.apache.spark.executor.ShuffleWriteMetrics

import scala.collection.mutable.{ArrayBuffer, HashMap, Queue}
import scala.util.{Failure, Success, Try}

import org.apache.spark._
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{BaseShuffleHandle, FetchFailedException, ShuffleReader}
import org.apache.spark.storage._
@@ -58,9 +58,6 @@ private[spark] class SortShuffleReader[K, C](
require(endPartition == startPartition + 1,
"Sort shuffle currently only supports fetching one partition")

/** Shuffle block fetcher iterator */
private var shuffleRawBlockFetcherItr: ShuffleRawBlockFetcherIterator = _

private val dep = handle.dependency
private val conf = SparkEnv.get.conf
private val blockManager = SparkEnv.get.blockManager
@@ -69,16 +66,9 @@ private[spark] class SortShuffleReader[K, C](

private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024

/** Number of bytes spilled in memory and on disk */
private var _memoryBytesSpilled: Long = 0L
private var _diskBytesSpilled: Long = 0L

/** Queue to store in-memory shuffle blocks */
private val inMemoryBlocks = new Queue[MemoryShuffleBlock]()

/** number of bytes left to fetch */
private var unfetchedBytes: Long = 0L

/**
* Maintain the relation between shuffle block and its size. The reason we should maintain this
* is that the request shuffle block size is not equal to the result size because of
@@ -100,6 +90,16 @@ private[spark] class SortShuffleReader[K, C](
/** A merge thread to merge on-disk blocks */
private val tieredMerger = new TieredDiskMerger(conf, dep, keyComparator, context)

/** Shuffle block fetcher iterator */
private var shuffleRawBlockFetcherItr: ShuffleRawBlockFetcherIterator = _

/** Number of bytes spilled in memory and on disk */
private var _memoryBytesSpilled: Long = 0L
private var _diskBytesSpilled: Long = 0L

/** number of bytes left to fetch */
private var unfetchedBytes: Long = 0L

def memoryBytesSpilled: Long = _memoryBytesSpilled

def diskBytesSpilled: Long = _diskBytesSpilled + tieredMerger.diskBytesSpilled
@@ -129,15 +129,24 @@ private[spark] class SortShuffleReader[K, C](
// Try to fit block in memory. If this fails, merge in-memory blocks to disk.
val blockSize = blockData.size
val granted = shuffleMemoryManager.tryToAcquire(blockSize)
val block = MemoryShuffleBlock(blockId, blockData)
if (granted >= blockSize) {
inMemoryBlocks += MemoryShuffleBlock(blockId, blockData)
if (blockData.isDirect) {
// If the memory shuffle block is allocated on a direct buffer, copy it onto the heap;
// otherwise off-heap memory will grow out of control.
val onHeapBuffer = ByteBuffer.allocate(blockSize.toInt)
onHeapBuffer.put(blockData.nioByteBuffer)

inMemoryBlocks += MemoryShuffleBlock(blockId, new NioManagedBuffer(onHeapBuffer))
blockData.release()
} else {
inMemoryBlocks += MemoryShuffleBlock(blockId, blockData)
}
} else {
logInfo(s"Granted $granted memory is not enough to store shuffle block id $blockId, " +
logDebug(s"Granted $granted memory is not enough to store shuffle block id $blockId, " +
s"block size $blockSize, spilling in-memory blocks to release the memory")

shuffleMemoryManager.release(granted)
spillInMemoryBlocks(block)
spillInMemoryBlocks(MemoryShuffleBlock(blockId, blockData))
}

unfetchedBytes -= shuffleBlockMap(blockId.asInstanceOf[ShuffleBlockId])._2
@@ -153,19 +162,22 @@ private[spark] class SortShuffleReader[K, C](
val mergedItr =
MergeUtil.mergeSort(finalItrGroup, keyComparator, dep.keyOrdering, dep.aggregator)

// Update the spilled info.
// Update the spilled info and do cleanup work when task is finished.
context.taskMetrics().memoryBytesSpilled += memoryBytesSpilled
context.taskMetrics().diskBytesSpilled += diskBytesSpilled

def releaseFinalShuffleMemory(): Unit = {
inMemoryBlocks.foreach { block =>
block.blockData.release()
shuffleMemoryManager.release(block.blockData.size)
}
inMemoryBlocks.clear()
}
context.addTaskCompletionListener(_ => releaseFinalShuffleMemory())

// Release the in-memory block when iteration is completed.
val completionItr = CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](
mergedItr, {
inMemoryBlocks.foreach { block =>
block.blockData.release()
shuffleMemoryManager.release(block.blockData.size)
}
inMemoryBlocks.clear()
})
mergedItr, releaseFinalShuffleMemory())

new InterruptibleIterator(context, completionItr.map(p => (p._1, p._2)))
}
@@ -174,6 +186,15 @@ private[spark] class SortShuffleReader[K, C](
// Write merged blocks to disk
val (tmpBlockId, file) = blockManager.diskBlockManager.createTempShuffleBlock()

def releaseTempShuffleMemory(blocks: ArrayBuffer[MemoryShuffleBlock]): Unit = {
for (block <- blocks) {
block.blockData.release()
if (block != tippingBlock) {
shuffleMemoryManager.release(block.blockData.size)
}
}
}

// If the remaining unfetched data would fit inside our current allocation, we don't want to
// waste time spilling blocks beyond the space needed for it.
// We use the request size to calculate the remaining spilled size to make sure the
@@ -201,7 +222,7 @@ private[spark] class SortShuffleReader[K, C](
var success = false

try {
partialMergedItr.foreach(p => writer.write(p))
partialMergedItr.foreach(writer.write)
success = true
} finally {
if (!success) {
@@ -216,6 +237,7 @@ private[spark] class SortShuffleReader[K, C](
writer.commitAndClose()
writer = null
}
releaseTempShuffleMemory(blocksToSpill)
}
_diskBytesSpilled += curWriteMetrics.shuffleBytesWritten

@@ -242,19 +264,13 @@ private[spark] class SortShuffleReader[K, C](
} else {
_diskBytesSpilled = file.length()
}
releaseTempShuffleMemory(blocksToSpill)
}
}

tieredMerger.registerOnDiskBlock(tmpBlockId, file)

logInfo(s"Merged ${blocksToSpill.size} in-memory blocks into file ${file.getName}")

for (block <- blocksToSpill) {
block.blockData.release()
if (block != tippingBlock) {
shuffleMemoryManager.release(block.blockData.size)
}
}
}

private def inMemoryBlocksToIterators(blocks: Seq[MemoryShuffleBlock])
@@ -61,9 +61,6 @@ private[spark] class TieredDiskMerger[K, C](
private val blockManager = SparkEnv.get.blockManager
private val ser = Serializer.getSerializer(dep.serializer)

/** Number of bytes spilled on disk */
private var _diskBytesSpilled: Long = 0L

/** PriorityQueue to store the on-disk merging blocks, blocks are merged by size ordering */
private val onDiskBlocks = new PriorityBlockingQueue[DiskShuffleBlock]()

@@ -77,6 +74,11 @@

@volatile private var doneRegistering = false

/** Number of bytes spilled on disk */
private var _diskBytesSpilled: Long = 0L

def diskBytesSpilled: Long = _diskBytesSpilled

def registerOnDiskBlock(blockId: BlockId, file: File): Unit = {
assert(!doneRegistering)
onDiskBlocks.put(new DiskShuffleBlock(blockId, file, file.length()))
@@ -88,8 +90,6 @@
}
}

def diskBytesSpilled: Long = _diskBytesSpilled

/**
* Notify the merger that no more on disk blocks will be registered.
*/
@@ -184,9 +184,10 @@ private[spark] class TieredDiskMerger[K, C](
val blocksToMerge = new ArrayBuffer[DiskShuffleBlock]()
// Try to pick the smallest merge width that will result in the next merge being the final
// merge.
val mergeFactor = math.min(onDiskBlocks.size - maxMergeFactor + 1, maxMergeFactor)
(0 until mergeFactor).foreach {
var mergeFactor = math.min(onDiskBlocks.size - maxMergeFactor + 1, maxMergeFactor)
Collaborator:
Any particular reason for this change? Spark tends to prefer foreach to while loops outside of performance-critical regions.

Owner Author:
Hi Sandy, there's a bug around here:

java.lang.IndexOutOfBoundsException: 1
        at scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:43)
        at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:47)
        at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:47)
        at scala.collection.immutable.Range.foreach(Range.scala:141)
        at org.apache.spark.util.collection.TieredDiskMerger$DiskToDiskMerger.run(TieredDiskMerger.scala:188)

I'm guessing this Range.foreach may have some problems with queue.take. I changed it to a while loop to try again, and the exception seems to be gone.

Collaborator:
Could the problem be that foreach is missing a parameter?

Does

(0 until mergeFactor).foreach { x =>
  blocksToMerge += onDiskBlocks.take()
}

work?

Owner Author:
I didn't try this, but from my understanding it would work. Meanwhile, I think the while loop here is just a style choice; it's also very simple and straightforward :)
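
(As an editorial aside, expanding on the "missing a parameter" point above: the block passed to foreach has no parameter, so it is not run once per index. It is evaluated a single time, and its value, the ArrayBuffer returned by +=, is itself a function from index to element, which foreach then applies to 0, 1, 2, ..., hence the IndexOutOfBoundsException: 1. A minimal, self-contained sketch that reproduces the pattern, using Integer stand-ins for the DiskShuffleBlock queue in TieredDiskMerger:

import java.util.concurrent.PriorityBlockingQueue
import scala.collection.mutable.ArrayBuffer

object ForeachGotcha {
  def main(args: Array[String]): Unit = {
    // Integer stand-ins for the on-disk shuffle blocks.
    val onDiskBlocks = new PriorityBlockingQueue[Integer]()
    Seq(1, 2, 3).foreach(i => onDiskBlocks.put(i))

    val blocksToMerge = new ArrayBuffer[Integer]()

    try {
      // The parameterless block is evaluated exactly once: one take() runs,
      // += returns the ArrayBuffer itself, and that buffer (a Seq, hence an
      // Int => Integer function) becomes the argument to foreach. foreach
      // then calls blocksToMerge(0), blocksToMerge(1), ..., and index 1 is
      // out of bounds for the one-element buffer.
      (0 until 3).foreach {
        blocksToMerge += onDiskBlocks.take()
      }
    } catch {
      case e: IndexOutOfBoundsException =>
        println(s"Reproduced: $e") // java.lang.IndexOutOfBoundsException: 1
    }
  }
}

Either the while loop in this patch or the parameterized form suggested above, foreach { x => ... }, avoids the issue.)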

Collaborator:
Ok, not a big deal either way.

Do you think we're ready to submit this otherwise?

Collaborator:
The second problem seems like it's a product of the first one, right? If we are continually unable to acquire memory, we are forced to spill.

Owner Author:
From my understanding, these two problems may both be related to unexpected behavior in the ShuffleMemoryManager. I don't have a small reproducible unit test for this issue. Though I don't think the problem lies in SortShuffleReader, it's worth investigating.

Owner Author:
Hi @sryza, it seems this JIRA addresses the same problem I mentioned before (https://issues.apache.org/jira/browse/SPARK-4452).

Collaborator:
It still wouldn't explain why the shuffle memory manager reports negative memory, though, right?

Owner Author:
Yes, the first problem is actually a bug, while the second one seems like the same issue as SPARK-4452.

while (mergeFactor > 0) {
blocksToMerge += onDiskBlocks.take()
mergeFactor -= 1
}

// Merge the blocks
@@ -201,7 +202,7 @@ private[spark] class TieredDiskMerger[K, C](
var success = false

try {
partialMergedItr.foreach(p => writer.write(p))
partialMergedItr.foreach(writer.write)
success = true
} finally {
if (!success) {
@@ -216,9 +217,9 @@ private[spark] class TieredDiskMerger[K, C](
writer.commitAndClose()
writer = null
}
releaseShuffleBlocks(blocksToMerge.toArray)
}
_diskBytesSpilled += curWriteMetrics.shuffleBytesWritten
releaseShuffleBlocks(blocksToMerge.toArray)

logInfo(s"Merged ${blocksToMerge.size} on-disk blocks into file ${file.getName}")

@@ -151,4 +151,9 @@ public String toString() {
.add("length", length)
.toString();
}

@Override
public boolean isDirect() {
return length >= MIN_MEMORY_MAP_BYTES;
}
}
@@ -68,4 +68,10 @@ public abstract class ManagedBuffer {
* Convert the buffer into a Netty object, used to write the data out.
*/
public abstract Object convertToNetty() throws IOException;

/**
* Tell whether or not this buffer is backed by a direct (off-heap) byte buffer.
* @return true if the underlying byte buffer is direct.
*/
public abstract boolean isDirect();
}
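
(For illustration, a minimal sketch of how a caller might consult the new isDirect flag, in the same spirit as the SortShuffleReader change earlier in this diff; the object and method names below, DirectBufferCopy and copyToHeapIfDirect, are hypothetical and not part of this patch:

import java.nio.ByteBuffer
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}

object DirectBufferCopy {
  // If a fetched block is backed by a direct (off-heap) buffer, copy its
  // bytes into an on-heap ByteBuffer and release the original, so that
  // keeping many blocks buffered does not grow off-heap memory unchecked;
  // otherwise return the buffer untouched.
  def copyToHeapIfDirect(data: ManagedBuffer): ManagedBuffer = {
    if (data.isDirect) {
      val onHeap = ByteBuffer.allocate(data.size.toInt)
      onHeap.put(data.nioByteBuffer())
      onHeap.flip() // make the copied bytes readable from position 0
      data.release()
      new NioManagedBuffer(onHeap)
    } else {
      data
    }
  }
})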
@@ -73,4 +73,9 @@ public String toString() {
.add("buf", buf)
.toString();
}

@Override
public boolean isDirect() {
return buf.isDirect();
}
}
@@ -71,5 +71,10 @@ public String toString() {
.add("buf", buf)
.toString();
}

@Override
public boolean isDirect() {
return buf.isDirect();
}
}

@@ -101,4 +101,9 @@ public boolean equals(Object other) {
}
return false;
}

@Override
public boolean isDirect() {
return underlying.isDirect();
}
}