Skip to content

Commit 7c98c33

Browse files
committed
refactor: update to use lock helper in LockUtils for consistency and simplify inRemapReadLock()
1 parent d320291 commit 7c98c33

File tree

1 file changed

+26
-36
lines changed

1 file changed

+26
-36
lines changed

storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java

Lines changed: 26 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -191,36 +191,31 @@ public void updateParentDir(File parentDir) {
191191
* @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
192192
*/
193193
public boolean resize(int newSize) throws IOException {
194-
return LockUtils.inLockThrows(lock, () -> {
195-
remapLock.writeLock().lock();
196-
try {
197-
int roundedNewSize = roundDownToExactMultiple(newSize, entrySize());
198-
199-
if (length == roundedNewSize) {
200-
log.debug("Index {} was not resized because it already has size {}", file.getAbsolutePath(), roundedNewSize);
201-
return false;
202-
} else {
203-
RandomAccessFile raf = new RandomAccessFile(file, "rw");
204-
try {
205-
int position = mmap.position();
206-
207-
safeForceUnmap();
208-
raf.setLength(roundedNewSize);
209-
this.length = roundedNewSize;
210-
mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
211-
this.maxEntries = mmap.limit() / entrySize();
212-
mmap.position(position);
213-
log.debug("Resized {} to {}, position is {} and limit is {}", file.getAbsolutePath(), roundedNewSize,
214-
mmap.position(), mmap.limit());
215-
return true;
216-
} finally {
217-
Utils.closeQuietly(raf, "index file " + file.getName());
218-
}
194+
return LockUtils.inLockThrows(lock, () -> LockUtils.inLockThrows(remapLock.writeLock(), () -> {
195+
int roundedNewSize = roundDownToExactMultiple(newSize, entrySize());
196+
197+
if (length == roundedNewSize) {
198+
log.debug("Index {} was not resized because it already has size {}", file.getAbsolutePath(), roundedNewSize);
199+
return false;
200+
} else {
201+
RandomAccessFile raf = new RandomAccessFile(file, "rw");
202+
try {
203+
int position = mmap.position();
204+
205+
safeForceUnmap();
206+
raf.setLength(roundedNewSize);
207+
this.length = roundedNewSize;
208+
mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
209+
this.maxEntries = mmap.limit() / entrySize();
210+
mmap.position(position);
211+
log.debug("Resized {} to {}, position is {} and limit is {}", file.getAbsolutePath(), roundedNewSize,
212+
mmap.position(), mmap.limit());
213+
return true;
214+
} finally {
215+
Utils.closeQuietly(raf, "index file " + file.getName());
219216
}
220-
} finally {
221-
remapLock.writeLock().unlock();
222217
}
223-
});
218+
}));
224219
}
225220

226221
/**
@@ -286,7 +281,7 @@ public void closeHandler() {
286281
// However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
287282
// To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
288283
// See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
289-
LockUtils.inLockThrows(lock, () -> {
284+
LockUtils.inLockThrows(remapLock.writeLock(), () -> {
290285
safeForceUnmap();
291286
});
292287
}
@@ -427,13 +422,8 @@ protected final <T, E extends Exception> T inLockThrows(LockUtils.ThrowingSuppli
427422
return LockUtils.inLockThrows(lock, action);
428423
}
429424

430-
protected final <T, E extends Exception> T inRemapReadLock(StorageAction<T, E> action) throws E {
431-
remapLock.readLock().lock();
432-
try {
433-
return action.execute();
434-
} finally {
435-
remapLock.readLock().unlock();
436-
}
425+
protected final <T> T inRemapReadLock(Supplier<T> action) {
426+
return LockUtils.inLock(remapLock.readLock(), () -> action).get();
437427
}
438428

439429
/**

0 commit comments

Comments
 (0)