[SPARK-22083][CORE] Release locks in MemoryStore.evictBlocksToFreeSpace #19311
Conversation
Test build #82046 has finished for PR 19311 at commit
```scala
if (exceptionWasThrown) {
  selectedBlocks.foreach { id =>
    // some of the blocks may have already been unlocked, or completely removed
    blockInfoManager.get(id).foreach { info =>
```
This feels racy. Let's say you're dropping 10 blocks here. You try to drop the first one, but `newEffectiveStorageLevel.isValid` is true, so you just unlock the block. Then you get to this code some time later, but some other thread has locked that first block. Aren't you going to drop that lock, which you don't really own?

I think you'd need to keep track of which blocks have successfully been processed by `dropBlock` instead of doing this.
Good point, thanks -- I've handled this now.
MemoryStore.evictBlocksToFreeSpace acquires write locks up front for all the blocks it intends to evict. If there is a failure to evict blocks (e.g., some failure dropping a block to disk), then we have to release the locks. Otherwise the locks are never released, and an executor trying to acquire one of them will wait forever.
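The shape of the fix can be sketched with a toy lock manager. This is a minimal illustration, not Spark's actual code: `ToyLockManager`, the string block ids, and the `dropBlock` callback are all hypothetical stand-ins for `BlockInfoManager` and the real eviction path. All write locks are taken up front, each successful drop releases its own lock, and a `finally` releases whatever this thread still holds if an exception escapes.

```scala
import scala.collection.mutable

// Toy stand-in for BlockInfoManager; Spark's real types and APIs differ.
class ToyLockManager {
  val writeLocks = mutable.Set[String]() // blocks write-locked by this thread

  def lockForWriting(id: String): Unit = writeLocks += id
  def unlock(id: String): Unit = writeLocks -= id
}

def evictBlocksToFreeSpace(mgr: ToyLockManager, blocks: Seq[String],
                           dropBlock: String => Unit): Unit = {
  blocks.foreach(mgr.lockForWriting) // all write locks acquired up front
  var exceptionWasThrown = true
  try {
    blocks.foreach { id =>
      dropBlock(id)  // may throw, e.g. on a disk failure
      mgr.unlock(id) // a successful drop releases its own lock
    }
    exceptionWasThrown = false
  } finally {
    if (exceptionWasThrown) {
      // Release only the locks this thread still holds; without this
      // cleanup, the locks on undropped blocks would leak forever.
      blocks.filter(mgr.writeLocks.contains).foreach(mgr.unlock)
    }
  }
}
```

If `dropBlock` throws partway through, the `finally` leaves no write locks behind, which is the invariant the bug violated.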
Test build #82091 has finished for PR 19311 at commit
```scala
memManager.setMemoryStore(memoryStore)

// Put in some small blocks to fill up the memory store
val initialBlocks = (1 to 10).map { id =>
```
The logic looks fine, but I kinda dislike the magic number ("10") being used everywhere. A constant would make this much better (`val initialBlocks = 10`).
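As a sketch of the suggestion (the constant names and the tuple body of the map are illustrative placeholders, not the test's actual code):

```scala
// Named constants instead of the magic numbers 10 and 100
// scattered through the test.
val numInitialBlocks = 10
val sizePerBlock = 100L

// e.g. the setup loop becomes (placeholder body):
val initialBlocks = (1 to numInitialBlocks).map { id => (id, sizePerBlock) }
```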
To piggyback on @vanzin's comment, `sizePerBlock` also, please (so that the 100 goes away)? Thx
Thanks @squito for the change, this looks like a nasty bugfix!
```scala
// blocks getting evicted. We'll make the eviction throw an exception, and make sure that
// all locks are released.
val ct = implicitly[ClassTag[Array[Byte]]]
def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, readLockAfterDrop: Boolean): Unit = {
```
Nit: `failAfterDroppingNBlocks` -> `numValidBlocks`, `readLockAfterDrop` -> `validBlock`?
I don't think `validBlock` captures the intent here -- I don't see anything valid or invalid about it either way. The part of the behavior which changes is whether or not another thread grabs a reader lock on the block after it gets dropped to disk.

(To go along with that, we drop the block to disk rather than evicting it completely, as otherwise there is nothing to grab a lock on. I could always drop the block to disk instead of having that depend on this; it just seemed like another useful thing to check whether the number of blocks was successfully updated in `blockInfoManager` when the block was dropped completely.)
```scala
val blocksStillInMemory = blockInfoManager.entries.filter { case (id, info) =>
  assert(info.writerTask === BlockInfo.NO_WRITER, id)
  // in this test, all the blocks in memory have no reader, but everything dropped to disk
  // had another thread read the block. We shouldn't lose the other thread's reader lock.
```
I am curious about this part of the test. Why are we checking for this (and so, why `afterDropAction` in the test case)? Even without this, the change and test case look fine to me. Am I missing something? Are we testing that releasing the write lock doesn't also release other tasks' read locks? Or something else?

(To nitpick, the write lock release and read lock acquire can be interspersed by another read or write acquire -- of course not in this test.)
In an earlier version of this, I was always unconditionally releasing all locks that were held by anything in the `finally`. I've changed it to only release locks that this thread holds, and this part of the test is to verify that. We simulate another thread grabbing a lock on the blocks which get successfully dropped (just a read lock in this case, though it doesn't really matter). The test makes sure that even though we drop some of the remaining locks owned by this thread in the `finally`, the other thread still keeps its read lock.

Yes, there are many other possible interleavings of locks with other threads, but that's not the point of this test case. It's to make sure that the `finally` block releases only the correct set of locks.
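A simplified model of the scenario being tested (lock ownership is modeled here with plain maps and made-up thread names; Spark's `BlockInfoManager` tracks this differently): the evictor's cleanup must release only its own remaining write locks, leaving the other thread's read lock intact.

```scala
import scala.collection.mutable

// Hypothetical model: a write lock records its owning thread, read locks
// record the set of reader threads.
val writeLocks = mutable.Map[String, String]()      // block -> write-lock owner
val readLocks  = mutable.Map[String, Set[String]]() // block -> reader threads

// Cleanup releases only the write locks owned by the given thread.
def releaseWriteLocksHeldBy(thread: String): Unit =
  writeLocks.filterInPlace { case (_, owner) => owner != thread }

// The evictor takes write locks on three blocks up front.
Seq("a", "b", "c").foreach(writeLocks(_) = "evictor")

// Block "a" is dropped to disk successfully: the evictor releases its write
// lock, and another thread grabs a read lock on it.
writeLocks -= "a"
readLocks("a") = readLocks.getOrElse("a", Set.empty) + "other"

// Dropping "b" fails; the finally releases only the evictor's remaining locks.
releaseWriteLocksHeldBy("evictor")
```

Afterward no write locks remain, while the other thread's read lock on "a" survives -- the invariant this part of the test pins down.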
Thanks for clarifying @squito ... I was assuming the test was for something along those lines, but good to know I did not misunderstand!
This patch looks great.
LGTM. Merging to master / 2.2 / 2.1.
(After tests are done, I mean.)
Test build #82150 has finished for PR 19311 at commit
## What changes were proposed in this pull request?

MemoryStore.evictBlocksToFreeSpace acquires write locks for all the blocks it intends to evict up front. If there is a failure to evict blocks (e.g., some failure dropping a block to disk), then we have to release the lock. Otherwise the lock is never released and an executor trying to get the lock will wait forever.

## How was this patch tested?

Added unit test.

Author: Imran Rashid <[email protected]>

Closes #19311 from squito/SPARK-22083.

(cherry picked from commit 2c5b9b1)
Signed-off-by: Marcelo Vanzin <[email protected]>