Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@
* such as acquiring and releasing locks in a safe manner.
*/
public class LockUtils {
@FunctionalInterface
public interface ThrowingSupplier<T, E extends Exception> {
T get() throws E;
}
@FunctionalInterface
public interface ThrowingRunnable<E extends Exception> {
void run() throws E;
}

/**
* Executes the given {@link Supplier} within the context of the specified {@link Lock}.
Expand All @@ -49,4 +57,73 @@ public static <T> T inLock(Lock lock, Supplier<T> supplier) {
lock.unlock();
}
}

/**
* Executes the given {@link Runnable} within the context of the specified {@link Lock}.
* The lock is acquired before executing the runnable and released after the execution,
* ensuring that the lock is always released, even if an exception is thrown.
*
* @param lock the lock to be acquired and released
* @param runnable the runnable to be executed within the lock context
* @throws NullPointerException if either {@code lock} or {@code runnable} is null
*/
public static void inLock(Lock lock, Runnable runnable) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excuse me, have we considered replacing inLock by inLockThrows? Keeping both varieties seems a bit verbose to me.

Objects.requireNonNull(lock, "Lock must not be null");
Objects.requireNonNull(runnable, "Runnable must not be null");

lock.lock();
try {
runnable.run();
} finally {
lock.unlock();
}
}

/**
* Executes the given {@link ThrowingSupplier} within the context of the specified {@link Lock}.
* The lock is acquired before executing the supplier and released after the execution,
* ensuring that the lock is always released, even if an exception is thrown.
*
* @param <T> the type of the result returned by the supplier
* @param <E> the type of exception that may be thrown by the supplier
* @param lock the lock to be acquired and released
* @param supplier the supplier to be executed within the lock context
* @return the result of the supplier
* @throws E if an exception occurs during the execution of the supplier
* @throws NullPointerException if either {@code lock} or {@code supplier} is null
*/
public static <T, E extends Exception> T inLockThrows(Lock lock, ThrowingSupplier<T, E> supplier) throws E {
Objects.requireNonNull(lock, "Lock must not be null");
Objects.requireNonNull(supplier, "Supplier must not be null");

lock.lock();
try {
return supplier.get();
} finally {
lock.unlock();
}
}

