-
Notifications
You must be signed in to change notification settings - Fork 14.8k
KAFKA-19390: Call safeForceUnmap() in AbstractIndex.resize() on Linux to prevent stale mmap of index files #19961
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
da8841f
feat: Add lock helpers
Forest0923 34f40ec
fix: Unmap old mmap during resizing
Forest0923 5994659
test: Add test for AbstractIndex
Forest0923 71eec03
Merge branch 'trunk' into KAFKA-19390
Forest0923 130d8ad
Merge branch 'apache:trunk' into KAFKA-19390
Forest0923 61220c6
refactor: remove lock in constructors
Forest0923 b5e085c
refactor: Add resizeLock
Forest0923 b035e4c
refactor: make locks private, refactor lock helper, and add comments
Forest0923 5bc31d2
fix: fix comment for remapLock field
Forest0923 d320291
refactor: rename read lock helper
Forest0923 7c98c33
refactor: update to use lock helper in LockUtils for consistency and …
Forest0923 ba483b3
refactor: add additional lock in closeHandler and fix style
Forest0923 079e2e0
style: resolve issues found by checkstyle
Forest0923 150159a
refactor: add inRemapReadWriteLock
Forest0923 a0d5df9
fix: resolve warning in inRemapReadWriteLock
Forest0923 3268b08
refactor: remove inRemapReadWriteLock
Forest0923 be13b4b
fix: fix lock helpers
Forest0923 af3cf9d
fix: correct lock helper implementation
Forest0923 0657c7a
refactor: optimize lock
Forest0923 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,8 +17,8 @@ | |
| package org.apache.kafka.storage.internals.log; | ||
|
|
||
| import org.apache.kafka.common.utils.ByteBufferUnmapper; | ||
| import org.apache.kafka.common.utils.OperatingSystem; | ||
| import org.apache.kafka.common.utils.Utils; | ||
| import org.apache.kafka.server.util.LockUtils; | ||
|
|
||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
@@ -33,8 +33,9 @@ | |
| import java.nio.file.Files; | ||
| import java.util.Objects; | ||
| import java.util.OptionalInt; | ||
| import java.util.concurrent.locks.Lock; | ||
| import java.util.concurrent.locks.ReentrantLock; | ||
| import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
| import java.util.function.Supplier; | ||
|
|
||
| /** | ||
| * The abstract index class which holds entry format agnostic methods. | ||
|
|
@@ -47,7 +48,10 @@ private enum SearchResultType { | |
|
|
||
| private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class); | ||
|
|
||
| protected final ReentrantLock lock = new ReentrantLock(); | ||
| // Serializes all index operations that mutate internal state | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is a minor thing. It would be useful to add a comment on why the reader doesn't need to take this lock. |
||
| private final ReentrantLock lock = new ReentrantLock(); | ||
| // Allows concurrent read operations while ensuring exclusive access if the underlying mmap is changed | ||
| private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock(); | ||
|
|
||
| private final long baseOffset; | ||
| private final int maxIndexSize; | ||
|
|
@@ -187,36 +191,32 @@ public void updateParentDir(File parentDir) { | |
| * @return a boolean indicating whether the size of the memory map and the underneath file is changed or not. | ||
| */ | ||
| public boolean resize(int newSize) throws IOException { | ||
| lock.lock(); | ||
| try { | ||
| int roundedNewSize = roundDownToExactMultiple(newSize, entrySize()); | ||
|
|
||
| if (length == roundedNewSize) { | ||
| log.debug("Index {} was not resized because it already has size {}", file.getAbsolutePath(), roundedNewSize); | ||
| return false; | ||
| } else { | ||
| RandomAccessFile raf = new RandomAccessFile(file, "rw"); | ||
| try { | ||
| int position = mmap.position(); | ||
|
|
||
| /* Windows or z/OS won't let us modify the file length while the file is mmapped :-( */ | ||
| if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS) | ||
| safeForceUnmap(); | ||
| raf.setLength(roundedNewSize); | ||
| this.length = roundedNewSize; | ||
| mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize); | ||
| this.maxEntries = mmap.limit() / entrySize(); | ||
| mmap.position(position); | ||
| log.debug("Resized {} to {}, position is {} and limit is {}", file.getAbsolutePath(), roundedNewSize, | ||
| mmap.position(), mmap.limit()); | ||
| return true; | ||
| } finally { | ||
| Utils.closeQuietly(raf, "index file " + file.getName()); | ||
| } | ||
| } | ||
| } finally { | ||
| lock.unlock(); | ||
| } | ||
| return inLockThrows(() -> | ||
| inRemapWriteLockThrows(() -> { | ||
| int roundedNewSize = roundDownToExactMultiple(newSize, entrySize()); | ||
|
|
||
| if (length == roundedNewSize) { | ||
| log.debug("Index {} was not resized because it already has size {}", file.getAbsolutePath(), roundedNewSize); | ||
| return false; | ||
| } else { | ||
| RandomAccessFile raf = new RandomAccessFile(file, "rw"); | ||
| try { | ||
| int position = mmap.position(); | ||
|
|
||
| safeForceUnmap(); | ||
| raf.setLength(roundedNewSize); | ||
| this.length = roundedNewSize; | ||
| mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize); | ||
| this.maxEntries = mmap.limit() / entrySize(); | ||
| mmap.position(position); | ||
| log.debug("Resized {} to {}, position is {} and limit is {}", file.getAbsolutePath(), roundedNewSize, | ||
| mmap.position(), mmap.limit()); | ||
| return true; | ||
| } finally { | ||
| Utils.closeQuietly(raf, "index file " + file.getName()); | ||
| } | ||
| } | ||
| })); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -236,12 +236,9 @@ public void renameTo(File f) throws IOException { | |
| * Flush the data in the index to disk | ||
| */ | ||
| public void flush() { | ||
| lock.lock(); | ||
| try { | ||
| inLock(() -> { | ||
| mmap.force(); | ||
| } finally { | ||
| lock.unlock(); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -261,14 +258,11 @@ public boolean deleteIfExists() throws IOException { | |
| * the file. | ||
| */ | ||
| public void trimToValidSize() throws IOException { | ||
| lock.lock(); | ||
| try { | ||
| inLockThrows(() -> { | ||
| if (mmap != null) { | ||
| resize(entrySize() * entries); | ||
| } | ||
| } finally { | ||
| lock.unlock(); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -288,12 +282,10 @@ public void closeHandler() { | |
| // However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk. | ||
| // To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness. | ||
| // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details. | ||
| lock.lock(); | ||
| try { | ||
| safeForceUnmap(); | ||
| } finally { | ||
| lock.unlock(); | ||
| } | ||
| inLockThrows(() -> | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. `inLockThrows(() -> inRemapWriteLockThrows(this::safeForceUnmap));` |
||
| inRemapWriteLockThrows(() -> { | ||
| safeForceUnmap(); | ||
| })); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -420,20 +412,36 @@ protected void truncateToEntries0(int entries) { | |
| mmap.position(entries * entrySize()); | ||
| } | ||
|
|
||
| /** | ||
| * Execute the given function in a lock only if we are running on windows or z/OS. We do this | ||
| * because Windows or z/OS won't let us resize a file while it is mmapped. As a result we have to force unmap it | ||
| * and this requires synchronizing reads. | ||
| */ | ||
| protected final <T, E extends Exception> T maybeLock(Lock lock, StorageAction<T, E> action) throws E { | ||
| if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS) | ||
| lock.lock(); | ||
| try { | ||
| return action.execute(); | ||
| } finally { | ||
| if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS) | ||
| lock.unlock(); | ||
| } | ||
| protected final <T> T inLock(Supplier<T> action) { | ||
| return LockUtils.inLock(lock, action); | ||
| } | ||
|
|
||
| protected final void inLock(Runnable action) { | ||
| LockUtils.inLock(lock, action); | ||
| } | ||
|
|
||
| protected final <T, E extends Exception> T inLockThrows(LockUtils.ThrowingSupplier<T, E> action) throws E { | ||
| return LockUtils.inLockThrows(lock, action); | ||
| } | ||
|
|
||
| protected final <E extends Exception> void inLockThrows(LockUtils.ThrowingRunnable<E> action) throws E { | ||
| LockUtils.inLockThrows(lock, action); | ||
| } | ||
|
|
||
| protected final <T> T inRemapReadLock(Supplier<T> action) { | ||
| return LockUtils.inLock(remapLock.readLock(), action); | ||
| } | ||
|
|
||
| protected final void inRemapReadLock(Runnable action) { | ||
| LockUtils.inLock(remapLock.readLock(), action); | ||
| } | ||
|
|
||
| protected final <T, E extends Exception> T inRemapWriteLockThrows(LockUtils.ThrowingSupplier<T, E> action) throws E { | ||
| return LockUtils.inLockThrows(remapLock.writeLock(), action); | ||
| } | ||
|
|
||
| protected final <E extends Exception> void inRemapWriteLockThrows(LockUtils.ThrowingRunnable<E> action) throws E { | ||
| LockUtils.inLockThrows(remapLock.writeLock(), action); | ||
| } | ||
|
|
||
| /** | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excuse me, have we considered replacing `inLock` by `inLockThrows`? Keeping both varieties seems a bit verbose to me.