Commit c07a3f6

KAFKA-17302: ReplicaFetcher changes for fetching from tiered offset
1 parent: f61876d

File tree: 14 files changed, +851 −43 lines

core/src/main/java/kafka/server/TierStateMachine.java

Lines changed: 16 additions & 19 deletions
@@ -21,7 +21,7 @@

 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.message.FetchResponseData.PartitionData;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
 import org.apache.kafka.common.protocol.Errors;
@@ -85,21 +85,19 @@ public TierStateMachine(LeaderEndPoint leader,
     /**
      * Start the tier state machine for the provided topic partition.
      *
-     * @param topicPartition the topic partition
-     * @param currentFetchState the current PartitionFetchState which will
-     *                          be used to derive the return value
-     * @param fetchPartitionData the data from the fetch response that returned the offset moved to tiered storage error
-     *
-     * @return the new PartitionFetchState after the successful start of the
-     *         tier state machine
+     * @param topicPartition the topic partition for which the fetch state is to be started
+     * @param topicId the optional unique identifier of the topic
+     * @param currentLeaderEpoch the current leader epoch of the partition
+     * @param epochAndStartingOffset the offset on the leader's local log from which to start replicating logs
+     * @param leaderLogStartOffset the starting offset in the leader's log
+     * @return the new PartitionFetchState after the successful start of the tier state machine
+     * @throws Exception if an error occurs during the process, such as issues with remote storage
      */
     PartitionFetchState start(TopicPartition topicPartition,
-                              PartitionFetchState currentFetchState,
-                              PartitionData fetchPartitionData) throws Exception {
-        OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
-        int epoch = epochAndLeaderLocalStartOffset.epoch();
-        long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
-
+                              Optional<Uuid> topicId,
+                              int currentLeaderEpoch,
+                              OffsetAndEpoch epochAndStartingOffset,
+                              long leaderLogStartOffset) throws Exception {
         long offsetToFetch;
         replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
         replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
@@ -112,21 +110,20 @@ PartitionFetchState start(TopicPartition topicPartition,
         }

         try {
-            offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset(), unifiedLog);
+            offsetToFetch = buildRemoteLogAuxState(topicPartition, currentLeaderEpoch, epochAndStartingOffset.offset(), epochAndStartingOffset.epoch(), leaderLogStartOffset, unifiedLog);
         } catch (RemoteStorageException e) {
             replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
             replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
             throw e;
         }

-        OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentLeaderEpoch);
         long leaderEndOffset = fetchLatestOffsetResult.offset();

         long initialLag = leaderEndOffset - offsetToFetch;

-        return new PartitionFetchState(currentFetchState.topicId(), offsetToFetch, Optional.of(initialLag), currentFetchState.currentLeaderEpoch(),
+        return new PartitionFetchState(topicId, offsetToFetch, Optional.of(initialLag), currentLeaderEpoch,
                 ReplicaState.FETCHING, unifiedLog.latestEpoch());
-
     }

     private OffsetForLeaderEpochResponseData.EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
@@ -182,7 +179,7 @@ private void buildProducerSnapshotFile(UnifiedLog unifiedLog,
      * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the
      * next offset following the end offset of the remote log portion.
      */
-    private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+    protected Long buildRemoteLogAuxState(TopicPartition topicPartition,
                                         Integer currentLeaderEpoch,
                                         Long leaderLocalLogStartOffset,
                                         Integer epochForLeaderLocalLogStartOffset,
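
In short, start no longer derives its inputs from the fetch state and the fetch response; the caller resolves them and passes them in. A minimal sketch of the signature change (names taken from the diff above, not additional commit code):

// Before: start() resolved the starting offset itself via leader.fetchEarliestLocalOffset
// and was only reachable from the OFFSET_MOVED_TO_TIERED_STORAGE error path:
//   start(topicPartition, currentFetchState, fetchPartitionData)
//
// After: the caller resolves the starting offset and passes it in, so the same entry
// point also serves the new tiered-offset bootstrap path in AbstractFetcherThread.scala:
//   start(topicPartition, topicId, currentLeaderEpoch, epochAndStartingOffset, leaderLogStartOffset)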

core/src/main/scala/kafka/server/AbstractFetcherThread.scala

Lines changed: 41 additions & 2 deletions
@@ -101,6 +101,8 @@ abstract class AbstractFetcherThread(name: String,

   protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Optional[OffsetAndEpoch]

+  protected def shouldUseTieredOffsetStrategy(topicPartition: TopicPartition, leaderEndOffset: Long, replicaEndOffset: Long): Boolean
+
   override def shutdown(): Unit = {
     initiateShutdown()
     inLock(partitionMapLock) {
@@ -673,7 +675,13 @@ abstract class AbstractFetcherThread(name: String,
      */
     val offsetAndEpoch = leader.fetchLatestOffset(topicPartition, currentLeaderEpoch)
     val leaderEndOffset = offsetAndEpoch.offset
-    if (leaderEndOffset < replicaEndOffset) {
+    val useTieredOffsetStrategy = shouldUseTieredOffsetStrategy(topicPartition, leaderEndOffset, replicaEndOffset)
+
+    if (useTieredOffsetStrategy) {
+      val leaderStartOffset = leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
+      val epochAndStartingOffset = earliestPendingUploadOffset(topicPartition, currentLeaderEpoch, leaderStartOffset)
+      fetchTierStateMachine.start(topicPartition, topicId.asJava, currentLeaderEpoch, epochAndStartingOffset, leaderStartOffset.offset())
+    } else if (leaderEndOffset < replicaEndOffset) {
       warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
         s"leader's latest offset $leaderEndOffset")
       truncate(topicPartition, OffsetTruncationState(leaderEndOffset, truncationCompleted = true))
@@ -783,7 +791,15 @@ abstract class AbstractFetcherThread(name: String,
                                           leaderEpochInRequest: Optional[Integer],
                                           fetchPartitionData: PartitionData): Boolean = {
     try {
-      val newFetchState = fetchTierStateMachine.start(topicPartition, fetchState, fetchPartitionData)
+      val useTieredOffsetStrategy = shouldUseTieredOffsetStrategy(topicPartition, fetchState)
+      val epochAndLogStartOffset = leader.fetchEarliestOffset(topicPartition, fetchState.currentLeaderEpoch())
+      val epochAndStartingOffset = if (useTieredOffsetStrategy) {
+        earliestPendingUploadOffset(topicPartition, fetchState.currentLeaderEpoch(), epochAndLogStartOffset)
+      } else {
+        leader.fetchEarliestLocalOffset(topicPartition, fetchState.currentLeaderEpoch())
+      }
+      val newFetchState = fetchTierStateMachine.start(topicPartition, fetchState.topicId(), fetchState.currentLeaderEpoch(),
+        epochAndStartingOffset, epochAndLogStartOffset.offset())

       // TODO: use fetchTierStateMachine.maybeAdvanceState when implementing async tiering logic in KAFKA-13560

@@ -807,6 +823,29 @@ abstract class AbstractFetcherThread(name: String,
     }
   }

+  /**
+   * Determines the earliest offset for pending uploads, taking into account
+   * both local and remote storage conditions.
+   */
+  private def earliestPendingUploadOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int, leaderLogStartOffset: OffsetAndEpoch): OffsetAndEpoch = {
+    val earliestPendingUploadOffset = leader.fetchEarliestPendingUploadOffset(topicPartition, currentLeaderEpoch)
+    if (earliestPendingUploadOffset.offset == -1L) {
+      val leaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch)
+      if (leaderLocalStartOffset.offset == leaderLogStartOffset.offset) {
+        return leaderLocalStartOffset
+      }
+      throw new OffsetNotAvailableException("Segments are uploaded to remote storage, but the leader does not have the information about the uploaded segments")
+    }
+    earliestPendingUploadOffset
+  }
+
+  private def shouldUseTieredOffsetStrategy(topicPartition: TopicPartition, fetchState: PartitionFetchState): Boolean = {
+    val leaderEndOffset = leader.fetchLatestOffset(topicPartition, fetchState.currentLeaderEpoch())
+    val replicaEndOffset = logEndOffset(topicPartition)
+
+    shouldUseTieredOffsetStrategy(topicPartition, leaderEndOffset.offset(), replicaEndOffset)
+  }
+
   private def delayPartitions(partitions: Iterable[TopicPartition], delay: Long): Unit = {
     partitionMapLock.lockInterruptibly()
     try {
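
The fallback in earliestPendingUploadOffset is easiest to see as a pure function over the three offsets involved. A minimal, self-contained sketch (simplified to plain Longs instead of OffsetAndEpoch, with IllegalStateException standing in for the real OffsetNotAvailableException):

def resolveStartingOffset(pendingUploadOffset: Long,
                          leaderLocalStartOffset: Long,
                          leaderLogStartOffset: Long): Long = {
  if (pendingUploadOffset != -1L)
    pendingUploadOffset // the leader knows the tiered boundary: start right after it
  else if (leaderLocalStartOffset == leaderLogStartOffset)
    leaderLocalStartOffset // nothing has been tiered yet: start from the local log start
  else
    throw new IllegalStateException( // the real code throws OffsetNotAvailableException
      "Segments were uploaded to remote storage, but the leader has no record of them yet")
}

// resolveStartingOffset(3500L, 3000L, 0L) == 3500L  (leader reports a pending-upload offset)
// resolveStartingOffset(-1L, 0L, 0L)      == 0L     (nothing tiered yet)
// resolveStartingOffset(-1L, 3000L, 0L)   throws    (tiered, but leader metadata is missing)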

core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala

Lines changed: 22 additions & 0 deletions
@@ -135,6 +135,28 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
     new OffsetAndEpoch(localLogStartOffset, epoch.orElse(0))
   }

+  override def fetchEarliestPendingUploadOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
+    // TODO check remote log enabled
+    val partition = replicaManager.getPartitionOrException(topicPartition)
+    val highestRemoteOffset = partition.localLogOrException.highestOffsetInRemoteStorage()
+    if (highestRemoteOffset == -1) {
+      val localLogStartOffset = fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch)
+      val logStartOffset = fetchEarliestOffset(topicPartition, currentLeaderEpoch)
+      if (localLogStartOffset.offset() == logStartOffset.offset()) {
+        // No segments have been uploaded yet
+        logStartOffset
+      } else {
+        // Leader currently does not know about the already uploaded segments
+        new OffsetAndEpoch(-1L, -1)
+      }
+    } else {
+      val logStartOffset = fetchEarliestOffset(topicPartition, currentLeaderEpoch)
+      val earliestPendingUploadOffset = Math.max(highestRemoteOffset + 1, logStartOffset.offset())
+      val epoch = partition.localLogOrException.leaderEpochCache.epochForOffset(earliestPendingUploadOffset)
+      new OffsetAndEpoch(earliestPendingUploadOffset, epoch.orElse(0))
+    }
+  }
+
   override def fetchEpochEndOffsets(partitions: util.Map[TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition]): util.Map[TopicPartition, EpochEndOffset] = {
     partitions.asScala.map { case (tp, epochData) =>
       try {
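
For the branch where remote storage is non-empty, the arithmetic deserves a worked example (values assumed for illustration):

val highestRemoteOffset = 4999L // highest offset already covered by a tiered segment
val logStartOffset = 0L
Math.max(highestRemoteOffset + 1, logStartOffset) // 5000: the first offset not yet tiered
// If retention has since advanced the log start offset past 5000, say to 6000,
// Math.max clamps the result so the follower never starts below the log start:
Math.max(highestRemoteOffset + 1, 6000L) // 6000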

core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala

Lines changed: 4 additions & 0 deletions
@@ -105,6 +105,10 @@ class RemoteLeaderEndPoint(logPrefix: String,
     fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
   }

+  override def fetchEarliestPendingUploadOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
+    fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
+  }
+
   private def fetchOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int, timestamp: Long): OffsetAndEpoch = {
     val topic = new ListOffsetsTopic()
       .setName(topicPartition.topic)
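
For a remote leader the new method reuses the existing fetchOffset path, which issues a ListOffsets request with a sentinel timestamp. A sketch of how the sentinels map to offsets (the first three already exist in ListOffsetsRequest; EARLIEST_PENDING_UPLOAD_TIMESTAMP is the one this commit relies on, and its constant is defined outside this diff):

// ListOffsetsRequest.EARLIEST_TIMESTAMP                -> log start offset
// ListOffsetsRequest.LATEST_TIMESTAMP                  -> log end offset
// ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP          -> local log start offset (after tiering)
// ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP -> earliest offset not yet uploaded
fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)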

core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala

Lines changed: 2 additions & 0 deletions
@@ -68,6 +68,8 @@ class ReplicaAlterLogDirsThread(name: String,
     replicaMgr.futureLocalLogOrException(topicPartition).endOffsetForEpoch(epoch)
   }

+  override protected def shouldUseTieredOffsetStrategy(topicPartition: TopicPartition, leaderEndOffset: Long, replicaEndOffset: Long): Boolean = false
+
   // process fetched data
   override def processPartitionData(
     topicPartition: TopicPartition,
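
A brief note on why this override is a constant:

// Log-dir reassignment copies a replica between directories on the same broker,
// so there is no remote bootstrap to shortcut: the strategy is hard-disabled here.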

core/src/main/scala/kafka/server/ReplicaFetcherThread.scala

Lines changed: 11 additions & 0 deletions
@@ -63,6 +63,17 @@ class ReplicaFetcherThread(name: String,
     replicaMgr.localLogOrException(topicPartition).endOffsetForEpoch(epoch)
   }

+  override protected def shouldUseTieredOffsetStrategy(topicPartition: TopicPartition, leaderEndOffset: Long, replicaEndOffset: Long): Boolean = {
+    val isCompactTopic = replicaMgr.localLog(topicPartition).exists(_.config.compact)
+    val remoteStorageEnabled = replicaMgr.localLog(topicPartition).exists(_.remoteLogEnabled())
+
+    brokerConfig.followerFetchLastTieredOffsetEnable &&
+      remoteStorageEnabled &&
+      !isCompactTopic &&
+      replicaEndOffset == 0 &&
+      leaderEndOffset != 0
+  }
+
   override def initiateShutdown(): Boolean = {
     val justShutdown = super.initiateShutdown()
     if (justShutdown) {
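
Putting the gate together: the tiered-offset strategy triggers only for a genuinely empty follower of a tiered, non-compacted topic, and only when the broker-level flag is on. A sketch of a qualifying case (values assumed):

// followerFetchLastTieredOffsetEnable = true  // broker config flag from the diff above
// remoteLogEnabled()                  = true  // topic has tiered storage enabled
// config.compact                      = false // compacted topics are excluded
// replicaEndOffset                    = 0     // the follower has no local data yet
// leaderEndOffset                     = 100   // the leader actually has data
// => shouldUseTieredOffsetStrategy(...) == true: the follower builds the remote log
//    aux state and starts fetching from the earliest pending-upload offset instead
//    of replaying the entire log from offset 0.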

core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala

Lines changed: 6 additions & 2 deletions
@@ -19,7 +19,6 @@ package kafka.server
 import com.yammer.metrics.core.Gauge
 import kafka.utils.TestUtils
 import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData}
-import org.apache.kafka.common.message.FetchResponseData.PartitionData
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
 import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.utils.Utils
@@ -317,10 +316,13 @@ class AbstractFetcherManagerTest {
     override val isTruncationOnFetchSupported: Boolean = false

     override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(1L, 0)
+
+    override def fetchEarliestPendingUploadOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = ???
   }

   private class MockResizeFetcherTierStateMachine extends TierStateMachine(null, null, false) {
-    override def start(topicPartition: TopicPartition, currentFetchState: PartitionFetchState, fetchPartitionData: PartitionData): PartitionFetchState = {
+
+    override def start(topicPartition: TopicPartition, topicId: Optional[Uuid], currentLeaderEpoch: Int, epochAndStartingOffset: OffsetAndEpoch, leaderLogStartOffset: Long): PartitionFetchState = {
       throw new UnsupportedOperationException("Materializing tier state is not supported in this test.")
     }
   }
@@ -353,6 +355,8 @@ class AbstractFetcherManagerTest {
     override protected def logEndOffset(topicPartition: TopicPartition): Long = 1

     override protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Optional[OffsetAndEpoch] = Optional.of(new OffsetAndEpoch(1, 0))
+
+    override protected def shouldUseTieredOffsetStrategy(topicPartition: TopicPartition, leaderEndOffset: Long, replicaEndOffset: Long): Boolean = ???
   }

 }
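
The mocks stub the new abstract members with ???, the standard Scala idiom for "unreachable in this test":

// scala.Predef defines:
//   def ??? : Nothing = throw new NotImplementedError
// so calling either stub fails fast if a test ever exercises these paths.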
