-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-2670] FetchFailedException should be thrown when local fetch has failed #1578
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Thanks - this is a good idea. Two questions: (a) what type of exception have you seen here? (b) could you add a unit test for this? Jenkins, test this please. |
QA tests have started for PR 1578. This patch merges cleanly. |
QA results for PR 1578: |
// Pass 0 as size since it's not in flight | ||
results.put(new FetchResult(id, 0, () => iter)) | ||
logDebug("Got local block " + id) | ||
try{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Small code style thing, add a space before the {
@pwendell I found this issue when I simulated disk fault. When shuffle____ cannot be open successfully, FileNotFoundException was thrown from the constructor of RandomAccessFile in DiskStore#getBytes. Yes, I will add test cases later. |
Here also should throw an override def next(): (BlockId, Option[Iterator[Any]]) = {
resultsGotten += 1
val startFetchWait = System.currentTimeMillis()
val result = results.take()
val stopFetchWait = System.currentTimeMillis()
_fetchWaitTime += (stopFetchWait - startFetchWait)
if (!result.failed) bytesInFlight -= result.size
while (!fetchRequests.isEmpty &&
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
sendRequest(fetchRequests.dequeue())
}
(result.blockId, if (result.failed) None else Some(result.deserialize()))
} |
I've modified BasicBlockFetcherIterator to fail fast and added test cases. |
case e: Exception => { | ||
logError(s"Error occurred while fetching local blocks", e) | ||
for (id <- localBlocksToFetch.drop(fetchIndex)) { | ||
results.put(new FetchResult(id, -1, null)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wouldn't do drop and such on a ConcurrentQueue, since it might drop stuff other threads were adding. Just do a results.put on the failed block and don't worry about dropping other ones. You can actually move the try/catch into the for loop and add a "return" at the bottom of the catch after adding this failing FetchResult.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your comment, @mateiz .
I wouldn't do drop and such on a ConcurrentQueue, since it might drop stuff other threads were adding. Just do a results.put on the failed block and don't worry about dropping other ones. You can actually move the try/catch into the for loop and add a "return" at the bottom of the catch after adding this failing FetchResult.
But, if it returns from getLocalBlocks immediately rest of FetchResults is not set to results, and we waits on results.take() in next method forever right? results is a instance of LinkedBlockingQueue and take method is blocking method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought next() would return a failure block, and then the caller of BlockFetcherIterator will just stop. Did you see it not doing that? I think all you have to do is put one FetchResult with size = -1 in the queue and return, and everything will be fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought wrong. Exactly, in current usage of BlockFetcherIterator, next() is not invoked after FetchFailedException has been thrown.
I wonder it's a little bit problem that we can invoke next() after FetchFailedException even if there are no such usages in current implementation.
I think it's better to prohibit invoking next() after FetchFailedException to clearly express the correct usage of the method.
Thanks for adding the test! I had one more comment on using drop() on the concurrent queue -- it seems like it might be troublesome. I'd rather just put the failed result and exit from getLocalBlocks |
localBlocksToFetch is a instance of ArrayBuffer, not concurrent queue and it is used from only one thread right? |
Jenkins, retest this please |
QA tests have started for PR 1578. This patch merges cleanly. |
QA results for PR 1578: |
Thanks for the changes! I've merged this in. |
…as failed Author: Kousuke Saruta <[email protected]> Closes apache#1578 from sarutak/SPARK-2670 and squashes the following commits: 85c8938 [Kousuke Saruta] Removed useless results.put for fail fast e8713cc [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2670 d353984 [Kousuke Saruta] Refined assertion messages in BlockFetcherIteratorSuite.scala 03bcb02 [Kousuke Saruta] Merge branch 'SPARK-2670' of github.com:sarutak/spark into SPARK-2670 5d05855 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2670 4fca130 [Kousuke Saruta] Added test cases for BasicBlockFetcherIterator b7b8250 [Kousuke Saruta] Modified BasicBlockFetchIterator to fail fast when local fetch error has been occurred a3a9be1 [Kousuke Saruta] Modified BlockFetcherIterator for SPARK-2670 460dc01 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2670 e310c0b [Kousuke Saruta] Modified BlockFetcherIterator to handle local fetch failure as fatch fail
No description provided.