KAFKA-19407 Fix potential IllegalStateException when appending to timeIndex #19972
Conversation
This PR should introduce no new lock contention, because the (potentially blocking) lazy timeIndex materialization is already guarded exclusively by the lock in LazyIndex#get.
@@ -192,8 +192,13 @@ public void sanityCheck(boolean timeIndexFileNewlyCreated) throws IOException {
  * the time index).
  */
 public TimestampOffset readMaxTimestampAndOffsetSoFar() throws IOException {
-    if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN)
-        maxTimestampAndOffsetSoFar = timeIndex().lastEntry();
+    if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN) {
Should this volatile field become an Atomic instead?
Using an Atomic doesn't make this simpler, because we need double-checked locking anyway to ensure:
- the value is initialized only once, on first access
- the initialized value is always returned afterwards
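A minimal sketch of the double-checked locking pattern under discussion, assuming the PR's field and method names (the exact patch body may differ):

```java
// Illustrative sketch only; names follow the PR, the body is not the exact patch.
private volatile TimestampOffset maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN;

public TimestampOffset readMaxTimestampAndOffsetSoFar() throws IOException {
    if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN) {      // cheap unlocked check
        synchronized (this) {
            // re-check under the lock so only the first thread materializes the entry
            if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN)
                maxTimestampAndOffsetSoFar = timeIndex().lastEntry();
        }
    }
    return maxTimestampAndOffsetSoFar;                                // always initialized here
}
```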
I thought updateAndGet could work to this effect (as a conditional expression, since Java lambdas don't allow an if/else statement here):
maxTimestampAndOffsetSoFar.updateAndGet(t -> t == TimestampOffset.UNKNOWN ? timeIndex().lastEntry() : t)
But I have not fully considered whether it would end up being slower on any microbenchmark. It just looked simpler.
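For comparison, a hedged sketch of what the AtomicReference variant might look like (names taken from the PR, checked exceptions elided for brevity; this alternative was not adopted):

```java
import java.util.concurrent.atomic.AtomicReference;

private final AtomicReference<TimestampOffset> maxTimestampAndOffsetSoFar =
    new AtomicReference<>(TimestampOffset.UNKNOWN);

public TimestampOffset readMaxTimestampAndOffsetSoFar() {
    // Caveat: updateAndGet may re-apply the function under contention, so
    // timeIndex().lastEntry() is not guaranteed to run exactly once.
    return maxTimestampAndOffsetSoFar.updateAndGet(
        t -> t == TimestampOffset.UNKNOWN ? timeIndex().lastEntry() : t);
}
```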
@ocadaruma nice find!
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
-if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN)
-    maxTimestampAndOffsetSoFar = timeIndex().lastEntry();
+if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN) {
+    synchronized (this) {
The monitor lock of LogSegment may be used by external code. Perhaps it would be safer to use the monitor lock of an internal Object instead?

private final Object maxTimestampAndOffsetLock = new Object();

synchronized (maxTimestampAndOffsetLock) {
    // ...
}
fair enough. fixed.
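With that change, the guarded section would take roughly this shape (same sketch assumptions as above, now with the dedicated lock object):

```java
if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN) {
    synchronized (maxTimestampAndOffsetLock) {  // private lock, never visible to external code
        if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN)
            maxTimestampAndOffsetSoFar = timeIndex().lastEntry();
    }
}
return maxTimestampAndOffsetSoFar;
```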
when(seg.timeIndex()).thenReturn(mockTimeIndex);
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
    futures.add(executor.submit(() -> {
futures.add(executor.submit(() -> assertDoesNotThrow(seg::maxTimestampSoFar)));
Or we can use CompletableFuture to streamline the code:
long remainingDurationNanos = Duration.ofSeconds(1).toNanos();
while (remainingDurationNanos > 0) {
    long t0 = System.nanoTime();
    clearInvocations(mockTimeIndex);
    try (LogSegment seg = spy(LogTestUtils.createSegment(0, logDir, 10, Time.SYSTEM))) {
        when(seg.timeIndex()).thenReturn(mockTimeIndex);
        var futures = IntStream.range(0, numThreads)
            .mapToObj(ignored -> CompletableFuture.runAsync(
                () -> assertDoesNotThrow(seg::maxTimestampSoFar)))
            .toList();
        futures.forEach(CompletableFuture::join);
        // timeIndex.lastEntry should be called once if no race
        verify(mockTimeIndex, times(1)).lastEntry();
        long elapsedNanos = System.nanoTime() - t0;
        remainingDurationNanos -= elapsedNanos;
    }
}
thanks, fixed to use assertDoesNotThrow
-if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN)
-    maxTimestampAndOffsetSoFar = timeIndex().lastEntry();
+if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN) {
+    synchronized (maxTimestampAndOffsetLock) {
Could you please add a comment for this lock? By default, LogSegment is not thread-safe, so all modifications should be executed within the UnifiedLog lock. However, there is an exceptional path that can change maxTimestampAndOffsetSoFar without the UnifiedLog lock, which is what causes this concurrency issue ...
Sounds good. Added a comment
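A hypothetical version of such a comment, grounded in the reviewer's description above (the wording actually added in the PR may differ):

```java
// Hypothetical wording; the comment actually added in the PR may differ.
/**
 * LogSegment is generally not thread-safe: modifications are expected to run under the
 * UnifiedLog lock. readMaxTimestampAndOffsetSoFar() is the one exception, since it can
 * lazily initialize maxTimestampAndOffsetSoFar from a read path that does not hold the
 * UnifiedLog lock, so this dedicated lock guards that single mutation.
 */
private final Object maxTimestampAndOffsetLock = new Object();
```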
@ocadaruma : Thanks for the PR. LGTM
…eIndex (#19972)

## Summary
- Fix potential race condition in LogSegment#readMaxTimestampAndOffsetSoFar(), which may result in non-monotonic offsets and cause replication to stop.
- See https://issues.apache.org/jira/browse/KAFKA-19407 for the details of how it happens.

Reviewers: Vincent PÉRICART <[email protected]>, Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>