Skip to content

Conversation

abhijeetk88
Copy link
Contributor

Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.

If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.

If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.

@abhijeetk88 abhijeetk88 marked this pull request as draft August 28, 2025 07:05
@github-actions github-actions bot added the triage PRs from the community label Aug 28, 2025
@abhijeetk88 abhijeetk88 changed the title [WIP] KAFKA-17302 KAFKA-17302 Aug 28, 2025
@github-actions github-actions bot added core Kafka Broker storage Pull requests that target the storage module labels Aug 28, 2025
@abhijeetk88 abhijeetk88 force-pushed the KAFKA-17302 branch 2 times, most recently from c125bd0 to c07a3f6 Compare August 28, 2025 09:28
@abhijeetk88 abhijeetk88 changed the title KAFKA-17302 KAFKA-17302: ReplicaFetcher changes for fetching from tiered offset Aug 28, 2025
@abhijeetk88 abhijeetk88 force-pushed the KAFKA-17302 branch 8 times, most recently from 408fb46 to 72a4dbf Compare September 1, 2025 05:31
Copy link
Contributor

@kamalcph kamalcph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @abhijeetk88 for the patch! Went over the first pass.

Could you explain why the TierStateMachine#buildRemoteLogAuxState is not used to decide the offset-to-fetch either leaderLocalStartOffset or lastTieredOffset?

It fits nicely:

  1. Minimal code change and maintenance.
  2. If this code change is to handle the case when leader-log-start-offset == leader-local-log-start-offset == 0 and earliestUploadOffset > 0 (yet to be approved), then we may have to discuss it before proceeding with the patch. It is one of the edge-case but not sure how useful it would be in practice.

@@ -182,7 +179,7 @@ private void buildProducerSnapshotFile(UnifiedLog unifiedLog,
* fetching records from the leader. The return value is the next offset to fetch from the leader, which is the
* next offset following the end offset of the remote log portion.
*/
private Long buildRemoteLogAuxState(TopicPartition topicPartition,
protected Long buildRemoteLogAuxState(TopicPartition topicPartition,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: fix alignment

val earliestPendingUploadOffset = Math.max(highestRemoteOffset + 1, logStartOffset.offset())
val epoch = log.leaderEpochCache.epochForOffset(earliestPendingUploadOffset)

new OffsetAndEpoch(earliestPendingUploadOffset, epoch.orElse(0))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

epoch 0 is vaild. If the epoch is empty, shall we return -1?

val leaderState = leaderPartitionState(topicPartition)
checkLeaderEpochAndThrow(leaderEpoch, leaderState)
val offsetAndEpoch = leaderState.earliestPendingUploadOffset match {
case -1L => (-1L, -1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can the OffsetAndEpoch be returned directly?

leaderState.earliestPendingUploadOffset match {
  case -1L => new OffsetAndEpoch(-1L, -1)
  case _ => new OffsetAndEpoch(math.max(leaderState.earliestPendingUploadOffset, leaderState.logStartOffset), leaderState.leaderEpoch)
}

expected: Boolean): Unit = {
val tp = new TopicPartition("t", 0)

def runScenario(enableLastTieredOffsetFetch: Boolean,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this internal runScenario method? Can this be flattened?

assertEquals(
      expected,
      thread.callShouldFetchFromLastTieredOffset(tp, leaderEndOffset, replicaEndOffset)
    )

)
}

private class MockReplicaFetcherThread(name: String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove this MockReplicaFetcherThread and expose the shouldFetchFromLastTieredOffset in RFT to server package?

override protected[server] def shouldFetchFromLastTieredOffset(topicPartition: TopicPartition, leaderEndOffset: Long, replicaEndOffset: Long): Boolean 

@github-actions github-actions bot removed the triage PRs from the community label Sep 4, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Kafka Broker storage Pull requests that target the storage module
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants