KAFKA-19390: Call safeForceUnmap() in AbstractIndex.resize() on Linux to prevent stale mmap of index files #19961
A label of 'needs-attention' was automatically added to this PR in order to raise the |
junrao
left a comment
@Forest0923 : Thanks for the PR. Great fix! Left a couple of comments.
| log.debug("Loaded index file {} with maxEntries = {}, maxIndexSize = {}, entries = {}, lastOffset = {}, file position = {}", | ||
| file.getAbsolutePath(), maxEntries(), maxIndexSize, entries(), lastEntry.offset, mmap().position()); | ||
| inLock(lock, () -> { |
Is lock needed here in the constructor? Ditto for TimeIndex.
I don’t think it’s necessary in the constructor, so I’ll remove it.
| */ | ||
| public OffsetPosition lookup(long targetOffset) { | ||
| return maybeLock(lock, () -> { | ||
| return inLock(lock, () -> { |
Currently, in Linux, we don't acquire the lock for lookup(), which is used for fetch requests. Both fetch and produce requests are common. Acquiring the lock in the fetch path reduces read/write concurrency. The reader only truly needs to lock when the underlying mmap changes by resize(). Since resize() is an infrequent event, we could introduce a separate resize lock, which will be held by resize() and all readers (currently calling maybeLock()). This will help maintain the current level of concurrency.
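A minimal sketch of the separate-lock idea described above, under stated assumptions: the class name `RemapGuardedIndex` and its methods are hypothetical, and a heap `ByteBuffer` stands in for the mmap so the example runs anywhere. Readers share the read side of a `ReentrantReadWriteLock`, writers take only the existing writer lock, and `resize()` takes both.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for an index; names are illustrative, not the PR's code.
class RemapGuardedIndex {
    // Serializes writers (write and resize) against each other.
    private final ReentrantLock lock = new ReentrantLock();
    // Readers share the read side; replacing the buffer takes the write side.
    private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock();
    // A heap buffer stands in for the mmap so the sketch runs anywhere.
    private ByteBuffer buffer = ByteBuffer.allocate(8);

    // Read path: concurrent readers do not block each other.
    int read(int position) {
        remapLock.readLock().lock();
        try {
            return buffer.getInt(position);
        } finally {
            remapLock.readLock().unlock();
        }
    }

    // Write path: takes only the writer lock, so it does not block readers.
    void write(int position, int value) {
        lock.lock();
        try {
            buffer.putInt(position, value);
        } finally {
            lock.unlock();
        }
    }

    // Resize: excludes other writers (lock) and all readers (write side of remapLock).
    void resize(int newSize) {
        lock.lock();
        try {
            remapLock.writeLock().lock();
            try {
                ByteBuffer replacement = ByteBuffer.allocate(newSize);
                ByteBuffer old = buffer.duplicate();
                old.position(0);
                old.limit(Math.min(old.capacity(), newSize));
                replacement.put(old); // copy the bytes that still fit
                buffer = replacement;
            } finally {
                remapLock.writeLock().unlock();
            }
        } finally {
            lock.unlock();
        }
    }
}
```

Because the write side is only taken in `resize()`, readers contend with each other and with ordinary writers no more than they would without the extra lock.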
I’d like to ask the following questions:
- The JavaDoc for java.nio.Buffer says it is not thread-safe. In the case of reads, is it still safe for multiple threads to access the same buffer concurrently? (See: https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/nio/Buffer.html)
- To improve read-operation concurrency, are you suggesting the use of ReentrantReadWriteLock instead of ReentrantLock?
@Forest0923 : Good question. It's true that the javadoc does not provide a happens-before guarantee. However, according to this, MappedByteBuffer is a thin wrapper and provides direct access to the buffer cache of the OS. So, once you have written to it, every thread, and every process, will see the update. In addition to the index file, we read from the buffer cache for the log file without synchronization too. The implementation may change in the future, but that is probably unlikely.
So for now, it's probably better to use a separate resize lock to maintain the current read/write concurrency?
Thank you for the advice!
I have added resizeLock, could you please check?
Please let me know if I have misunderstood anything.
| throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has " | ||
| + "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp " | ||
| + timestamp(mmap(), 0)); | ||
| inLock(lock, () -> { |
Good catch here!
junrao
left a comment
@Forest0923 : Thanks for the updated PR. A couple more comments. Also, could you run some perf tests (produce perf + consumer perf together) to make sure there is no degradation?
| private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class); | ||
| protected final ReentrantLock lock = new ReentrantLock(); | ||
| protected final ReentrantReadWriteLock resizeLock = new ReentrantReadWriteLock(); |
Now that we have two locks, could we add comments on how each one is being used? Also, remapLock is probably more accurate?
| protected final <T, E extends Exception> T maybeLock(Lock lock, StorageAction<T, E> action) throws E { | ||
| if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS) | ||
| lock.lock(); | ||
| protected final <T, E extends Exception> T inResizeLock(Lock lock, StorageAction<T, E> action) throws E { |
Perhaps it's better to make this inResizeReadLock and avoid passing in lock from the input. For consistency, we could also introduce a method like inLock for the other lock. This way, we could make both locks private.
Here is the result of the perf test:
trunk (7cd99ea) | pr (b035e4c) | pr (af3cf9d)
junrao
left a comment
@Forest0923 : Thanks for the updated PR and the perf results. The perf numbers look good. A few more comments.
| protected final ReentrantLock lock = new ReentrantLock(); | ||
| // Serializes all index operations that mutate internal state | ||
| private final ReentrantLock lock = new ReentrantLock(); | ||
| // Allows concurrent read operations while ensuring exclusive access during index resizing |
during index resizing => if the underlying mmap is changed ?
Thank you. Fixed in 5bc31d2
| return LockUtils.inLockThrows(lock, action); | ||
| } | ||
| protected final <T, E extends Exception> T inResizeReadLock(StorageAction<T, E> action) throws E { |
inResizeReadLock => inRemapReadLock ?
Fixed in d320291
| } | ||
| protected final <T, E extends Exception> T inResizeReadLock(StorageAction<T, E> action) throws E { | ||
| remapLock.readLock().lock(); |
It doesn't seem that the caller passes in action that actually throws. Could we just use LockUtils.inLock()?
I've addressed the remaining comments. 7c98c33
Could you please check it again?
| /* Windows or z/OS won't let us modify the file length while the file is mmapped :-( */ | ||
| if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS) | ||
| return LockUtils.inLockThrows(lock, () -> { | ||
| remapLock.writeLock().lock(); |
Could we use LockUtils.inLockThrows(remapLock.writeLock(), () -> {?
| // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details. | ||
| lock.lock(); | ||
| try { | ||
| LockUtils.inLockThrows(lock, () -> { |
safeForceUnmap() also changes mmap. So we want to protect it by the remap lock.
junrao
left a comment
@Forest0923 : Thanks for the updated PR. A few more comments.
| protected final ReentrantLock lock = new ReentrantLock(); | ||
| // Serializes all index operations that mutate internal state | ||
| private final ReentrantLock lock = new ReentrantLock(); | ||
| // Allows concurrent read operations while ensuring exclusive access if the underlying file is changed |
the underlying file => the underlying mmap
| public boolean resize(int newSize) throws IOException { | ||
| lock.lock(); | ||
| try { | ||
| return LockUtils.inLockThrows(lock, () -> LockUtils.inLockThrows(remapLock.writeLock(), () -> { |
Could we split this into two lines?
| // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details. | ||
| lock.lock(); | ||
| try { | ||
| LockUtils.inLockThrows(remapLock.writeLock(), () -> { |
I think we need both locks here since we need to prevent both concurrent writer and reader.
Fixed in ba483b3
junrao
left a comment
@Forest0923 : Thanks for the updated PR. One more comment.
| } finally { | ||
| lock.unlock(); | ||
| } | ||
| return LockUtils.inLockThrows(lock, () -> |
Could we use the utility method in this class? Ditto in a few other places.
I’ve added inRemapReadWriteLock method. Does this address your comment appropriately?
junrao
left a comment
@Forest0923 : Thanks for the updated PR. A couple more comments.
| public void flush() { | ||
| lock.lock(); | ||
| try { | ||
| LockUtils.inLock(lock, () -> { |
This could just be inLock( () -> {, right? Ditto in a few other places.
| public boolean resize(int newSize) throws IOException { | ||
| lock.lock(); | ||
| try { | ||
| return inRemapReadWriteLock(() -> { |
It's probably clearer to do something like the following. Ditto for closeHandler().
inLockThrows(() ->
inRemapWriteLockThrows( () -> {
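The nesting suggested above could look roughly like the following sketch. Everything here is an assumption made for illustration: `NestedLockSketch` and `IoAction` are hypothetical names, and the helpers are fixed to `IOException` for brevity, whereas the helpers discussed in this PR are generic over the exception type. The point is only the shape: the writer lock is taken first, then the write side of the remap lock, and both are released in reverse order even when the body throws.

```java
import java.io.IOException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class NestedLockSketch {
    // Hypothetical functional interface; fixed to IOException to keep the sketch short.
    interface IoAction<T> {
        T run() throws IOException;
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock();

    // Runs the action while holding the given lock and always releases it.
    private static <T> T withLock(Lock l, IoAction<T> action) throws IOException {
        l.lock();
        try {
            return action.run();
        } finally {
            l.unlock();
        }
    }

    <T> T inLockThrows(IoAction<T> action) throws IOException {
        return withLock(lock, action);
    }

    <T> T inRemapWriteLockThrows(IoAction<T> action) throws IOException {
        return withLock(remapLock.writeLock(), action);
    }

    // Returns true only if both locks were held while the body ran.
    boolean resize(int newSize) throws IOException {
        return inLockThrows(() ->
            inRemapWriteLockThrows(() -> {
                if (newSize < 0) {
                    throw new IOException("negative size: " + newSize);
                }
                return lock.isHeldByCurrentThread() && remapLock.isWriteLockedByCurrentThread();
            }));
    }

    // True if the calling thread still holds either lock.
    boolean anyLockHeld() {
        return lock.isHeldByCurrentThread() || remapLock.isWriteLockedByCurrentThread();
    }
}
```

Keeping the locks private and exposing only the helpers means callers cannot take them in a different order by accident.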
Thanks for the suggestions! I've applied the fix
junrao
left a comment
@Forest0923 : Thanks for the updated PR. One more comment.
| return LockUtils.inLockThrows(remapLock.readLock(), () -> action).get(); | ||
| } | ||
| protected final <E extends Exception> void inRemapReadLockThrows(LockUtils.ThrowingRunnable<E> action) throws E { |
inRemapReadLockThrows => inRemapWriteLockThrows
I've updated the lock helpers.
@Forest0923 : Thanks for the updated PR. The following test failure seems related to this PR. My understanding is that this is caused by low retention size that closes the handler of the index file during async delete. However, by default, the delete is delayed by 60 secs. So, I don't quite understand why this failure is showing up in this test. Do you know what's causing this test to fail now?
I'm not exactly sure why, but I was able to reproduce the issue locally by increasing
diff --git a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
index 854be39808..06421fd96a 100644
--- a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
@@ -67,7 +67,7 @@ class LogConcurrencyTest {
def testUncommittedDataNotConsumed(log: UnifiedLog): Unit = {
val executor = Executors.newFixedThreadPool(2)
try {
- val maxOffset = 5000
+ val maxOffset = 10000
val consumer = new ConsumerTask(log, maxOffset)
       val appendTask = new LogAppendTask(log, maxOffset)
Applying the following change seems to fix the issue:
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
index 04d68e9a54..89875484f7 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
@@ -212,7 +212,7 @@ public final class OffsetIndex extends AbstractIndex {
* Truncates index to a known number of entries.
*/
private void truncateToEntries(int entries) {
- inLock(() -> {
+ inRemapWriteLockThrows(() -> {
super.truncateToEntries0(entries);
this.lastOffset = lastEntry().offset;
             log.debug("Truncated index {} to {} entries; position is now {} and last offset is now {}",
@Forest0923 : Thanks for the analysis. I was able to reproduce this issue too. It seems that mmap is temporarily set to null by Also, why would adding
Sorry, my earlier comment about The Supplier's actual action was executed outside the lock.
protected final <T> T inRemapReadLock(Supplier<T> action) {
    return LockUtils.inLock(remapLock.readLock(), () -> action).get();
}
updated: #19961 (comment)
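To make the pitfall in the snippet above concrete, here is a small self-contained sketch. `LockHelperSketch` and its static `inLock` are hypothetical stand-ins (the real `LockUtils` is not reproduced here). In the buggy variant the lambda `() -> action` merely returns the supplier while the lock is held; the supplier is then invoked by `.get()` after the lock has already been released.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

class LockHelperSketch {
    final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock();

    // Hypothetical stand-in for a lock utility: runs the supplier while holding the lock.
    static <T> T inLock(Lock lock, Supplier<T> supplier) {
        lock.lock();
        try {
            return supplier.get();
        } finally {
            lock.unlock();
        }
    }

    // Buggy: only "return action" runs under the lock; action.get() runs after unlock.
    <T> T buggyInRemapReadLock(Supplier<T> action) {
        return inLock(remapLock.readLock(), () -> action).get();
    }

    // Fixed: the action itself is what runs under the lock.
    <T> T fixedInRemapReadLock(Supplier<T> action) {
        return inLock(remapLock.readLock(), action);
    }
}
```

One way to observe the difference is to have the action report `remapLock.getReadHoldCount()`: it is 0 in the buggy variant and 1 in the fixed one.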
junrao
left a comment
@Forest0923 : Thanks for the updated PR. A few more comments. Also, do you know what's causing the following test failure?
2025-07-02T03:49:27.3858133Z
2025-07-02T03:49:27.3860984Z [Error] /home/runner/work/kafka/kafka/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java:132: error: no suitable method found for produceKeyValuesSynchronouslyWithTimestamp(String,List<KeyValue<Integer,String>>,Properties,long)
2025-07-02T03:49:27.3863357Z > Task :streams:integration-tests:compileTestScala FAILED
2025-07-02T03:49:27.3864394Z javac exited with exit code 1
| throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has " | ||
| + "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp " | ||
| + timestamp(mmap(), 0)); | ||
| inLock(() -> { |
This needs the remap read lock.
| private OffsetPosition lastEntry() { | ||
| lock.lock(); | ||
| try { | ||
| return inLock(() -> { |
lastEntry() is a reader. So it should only take remap read lock.
Also, there is an existing issue. It seems there is no memory barrier for the instance level field lastOffset. So a reader may not see the latest value. We need to make it volatile.
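A minimal sketch of the visibility point, assuming a hypothetical `LastOffsetHolder` class rather than the actual index code: the field is written under the writer lock but read by threads that never take that lock, so it is declared `volatile` to give those readers a memory barrier.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical holder; illustrates the field-visibility concern only.
class LastOffsetHolder {
    private final ReentrantLock lock = new ReentrantLock();
    // volatile: a write made under `lock` is visible to readers that do not take `lock`.
    private volatile long lastOffset = -1L;

    // Writer path: serialized by the lock, rejects non-increasing offsets.
    void append(long offset) {
        lock.lock();
        try {
            if (offset <= lastOffset) {
                throw new IllegalArgumentException("offset " + offset + " is not larger than " + lastOffset);
            }
            lastOffset = offset;
        } finally {
            lock.unlock();
        }
    }

    // Reader path: no lock, relies on the volatile read.
    long lastOffset() {
        return lastOffset;
    }
}
```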
| private TimestampOffset lastEntryFromIndexFile() { | ||
| lock.lock(); | ||
| try { | ||
| return inLock(() -> { |
lastEntryFromIndexFile() is a reader. So, it needs the remap read lock.
Thanks for the review. I've fixed some lock usages and added volatile for lastOffset. By the way, I couldn't reproduce the error in EmitOnChangeIntegrationTest locally or find any failed GitHub Actions runs. Could it be related to this PR?
junrao
left a comment
@Forest0923 : Thanks for the updated PR. The code LGTM. Could you run the same producer/consumer perf test again?
Here is the result of the perf test:
Trunk (a3ed705) | PR (0657c7a)
junrao
left a comment
@Forest0923 : Thanks for the perf results. LGTM
chia7712
left a comment
@Forest0923 Thanks for this patch, and sorry for the delayed review. Two small comments remain. Please take a look.
| * @param runnable the runnable to be executed within the lock context | ||
| * @throws NullPointerException if either {@code lock} or {@code runnable} is null | ||
| */ | ||
| public static void inLock(Lock lock, Runnable runnable) { |
Excuse me, have we considered replacing inLock by inLockThrows? Keeping both varieties seems a bit verbose to me.
| } finally { | ||
| lock.unlock(); | ||
| } | ||
| inLockThrows(() -> |
inLockThrows(() -> inRemapWriteLockThrows(this::safeForceUnmap));
| private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class); | ||
| protected final ReentrantLock lock = new ReentrantLock(); | ||
| // Serializes all index operations that mutate internal state |
This is a minor thing. It would be useful to add a comment on why the reader doesn't need to take this lock.
) (#20131) This PR performs a refactoring of LockUtils and improves inline comments, as a follow-up to #19961. Reviewers: Chia-Ping Tsai <[email protected]>, Jun Rao <[email protected]>
… to prevent stale mmap of index files (apache#19961) https://issues.apache.org/jira/browse/KAFKA-19390 The AbstractIndex.resize() method does not release the old memory map for both index and time index files. In some cases, Mixed GC may not run for a long time, which can cause the broker to crash when the vm.max_map_count limit is reached. The root cause is that safeForceUnmap() is not being called on Linux within resize(), so we have changed the code to unmap old mmap on all operating systems. The same problem was reported in [KAFKA-7442](https://issues.apache.org/jira/browse/KAFKA-7442), but the PR submitted at that time did not acquire all necessary locks around the mmap accesses and was closed without fixing the issue. Reviewers: Jun Rao <[email protected]>
@Forest0923 : Do you think that you could backport this fix to 3.9? We plan to do a minor release in 3.9 and it would be useful to include this fix.
Backport KAFKA-19390 to v3.9, which includes PRs #19961 and #20131 Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>
https://issues.apache.org/jira/browse/KAFKA-19390
The AbstractIndex.resize() method does not release the old memory map
for both index and time index files. In some cases, Mixed GC may not
run for a long time, which can cause the broker to crash when the
vm.max_map_count limit is reached.
The root cause is that safeForceUnmap() is not being called on Linux
within resize(), so we have changed the code to unmap old mmap on all
operating systems.
The same problem was reported in
KAFKA-7442, but the
PR submitted at that time did not acquire all necessary locks around the
mmap accesses and was closed without fixing the issue.
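
As a rough illustration of the remapping step the description refers to, the sketch below resizes a file and maps it again. It is written under several assumptions: `RemapSketch` is a hypothetical class, the OS is assumed to allow changing the length of a file that is still mapped (as on Linux), and the forced unmap and the locking discussed in this PR are deliberately left out. After `resize()` the previous `MappedByteBuffer` is simply dropped, which is the situation the description says can leave old mappings in place until garbage collection runs.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;

// Hypothetical sketch; not the AbstractIndex implementation.
class RemapSketch {
    private final Path file;
    private MappedByteBuffer mmap;

    RemapSketch(Path file, int size) throws IOException {
        this.file = file;
        this.mmap = map(size);
    }

    // Sets the file length and maps the whole file read-write.
    private MappedByteBuffer map(int size) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw")) {
            raf.setLength(size);
            return raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, size);
        }
    }

    // Replaces the mapping; the previous buffer is only dropped here, not unmapped.
    void resize(int newSize) throws IOException {
        mmap.force();        // flush pending changes to the file first
        mmap = map(newSize); // the old mapping lingers until it is garbage collected
    }

    MappedByteBuffer buffer() {
        return mmap;
    }
}
```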