/**
* Executes the given {@link ThrowingRunnable} within the context of the specified {@link Lock}.
* The lock is acquired before executing the runnable and released after the execution,
* ensuring that the lock is always released, even if an exception is thrown.
*
* @param <E> the type of exception that may be thrown by the runnable
* @param lock the lock to be acquired and released
* @param runnable the runnable to be executed within the lock context
* @throws E if an exception occurs during the execution of the runnable
* @throws NullPointerException if either {@code lock} or {@code runnable} is null
*/
public static <E extends Exception> void inLockThrows(Lock lock, ThrowingRunnable<E> runnable) throws E {
Objects.requireNonNull(lock, "Lock must not be null");
Objects.requireNonNull(runnable, "Runnable must not be null");

lock.lock();
try {
runnable.run();
} finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package org.apache.kafka.storage.internals.log;

import org.apache.kafka.common.utils.ByteBufferUnmapper;
import org.apache.kafka.common.utils.OperatingSystem;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.LockUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -33,8 +33,9 @@
import java.nio.file.Files;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

/**
* The abstract index class which holds entry format agnostic methods.
Expand All @@ -47,7 +48,10 @@ private enum SearchResultType {

private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class);

protected final ReentrantLock lock = new ReentrantLock();
// Serializes all index operations that mutate internal state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a minor thing. It would be useful to add a comment on why the reader doesn't need to take this lock.

private final ReentrantLock lock = new ReentrantLock();
// Allows concurrent read operations while ensuring exclusive access if the underlying mmap is changed
private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock();

private final long baseOffset;
private final int maxIndexSize;
Expand Down Expand Up @@ -187,8 +191,7 @@ public void updateParentDir(File parentDir) {
* @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
*/
public boolean resize(int newSize) throws IOException {
lock.lock();
try {
return inRemapReadWriteLock(() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably clearly to do sth like the following. Ditto for closeHandler().

inLockThrows(() ->
      inRemapWriteLockThrows( () -> {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestions! I've applied the fix

int roundedNewSize = roundDownToExactMultiple(newSize, entrySize());

if (length == roundedNewSize) {
Expand All @@ -199,9 +202,7 @@ public boolean resize(int newSize) throws IOException {
try {
int position = mmap.position();

/* Windows or z/OS won't let us modify the file length while the file is mmapped :-( */
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
safeForceUnmap();
safeForceUnmap();
raf.setLength(roundedNewSize);
this.length = roundedNewSize;
mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
Expand All @@ -214,9 +215,7 @@ public boolean resize(int newSize) throws IOException {
Utils.closeQuietly(raf, "index file " + file.getName());
}
}
} finally {
lock.unlock();
}
});
}

/**
Expand All @@ -236,12 +235,9 @@ public void renameTo(File f) throws IOException {
* Flush the data in the index to disk
*/
public void flush() {
lock.lock();
try {
LockUtils.inLock(lock, () -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could just be inLock( () -> {, right? Ditto in a few other places.

mmap.force();
} finally {
lock.unlock();
}
});
}

/**
Expand All @@ -261,14 +257,11 @@ public boolean deleteIfExists() throws IOException {
* the file.
*/
public void trimToValidSize() throws IOException {
lock.lock();
try {
LockUtils.inLockThrows(lock, () -> {
if (mmap != null) {
resize(entrySize() * entries);
}
} finally {
lock.unlock();
}
});
}

/**
Expand All @@ -288,12 +281,9 @@ public void closeHandler() {
// However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
// To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
// See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
lock.lock();
try {
inRemapReadWriteLock(() -> {
safeForceUnmap();
} finally {
lock.unlock();
}
});
}

/**
Expand Down Expand Up @@ -420,20 +410,28 @@ protected void truncateToEntries0(int entries) {
mmap.position(entries * entrySize());
}

/**
* Execute the given function in a lock only if we are running on windows or z/OS. We do this
* because Windows or z/OS won't let us resize a file while it is mmapped. As a result we have to force unmap it
* and this requires synchronizing reads.
*/
protected final <T, E extends Exception> T maybeLock(Lock lock, StorageAction<T, E> action) throws E {
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
lock.lock();
try {
return action.execute();
} finally {
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
lock.unlock();
}
protected final <T> T inLock(Supplier<T> action) {
return LockUtils.inLock(lock, () -> action).get();
}

protected final void inLock(Runnable action) {
LockUtils.inLock(lock, action);
}

protected final <T, E extends Exception> T inLockThrows(LockUtils.ThrowingSupplier<T, E> action) throws E {
return LockUtils.inLockThrows(lock, action);
}

protected final <T> T inRemapReadLock(Supplier<T> action) {
return LockUtils.inLock(remapLock.readLock(), () -> action).get();
}

protected final <T, E extends Exception> T inRemapReadWriteLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
return LockUtils.inLockThrows(lock, () -> LockUtils.inLockThrows(remapLock.writeLock(), action));
}

protected final <E extends Exception> void inRemapReadWriteLock(LockUtils.ThrowingRunnable<E> action) throws E {
LockUtils.inLockThrows(lock, () -> LockUtils.inLockThrows(remapLock.writeLock(), action));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void sanityCheck() {
* the pair (baseOffset, 0) is returned.
*/
public OffsetPosition lookup(long targetOffset) {
return maybeLock(lock, () -> {
return inRemapReadLock(() -> {
ByteBuffer idx = mmap().duplicate();
int slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY);
if (slot == -1)
Expand All @@ -111,7 +111,7 @@ public OffsetPosition lookup(long targetOffset) {
* @return The offset/position pair at that entry
*/
public OffsetPosition entry(int n) {
return maybeLock(lock, () -> {
return inRemapReadLock(() -> {
if (n >= entries())
throw new IllegalArgumentException("Attempt to fetch the " + n + "th entry from index " +
file().getAbsolutePath() + ", which has size " + entries());
Expand All @@ -125,7 +125,7 @@ public OffsetPosition entry(int n) {
* such offset.
*/
public Optional<OffsetPosition> fetchUpperBoundOffset(OffsetPosition fetchOffset, int fetchSize) {
return maybeLock(lock, () -> {
return inRemapReadLock(() -> {
ByteBuffer idx = mmap().duplicate();
int slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + fetchSize, IndexSearchType.VALUE);
if (slot == -1)
Expand All @@ -141,8 +141,7 @@ public Optional<OffsetPosition> fetchUpperBoundOffset(OffsetPosition fetchOffset
* @throws InvalidOffsetException if provided offset is not larger than the last offset
*/
public void append(long offset, int position) {
lock.lock();
try {
inLock(() -> {
if (isFull())
throw new IllegalArgumentException("Attempt to append to a full index (size = " + entries() + ").");

Expand All @@ -157,15 +156,12 @@ public void append(long offset, int position) {
} else
throw new InvalidOffsetException("Attempt to append an offset " + offset + " to position " + entries() +
" no larger than the last offset appended (" + lastOffset + ") to " + file().getAbsolutePath());
} finally {
lock.unlock();
}
});
}

@Override
public void truncateTo(long offset) {
lock.lock();
try {
inLock(() -> {
ByteBuffer idx = mmap().duplicate();
int slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.KEY);

Expand All @@ -182,9 +178,7 @@ else if (relativeOffset(idx, slot) == offset - baseOffset())
else
newEntries = slot + 1;
truncateToEntries(newEntries);
} finally {
lock.unlock();
}
});
}

public long lastOffset() {
Expand Down Expand Up @@ -218,30 +212,24 @@ private int physical(ByteBuffer buffer, int n) {
* Truncates index to a known number of entries.
*/
private void truncateToEntries(int entries) {
lock.lock();
try {
inLock(() -> {
super.truncateToEntries0(entries);
this.lastOffset = lastEntry().offset;
log.debug("Truncated index {} to {} entries; position is now {} and last offset is now {}",
file().getAbsolutePath(), entries, mmap().position(), lastOffset);
} finally {
lock.unlock();
}
file().getAbsolutePath(), entries, mmap().position(), lastOffset);
});
}

/**
* The last entry in the index
*/
private OffsetPosition lastEntry() {
lock.lock();
try {
return inLock(() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lastEntry() is a reader. So it should only take remap read lock.

Also, there is an existing issue. It seems there is no memory barrier for the instance level field lastOffset. So a reader may not see the latest value. We need to make it volatile.

int entries = entries();
if (entries == 0)
return new OffsetPosition(baseOffset(), 0);
else
return parseEntry(mmap(), entries - 1);
} finally {
lock.unlock();
}
});
}
}
Loading
Loading