Skip to content

KAFKA-19301: Move and rewrite partition state classes to Java in org.apache.kafka.storage.internals.log #20083

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 31 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
61d3b8e
move PartitionListener.java
joshua2519 Jun 26, 2025
3df2115
move AlterPartitionListener
joshua2519 Jun 27, 2025
740fb12
move AssignmentState and PartitionState
joshua2519 Jul 1, 2025
90c8318
check
joshua2519 Jul 2, 2025
93c20a2
change to java collection
joshua2519 Jul 2, 2025
238ed5f
Merge branch 'trunk' into move-partial-Partition-scala-to-storage-int…
joshua2519 Jul 2, 2025
929b42f
Merge branch 'trunk' into move-partial-Partition-scala-to-storage-int…
joshua2519 Jul 4, 2025
4c3efde
remove unnesscessry override
joshua2519 Jul 4, 2025
abb764a
Merge branch 'trunk' into move-partial-Partition-scala-to-storage-int…
joshua2519 Jul 8, 2025
94f5b00
remove override
joshua2519 Jul 8, 2025
b97411e
fix boolean
joshua2519 Jul 8, 2025
daefe12
fix boolean
joshua2519 Jul 8, 2025
5ed44aa
fix collection copy
joshua2519 Jul 8, 2025
9309b7e
change LeaderAndIsr.isr to return immutable set
joshua2519 Jul 8, 2025
075753f
Merge branch 'trunk' into move-partial-Partition-scala-to-storage-int…
joshua2519 Jul 8, 2025
19bdd00
fix collection copy
joshua2519 Jul 9, 2025
91013d9
to Set
joshua2519 Jul 9, 2025
75193f0
Merge remote-tracking branch 'upstream/trunk' into move-partial-Parti…
joshua2519 Jul 16, 2025
c0f3ea3
Merge remote-tracking branch 'upstream/trunk' into move-partial-Parti…
joshua2519 Jul 17, 2025
a5e3e62
initialize isr as immutable
joshua2519 Jul 17, 2025
0ef88b9
Decouple AlterPartitionListener from PendingPartitionChange.
joshua2519 Jul 17, 2025
854f343
Merge remote-tracking branch 'upstream/trunk' into move-partial-Parti…
joshua2519 Jul 17, 2025
3c9a1d8
fix comments
joshua2519 Jul 17, 2025
2a761fc
Merge remote-tracking branch 'upstream/trunk' into move-partial-Parti…
joshua2519 Jul 31, 2025
b43bb0b
remove default implementation in interface
joshua2519 Jul 31, 2025
b50256b
remove unmodifiableset
joshua2519 Jul 31, 2025
b5979a0
make list immutable
joshua2519 Jul 31, 2025
1fc3399
implement methods in subclass
joshua2519 Jul 31, 2025
c5d1718
fix import
joshua2519 Jul 31, 2025
51a55d2
Merge remote-tracking branch 'upstream/trunk' into move-partial-Parti…
joshua2519 Aug 6, 2025
b264418
compact constructor
joshua2519 Aug 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package kafka.server.share;

import kafka.cluster.PartitionListener;
import kafka.server.ReplicaManager;

import org.apache.kafka.common.TopicIdPartition;
Expand Down Expand Up @@ -59,6 +58,7 @@
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.PartitionListener;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;

import org.slf4j.Logger;
Expand Down
214 changes: 34 additions & 180 deletions core/src/main/scala/kafka/cluster/Partition.scala

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package kafka.coordinator.group

import kafka.cluster.PartitionListener
import kafka.server.ReplicaManager
import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
import org.apache.kafka.common.protocol.Errors
Expand All @@ -25,7 +24,7 @@ import org.apache.kafka.coordinator.common.runtime.PartitionWriter
import org.apache.kafka.server.ActionQueue
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, PartitionListener, VerificationGuard}

import java.util.concurrent.CompletableFuture
import scala.collection.Map
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package kafka.server

import com.yammer.metrics.core.Meter
import kafka.cluster.{Partition, PartitionListener}
import kafka.cluster.Partition
import kafka.controller.StateChangeLogger
import kafka.log.LogManager
import kafka.server.HostedPartition.Online
Expand Down Expand Up @@ -65,7 +65,7 @@ import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, LogReadResult, common}
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard,PartitionListener}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats

import java.io.File
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig,AlterPartitionListener}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.mockito.ArgumentMatchers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
*/
package kafka.cluster

import org.apache.kafka.storage.internals.log.SimpleAssignmentState
import org.apache.kafka.common.PartitionState
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}

import java.util

import scala.jdk.CollectionConverters._

object AssignmentStateTest {
import AbstractPartitionTest._

Expand Down Expand Up @@ -88,7 +87,7 @@ class AssignmentStateTest extends AbstractPartitionTest {
@MethodSource(Array("parameters"))
def testPartitionAssignmentStatus(isr: util.List[Integer], replicas: util.List[Integer],
adding: util.List[Integer], removing: util.List[Integer],
original: util.List[Int], isUnderReplicated: Boolean): Unit = {
original: util.List[Integer], isUnderReplicated: Boolean): Unit = {
val leaderState = new PartitionState()
.setLeader(brokerId)
.setLeaderEpoch(6)
Expand All @@ -105,7 +104,7 @@ class AssignmentStateTest extends AbstractPartitionTest {

// set the original replicas as the URP calculation will need them
if (!original.isEmpty)
partition.assignmentState = SimpleAssignmentState(original.asScala)
partition.assignmentState = new SimpleAssignmentState(original)
// do the test
partition.makeLeader(leaderState, offsetCheckpoints, None)
assertEquals(isReassigning, partition.isReassigning)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.internals.log.{AlterPartitionListener, AppendOrigin, CleanerConfig, CommittedPartitionState, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetsListener, LogSegments, PendingShrinkIsr, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
Expand Down
Loading