-
Notifications
You must be signed in to change notification settings - Fork 14.6k
KAFKA-18180: Move OffsetResultHolder to storage module #18100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
return result; | ||
} | ||
|
||
public static class FileRecordsOrError { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this name is precise enough, but I can't think of a better idea right now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I carried out an initial pass - thank you for the changes!
try { | ||
// If it is not found in remote storage, then search in the local storage starting with local log start offset. | ||
Option<FileRecords.TimestampAndOffset> timestampAndOffsetOpt = | ||
OptionConverters.toScala(rlm.findOffsetByTimestamp(tp, timestamp, startingOffset, leaderEpochCache)) | ||
.orElse(searchInLocalLog::get); | ||
result = Right.apply(timestampAndOffsetOpt); | ||
result = new OffsetResultHolder.FileRecordsOrError(Optional.empty(), | ||
timestampAndOffsetOpt.isDefined() ? Optional.of(timestampAndOffsetOpt.get()) : Optional.empty()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't timestampAndOffsetOpt.isDefined() ? Optional.of(timestampAndOffsetOpt.get()) : Optional.empty()
be simplified to just timestampAndOffsetOpt
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I think I answered my own question - one is a Scala Option and the other is a Java Optional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still, maybe something like https://www.scala-lang.org/api/2.13.6/scala/jdk/OptionConverters$.html will simplify this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I think I answered my own question - one is a Scala Option and the other is a Java Optional?
Yes, Scala Option and Java Optional is different type
Still, maybe something like https://www.scala-lang.org/api/2.13.6/scala/jdk/OptionConverters$.html will simplify this?
It a good idea, I will address it :)
case ListOffsetsRequest.EARLIEST_TIMESTAMP | ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP => | ||
getOffsetByTimestamp | ||
case _ => | ||
val offsetResultHolder = getOffsetByTimestamp | ||
offsetResultHolder.maybeOffsetsError = maybeOffsetsError | ||
offsetResultHolder.lastFetchableOffset = Some(lastFetchableOffset) | ||
offsetResultHolder.maybeOffsetsError(if (maybeOffsetsError.isDefined) Optional.of(maybeOffsetsError.get) else Optional.empty()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question
this.futureHolderOpt = futureHolderOpt; | ||
} | ||
|
||
public OffsetResultHolder(Optional<TimestampAndOffset> timestampAndOffsetOpt) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a downside to defining a few more constructors which take the concrete type and create an Optional internally? I believe this will save you a lot of Optional.of(new Something()
elsewhere, no?
I had opened a PR moving only AsyncOffsetReadFutureHolder in #18095 |
Heya @m1a2st, I reviewed, approved and merged Mickael's PR. If you rebase this PR on top of trunk, I can then review it as well |
# Conflicts: # core/src/main/scala/kafka/log/OffsetResultHolder.scala # core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala # core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala
Hello @clolov, I have been merge trunk into this PR, Thanks for your review :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the changes and apologies for the delay!
I believe there were failures in trunk after this build. Checking why and issuing a fix |
There is the hotfix |
Ah, thank you, reviewing! |
…e-old-protocol-versions * apache-github/trunk: KAFKA-18312: Added entityType: topicName to SubscribedTopicNames in ShareGroupHeartbeatRequest.json (apache#18285) HOTFIX: fix incompatible types: Optional<TimestampAndOffset> cannot be converted to Option<TimestampAndOffset> (apache#18284) MINOR Fix some test-catalog issues (apache#18272) KAFKA-18180: Move OffsetResultHolder to storage module (apache#18100) KAFKA-18301; Make coordinator records first class citizen (apache#18261) KAFKA-18262 Remove DefaultPartitioner and UniformStickyPartitioner (apache#18204) KAFKA-18296 Remove deprecated KafkaBasedLog constructor (apache#18257) KAFKA-12829: Remove old Processor and ProcessorSupplier interfaces (apache#18238) KAFKA-18292 Remove deprecated methods of UpdateFeaturesOptions (apache#18245) KAFKA-12829: Remove deprecated Topology#addProcessor of old Processor API (apache#18154) KAFKA-18035, KAFKA-18306, KAFKA-18092: Address TransactionsTest flaky tests (apache#18264) MINOR: change the default linger time in the new coordinator (apache#18274) KAFKA-18305: validate controller.listener.names is not in inter.broker.listener.name for kcontrollers (apache#18222) KAFKA-18207: Serde for handling transaction records (apache#18136) KAFKA-13722: Refactor Kafka Streams store interfaces (apache#18243) KAFKA-17131: Refactor TimeDefinitions (apache#18241) MINOR: Fix MessageFormatters (apache#18266) Mark flaky tests for Dec 18, 2024 (apache#18263)
Reviewers: Christo Lolov <[email protected]>
Fyi, I cherry-picked this to 4.0 as it made it easier to then cherry-pick #18321. |
Reviewers: Christo Lolov <[email protected]>
Jira: https://issues.apache.org/jira/browse/KAFKA-18180
as title
Committer Checklist (excluded from commit message)