diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index 5f0bf1fa2394a..b0328e7cd8cfd 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -16,7 +16,6 @@ */ package kafka.server.share; -import kafka.cluster.PartitionListener; import kafka.server.ReplicaManager; import org.apache.kafka.common.TopicIdPartition; @@ -59,6 +58,7 @@ import org.apache.kafka.server.util.timer.SystemTimerReaper; import org.apache.kafka.server.util.timer.Timer; import org.apache.kafka.server.util.timer.TimerTask; +import org.apache.kafka.storage.internals.log.PartitionListener; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.slf4j.Logger; diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index d896217b2fe7b..e05b0aa43e91f 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -42,7 +42,7 @@ import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCac import org.apache.kafka.server.common.RequestLocal import org.apache.kafka.server.log.remote.TopicPartitionLog import org.apache.kafka.server.log.remote.storage.RemoteLogManager -import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AlterPartitionListener, AppendOrigin, AssignmentState, AsyncOffsetReader, CommittedPartitionState, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, OngoingReassignmentState, PartitionListener, PartitionState, PendingExpandIsr, PendingPartitionChange, PendingShrinkIsr, SimpleAssignmentState, UnifiedLog, VerificationGuard} import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, TopicPartitionOperationKey} import org.apache.kafka.server.replica.Replica @@ -51,51 +51,12 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, Unexpec import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints import org.slf4j.event.Level +import java.util import scala.collection.Seq import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters.{RichOption, RichOptional} import scala.jdk.javaapi.OptionConverters -/** - * Listener receives notification from an Online Partition. - * - * A listener can be (re-)registered to an Online partition only. The listener - * is notified as long as the partition remains Online. When the partition fails - * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further - * notifications are sent after this point on. - * - * Note that the callbacks are executed in the thread that triggers the change - * AND that locks may be held during their execution. They are meant to be used - * as notification mechanism only. - */ -trait PartitionListener { - /** - * Called when the Log increments its high watermark. - */ - def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit = {} - - /** - * Called when the Partition (or replica) on this broker has a failure (e.g. goes offline). - */ - def onFailed(partition: TopicPartition): Unit = {} - - /** - * Called when the Partition (or replica) on this broker is deleted. Note that it does not mean - * that the partition was deleted but only that this broker does not host a replica of it any more. - */ - def onDeleted(partition: TopicPartition): Unit = {} - - /** - * Called when the Partition on this broker is transitioned to follower. - */ - def onBecomingFollower(partition: TopicPartition): Unit = {} -} - -trait AlterPartitionListener { - def markIsrExpand(): Unit - def markIsrShrink(): Unit - def markFailed(): Unit -} class DelayedOperations(topicId: Option[Uuid], topicPartition: TopicPartition, @@ -177,117 +138,6 @@ object Partition { } } - -sealed trait AssignmentState { - def replicas: Seq[Int] - def replicationFactor: Int = replicas.size - def isAddingReplica(brokerId: Int): Boolean = false -} - -case class OngoingReassignmentState(addingReplicas: Seq[Int], - removingReplicas: Seq[Int], - replicas: Seq[Int]) extends AssignmentState { - - override def replicationFactor: Int = replicas.diff(addingReplicas).size // keep the size of the original replicas - override def isAddingReplica(replicaId: Int): Boolean = addingReplicas.contains(replicaId) -} - -case class SimpleAssignmentState(replicas: Seq[Int]) extends AssignmentState - - -sealed trait PartitionState { - /** - * Includes only the in-sync replicas which have been committed to ZK. - */ - def isr: Set[Int] - - /** - * This set may include un-committed ISR members following an expansion. This "effective" ISR is used for advancing - * the high watermark as well as determining which replicas are required for acks=all produce requests. - * - * Only applicable as of IBP 2.7-IV2, for older versions this will return the committed ISR - */ - def maximalIsr: Set[Int] - - /** - * The leader recovery state. See the description for LeaderRecoveryState for details on the different values. - */ - def leaderRecoveryState: LeaderRecoveryState - - /** - * Indicates if we have an AlterPartition request inflight. - */ - def isInflight: Boolean -} - -sealed trait PendingPartitionChange extends PartitionState { - def lastCommittedState: CommittedPartitionState - def sentLeaderAndIsr: LeaderAndIsr - - override val leaderRecoveryState: LeaderRecoveryState = LeaderRecoveryState.RECOVERED - - def notifyListener(alterPartitionListener: AlterPartitionListener): Unit -} - -case class PendingExpandIsr( - newInSyncReplicaId: Int, - sentLeaderAndIsr: LeaderAndIsr, - lastCommittedState: CommittedPartitionState -) extends PendingPartitionChange { - val isr: Set[Int] = lastCommittedState.isr - val maximalIsr: Set[Int] = isr + newInSyncReplicaId - val isInflight = true - - def notifyListener(alterPartitionListener: AlterPartitionListener): Unit = { - alterPartitionListener.markIsrExpand() - } - - override def toString: String = { - s"PendingExpandIsr(newInSyncReplicaId=$newInSyncReplicaId" + - s", sentLeaderAndIsr=$sentLeaderAndIsr" + - s", leaderRecoveryState=$leaderRecoveryState" + - s", lastCommittedState=$lastCommittedState" + - ")" - } -} - -case class PendingShrinkIsr( - outOfSyncReplicaIds: Set[Int], - sentLeaderAndIsr: LeaderAndIsr, - lastCommittedState: CommittedPartitionState -) extends PendingPartitionChange { - val isr: Set[Int] = lastCommittedState.isr - val maximalIsr: Set[Int] = isr - val isInflight = true - - def notifyListener(alterPartitionListener: AlterPartitionListener): Unit = { - alterPartitionListener.markIsrShrink() - } - - override def toString: String = { - s"PendingShrinkIsr(outOfSyncReplicaIds=$outOfSyncReplicaIds" + - s", sentLeaderAndIsr=$sentLeaderAndIsr" + - s", leaderRecoveryState=$leaderRecoveryState" + - s", lastCommittedState=$lastCommittedState" + - ")" - } -} - -case class CommittedPartitionState( - isr: Set[Int], - leaderRecoveryState: LeaderRecoveryState -) extends PartitionState { - val maximalIsr: Set[Int] = isr - val isInflight = false - - override def toString: String = { - s"CommittedPartitionState(isr=$isr" + - s", leaderRecoveryState=$leaderRecoveryState" + - ")" - } -} - - /** * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR * @@ -338,8 +188,8 @@ class Partition(val topicPartition: TopicPartition, @volatile private[cluster] var leaderEpochStartOffsetOpt: Option[Long] = None // Replica ID of the leader, defined when this broker is leader or follower for the partition. @volatile var leaderReplicaIdOpt: Option[Int] = None - @volatile private[cluster] var partitionState: PartitionState = CommittedPartitionState(Set.empty, LeaderRecoveryState.RECOVERED) - @volatile var assignmentState: AssignmentState = SimpleAssignmentState(Seq.empty) + @volatile private[cluster] var partitionState: PartitionState = new CommittedPartitionState(util.Set.of(), LeaderRecoveryState.RECOVERED) + @volatile var assignmentState: AssignmentState = new SimpleAssignmentState(util.List.of()) // Logs belonging to this partition. Majority of time it will be only one log, but if log directory // is getting changed (as a result of ReplicaAlterLogDirs command), we may have two logs until copy @@ -410,7 +260,7 @@ class Partition(val topicPartition: TopicPartition, // Visible for testing def removeExpiredProducers(currentTimeMs: Long): Unit = log.foreach(_.removeExpiredProducers(currentTimeMs)) - def inSyncReplicaIds: Set[Int] = partitionState.isr + def inSyncReplicaIds: Set[Int] = partitionState.isr.asScala.map(_.toInt).toSet def maybeAddListener(listener: PartitionListener): Boolean = { inReadLock(leaderIsrUpdateLock) { @@ -713,10 +563,10 @@ class Partition(val topicPartition: TopicPartition, private def clear(): Unit = { remoteReplicasMap.clear() - assignmentState = SimpleAssignmentState(Seq.empty) + assignmentState = new SimpleAssignmentState(util.List.of()) log = None futureLog = None - partitionState = CommittedPartitionState(Set.empty, LeaderRecoveryState.RECOVERED) + partitionState = new CommittedPartitionState(util.Set.of(), LeaderRecoveryState.RECOVERED) leaderReplicaIdOpt = None leaderEpochStartOffsetOpt = None Partition.removeMetrics(topicPartition) @@ -995,11 +845,11 @@ class Partition(val topicPartition: TopicPartition, } assignmentState = if (addingReplicas.nonEmpty || removingReplicas.nonEmpty) - OngoingReassignmentState(addingReplicas, removingReplicas, replicas) + new OngoingReassignmentState(addingReplicas.map(Int.box).asJava, removingReplicas.map(Int.box).asJava, replicas.map(Int.box).asJava) else - SimpleAssignmentState(replicas) + new SimpleAssignmentState(replicas.map(Int.box).asJava) - partitionState = CommittedPartitionState(isr, leaderRecoveryState) + partitionState = new CommittedPartitionState(isr.map(Int.box).asJava, leaderRecoveryState) } /** @@ -1098,7 +948,7 @@ class Partition(val topicPartition: TopicPartition, case (brokerId, logEndOffset) => s"broker $brokerId: $logEndOffset" } - val curInSyncReplicaObjects = (curMaximalIsr - localBrokerId).flatMap(getReplica) + val curInSyncReplicaObjects = (curMaximalIsr.asScala.map(_.toInt) - localBrokerId).flatMap(getReplica) val replicaInfo = curInSyncReplicaObjects.map(replica => (replica.brokerId, replica.stateSnapshot.logEndOffset)) val localLogInfo = (localBrokerId, localLogOrException.logEndOffset) val (ackedReplicas, awaitingReplicas) = (replicaInfo + localLogInfo).partition { _._2 >= requiredOffset} @@ -1252,8 +1102,8 @@ class Partition(val topicPartition: TopicPartition, .getOrElse("unknown") s"(brokerId: $replicaId, endOffset: $logEndOffsetMessage, lastCaughtUpTimeMs: $lastCaughtUpTimeMessage)" }.mkString(" ") - val newIsrLog = (partitionState.isr -- outOfSyncReplicaIds).mkString(",") - info(s"Shrinking ISR from ${partitionState.isr.mkString(",")} to $newIsrLog. " + + val newIsrLog = partitionState.isr.asScala.map(_.toInt).diff(outOfSyncReplicaIds).mkString(",") + info(s"Shrinking ISR from ${partitionState.isr.asScala.mkString(",")} to $newIsrLog. " + s"Leader: (highWatermark: ${leaderLog.highWatermark}, " + s"endOffset: ${leaderLog.logEndOffset}). " + s"Out of sync replicas: $outOfSyncReplicaLog.") @@ -1298,7 +1148,7 @@ class Partition(val topicPartition: TopicPartition, def getOutOfSyncReplicas(maxLagMs: Long): Set[Int] = { val current = partitionState if (!current.isInflight) { - val candidateReplicaIds = current.isr - localBrokerId + val candidateReplicaIds = (current.isr.asScala.map(_.toInt) - localBrokerId).toSet val currentTimeMs = time.milliseconds() val leaderEndOffset = localLogOrException.logEndOffset candidateReplicaIds.filter(replicaId => isFollowerOutOfSync(replicaId, leaderEndOffset, currentTimeMs, maxLagMs)) @@ -1477,7 +1327,7 @@ class Partition(val topicPartition: TopicPartition, getReplica(replicaId).getOrElse { debug(s"Leader $localBrokerId failed to record follower $replicaId's position " + s"${fetchPartitionData.fetchOffset}, and last sent high watermark since the replica is " + - s"not recognized to be one of the assigned replicas ${assignmentState.replicas.mkString(",")} " + + s"not recognized to be one of the assigned replicas ${assignmentState.replicas.asScala.mkString(",")} " + s"for leader epoch $leaderEpoch with partition epoch $partitionEpoch") val error = if (fetchPartitionData.currentLeaderEpoch.isPresent) { @@ -1754,7 +1604,7 @@ class Partition(val topicPartition: TopicPartition, // reflect the updated ISR even if there is a delay before we receive the confirmation. // Alternatively, if the update fails, no harm is done since the expanded ISR puts // a stricter requirement for advancement of the HW. - val isrToSend = partitionState.isr + newInSyncReplicaId + val isrToSend = partitionState.isr.asScala.map(_.toInt) + newInSyncReplicaId val isrWithBrokerEpoch = addBrokerEpochToIsr(isrToSend.toList).asJava val newLeaderAndIsr = new LeaderAndIsr( localBrokerId, @@ -1763,7 +1613,7 @@ class Partition(val topicPartition: TopicPartition, isrWithBrokerEpoch, partitionEpoch ) - val updatedState = PendingExpandIsr( + val updatedState = new PendingExpandIsr( newInSyncReplicaId, newLeaderAndIsr, currentState @@ -1779,7 +1629,7 @@ class Partition(val topicPartition: TopicPartition, // When shrinking the ISR, we cannot assume that the update will succeed as this could // erroneously advance the HW if the `AlterPartition` were to fail. Hence the "maximal ISR" // for `PendingShrinkIsr` is the current ISR. - val isrToSend = partitionState.isr -- outOfSyncReplicaIds + val isrToSend = partitionState.isr.asScala.map(_.toInt).diff(outOfSyncReplicaIds) val isrWithBrokerEpoch = addBrokerEpochToIsr(isrToSend.toList).asJava val newLeaderAndIsr = new LeaderAndIsr( localBrokerId, @@ -1788,8 +1638,8 @@ class Partition(val topicPartition: TopicPartition, isrWithBrokerEpoch, partitionEpoch ) - val updatedState = PendingShrinkIsr( - outOfSyncReplicaIds, + val updatedState = new PendingShrinkIsr( + outOfSyncReplicaIds.map(Int.box).asJava, newLeaderAndIsr, currentState ) @@ -1949,13 +1799,15 @@ class Partition(val topicPartition: TopicPartition, // 2) leaderAndIsr.partitionEpoch == partitionEpoch: No update was performed since proposed and actual state are the same. // In both cases, we want to move from Pending to Committed state to ensure new updates are processed. - partitionState = CommittedPartitionState(leaderAndIsr.isr.asScala.map(_.toInt).toSet, leaderAndIsr.leaderRecoveryState) + partitionState = new CommittedPartitionState(leaderAndIsr.isr, leaderAndIsr.leaderRecoveryState) partitionEpoch = leaderAndIsr.partitionEpoch - info(s"ISR updated to ${partitionState.isr.mkString(",")} ${if (isUnderMinIsr) "(under-min-isr)" else ""} " + + info(s"ISR updated to ${partitionState.isr.asScala.mkString(",")} ${if (isUnderMinIsr) "(under-min-isr)" else ""} " + s"and version updated to $partitionEpoch") - - proposedIsrState.notifyListener(alterPartitionListener) - + proposedIsrState match { + case _: PendingExpandIsr => alterPartitionListener.markIsrExpand() + case _: PendingShrinkIsr => alterPartitionListener.markIsrShrink() + case _ => + } // we may need to increment high watermark since ISR could be down to 1 leaderLogIfLocal.exists(log => maybeIncrementLeaderHW(log)) } @@ -1974,12 +1826,14 @@ class Partition(val topicPartition: TopicPartition, partitionString.append("Topic: " + topic) partitionString.append("; Partition: " + partitionId) partitionString.append("; Leader: " + leaderReplicaIdOpt) - partitionString.append("; Replicas: " + assignmentState.replicas.mkString(",")) - partitionString.append("; ISR: " + partitionState.isr.mkString(",")) + partitionString.append("; Replicas: " + assignmentState.replicas.asScala.mkString(",")) + partitionString.append("; ISR: " + partitionState.isr.asScala.mkString(",")) assignmentState match { - case OngoingReassignmentState(adding, removing, _) => - partitionString.append("; AddingReplicas: " + adding.mkString(",")) - partitionString.append("; RemovingReplicas: " + removing.mkString(",")) + case s: OngoingReassignmentState => + val adding = s.addingReplicas() + val removing = s.removingReplicas() + partitionString.append("; AddingReplicas: " + adding.asScala.mkString(",")) + partitionString.append("; RemovingReplicas: " + removing.asScala.mkString(",")) case _ => } partitionString.append("; LeaderRecoveryState: " + partitionState.leaderRecoveryState) diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala index dbbdbb09868e8..104f2893f7e06 100644 --- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala +++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala @@ -16,7 +16,6 @@ */ package kafka.coordinator.group -import kafka.cluster.PartitionListener import kafka.server.ReplicaManager import org.apache.kafka.common.{TopicIdPartition, TopicPartition} import org.apache.kafka.common.protocol.Errors @@ -25,7 +24,7 @@ import org.apache.kafka.coordinator.common.runtime.PartitionWriter import org.apache.kafka.server.ActionQueue import org.apache.kafka.server.common.RequestLocal import org.apache.kafka.server.transaction.AddPartitionsToTxnManager -import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, PartitionListener, VerificationGuard} import java.util.concurrent.CompletableFuture import scala.collection.Map diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 8fa705ef8c4ba..2b59d319f3c5d 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -17,7 +17,7 @@ package kafka.server import com.yammer.metrics.core.Meter -import kafka.cluster.{Partition, PartitionListener} +import kafka.cluster.Partition import kafka.controller.StateChangeLogger import kafka.log.LogManager import kafka.server.HostedPartition.Online @@ -65,7 +65,7 @@ import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask} import org.apache.kafka.server.util.{Scheduler, ShutdownableThread} import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, LogReadResult, common} import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints} -import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard,PartitionListener} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import java.io.File diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala index c15f62df51a6c..913ed9bade06c 100644 --- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala @@ -28,7 +28,7 @@ import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.config.ReplicationConfigs import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints -import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig} +import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig,AlterPartitionListener} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach} import org.mockito.ArgumentMatchers diff --git a/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala b/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala index 0f995661d3f5f..ebf3cc96793c9 100644 --- a/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala @@ -16,6 +16,7 @@ */ package kafka.cluster +import org.apache.kafka.storage.internals.log.SimpleAssignmentState import org.apache.kafka.common.PartitionState import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.params.ParameterizedTest @@ -23,8 +24,6 @@ import org.junit.jupiter.params.provider.{Arguments, MethodSource} import java.util -import scala.jdk.CollectionConverters._ - object AssignmentStateTest { import AbstractPartitionTest._ @@ -88,7 +87,7 @@ class AssignmentStateTest extends AbstractPartitionTest { @MethodSource(Array("parameters")) def testPartitionAssignmentStatus(isr: util.List[Integer], replicas: util.List[Integer], adding: util.List[Integer], removing: util.List[Integer], - original: util.List[Int], isUnderReplicated: Boolean): Unit = { + original: util.List[Integer], isUnderReplicated: Boolean): Unit = { val leaderState = new PartitionState() .setLeader(brokerId) .setLeaderEpoch(6) @@ -105,7 +104,7 @@ class AssignmentStateTest extends AbstractPartitionTest { // set the original replicas as the URP calculation will need them if (!original.isEmpty) - partition.assignmentState = SimpleAssignmentState(original.asScala) + partition.assignmentState = new SimpleAssignmentState(original) // do the test partition.makeLeader(leaderState, offsetCheckpoints, None) assertEquals(isReassigning, partition.isReassigning) diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index 7e4800ce5b1bd..337d6d6633e46 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -39,7 +39,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams} import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AlterPartitionListener, AppendOrigin, CleanerConfig, CommittedPartitionState, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetsListener, LogSegments, PendingShrinkIsr, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index e309943427c46..d3bdd22b18a4f 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -63,7 +63,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, Unexpec import org.apache.kafka.server.util.{KafkaScheduler, MockTime} import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AlterPartitionListener, AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, OngoingReassignmentState, PartitionListener, PendingShrinkIsr, ProducerStateManager, ProducerStateManagerConfig, SimpleAssignmentState, UnifiedLog, VerificationGuard} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource @@ -820,7 +820,7 @@ class PartitionTest extends AbstractPartitionTest { assertTrue(partition.makeLeader(leaderState, offsetCheckpoints, topicId), "Expected first makeLeader() to return 'leader changed'") assertEquals(leaderEpoch, partition.getLeaderEpoch, "Current leader epoch") - assertEquals(Set[Integer](leader, follower2), partition.partitionState.isr, "ISR") + assertEquals(util.Set.of(leader, follower2), partition.partitionState.isr, "ISR") val requestLocal = RequestLocal.withThreadConfinedCaching // after makeLeader(() call, partition should know about all the replicas @@ -1185,7 +1185,7 @@ class PartitionTest extends AbstractPartitionTest { .setIsNew(true) assertTrue(partition.makeLeader(leaderState, offsetCheckpoints, topicId), "Expected first makeLeader() to return 'leader changed'") assertEquals(leaderEpoch, partition.getLeaderEpoch, "Current leader epoch") - assertEquals(Set[Integer](leader, follower2), partition.partitionState.isr, "ISR") + assertEquals(util.Set.of(leader, follower2), partition.partitionState.isr, "ISR") val requestLocal = RequestLocal.withThreadConfinedCaching @@ -1228,15 +1228,15 @@ class PartitionTest extends AbstractPartitionTest { // fetch from follower not in ISR from log start offset should not add this follower to ISR fetchFollower(partition, replicaId = follower1, fetchOffset = 0) fetchFollower(partition, replicaId = follower1, fetchOffset = lastOffsetOfFirstBatch) - assertEquals(Set[Integer](leader, follower2), partition.partitionState.isr, "ISR") + assertEquals(util.Set.of(leader, follower2), partition.partitionState.isr, "ISR") // fetch from the follower not in ISR from start offset of the current leader epoch should // add this follower to ISR fetchFollower(partition, replicaId = follower1, fetchOffset = currentLeaderEpochStartOffset) // Expansion does not affect the ISR - assertEquals(Set[Integer](leader, follower2), partition.partitionState.isr, "ISR") - assertEquals(Set[Integer](leader, follower1, follower2), partition.partitionState.maximalIsr, "ISR") + assertEquals(util.Set.of(leader, follower2), partition.partitionState.isr, "ISR") + assertEquals(util.Set.of(leader, follower1, follower2), partition.partitionState.maximalIsr, "ISR") assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr.asScala.toSet, Set(leader, follower1, follower2), "AlterIsr") } @@ -1457,7 +1457,7 @@ class PartitionTest extends AbstractPartitionTest { .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") - assertEquals(Set(brokerId), partition.partitionState.isr) + assertEquals(util.Set.of(brokerId), partition.partitionState.isr) assertReplicaState(partition, remoteBrokerId, lastCaughtUpTimeMs = 0L, @@ -1469,7 +1469,7 @@ class PartitionTest extends AbstractPartitionTest { // Check that the isr didn't change and alter update is scheduled assertEquals(Set(brokerId), partition.inSyncReplicaIds) - assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr) + assertEquals(util.Set.of(brokerId, remoteBrokerId), partition.partitionState.maximalIsr) assertEquals(1, alterPartitionManager.isrUpdates.size) assertEquals(Set(brokerId, remoteBrokerId), alterPartitionManager.isrUpdates.head.leaderAndIsr.isr.asScala.toSet) @@ -1478,7 +1478,7 @@ class PartitionTest extends AbstractPartitionTest { // Still no ISR change and no retry assertEquals(Set(brokerId), partition.inSyncReplicaIds) - assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr) + assertEquals(util.Set.of(brokerId, remoteBrokerId), partition.partitionState.maximalIsr) assertEquals(0, alterPartitionManager.isrUpdates.size) assertEquals(0, alterPartitionListener.expands.get) @@ -1507,7 +1507,7 @@ class PartitionTest extends AbstractPartitionTest { .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") - assertEquals(Set(brokerId), partition.partitionState.isr) + assertEquals(util.Set.of(brokerId), partition.partitionState.isr) assertReplicaState(partition, remoteBrokerId, lastCaughtUpTimeMs = 0L, @@ -1516,7 +1516,7 @@ class PartitionTest extends AbstractPartitionTest { ) fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 3L) - assertEquals(Set(brokerId), partition.partitionState.isr) + assertEquals(util.Set.of(brokerId), partition.partitionState.isr) assertReplicaState(partition, remoteBrokerId, lastCaughtUpTimeMs = 0L, logStartOffset = 0L, @@ -1526,13 +1526,13 @@ class PartitionTest extends AbstractPartitionTest { fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L) assertEquals(alterPartitionManager.isrUpdates.size, 1) val isrItem = alterPartitionManager.isrUpdates.head - assertEquals(isrItem.leaderAndIsr.isr, util.List.of[Integer](brokerId, remoteBrokerId)) + assertEquals(isrItem.leaderAndIsr.isr, util.Set.of[Integer](brokerId, remoteBrokerId)) isrItem.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState => // the broker epochs should be equal to broker epoch of the leader assertEquals(defaultBrokerEpoch(brokerState.brokerId()), brokerState.brokerEpoch()) } - assertEquals(Set(brokerId), partition.partitionState.isr) - assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr) + assertEquals(util.Set.of(brokerId), partition.partitionState.isr) + assertEquals(util.Set.of(brokerId, remoteBrokerId), partition.partitionState.maximalIsr) assertReplicaState(partition, remoteBrokerId, lastCaughtUpTimeMs = time.milliseconds(), logStartOffset = 0L, @@ -1541,7 +1541,7 @@ class PartitionTest extends AbstractPartitionTest { // Complete the ISR expansion alterPartitionManager.completeIsrUpdate(2) - assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr) + assertEquals(util.Set.of(brokerId, remoteBrokerId), partition.partitionState.isr) assertEquals(alterPartitionListener.expands.get, 1) assertEquals(alterPartitionListener.shrinks.get, 0) @@ -1569,7 +1569,7 @@ class PartitionTest extends AbstractPartitionTest { .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") - assertEquals(Set(brokerId), partition.partitionState.isr) + assertEquals(util.Set.of(brokerId), partition.partitionState.isr) assertReplicaState(partition, remoteBrokerId, lastCaughtUpTimeMs = 0L, @@ -1581,7 +1581,7 @@ class PartitionTest extends AbstractPartitionTest { // Follower state is updated, but the ISR has not expanded assertEquals(Set(brokerId), partition.inSyncReplicaIds) - assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr) + assertEquals(util.Set.of(brokerId, remoteBrokerId), partition.partitionState.maximalIsr) assertEquals(alterPartitionManager.isrUpdates.size, 1) assertReplicaState(partition, remoteBrokerId, lastCaughtUpTimeMs = time.milliseconds(), @@ -1594,7 +1594,7 @@ class PartitionTest extends AbstractPartitionTest { // Still no ISR change and it doesn't retry assertEquals(Set(brokerId), partition.inSyncReplicaIds) - assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr) + assertEquals(util.Set.of(brokerId, remoteBrokerId), partition.partitionState.maximalIsr) assertEquals(alterPartitionManager.isrUpdates.size, 0) assertEquals(alterPartitionListener.expands.get, 0) assertEquals(alterPartitionListener.shrinks.get, 0) @@ -1642,8 +1642,8 @@ class PartitionTest extends AbstractPartitionTest { ), "Expected become leader transition to succeed" ) - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(replicas), partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas), partition.partitionState.maximalIsr) // Fetch to let the follower catch up to the log end offset fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) @@ -1685,8 +1685,8 @@ class PartitionTest extends AbstractPartitionTest { ) assertTrue(partition.isLeader) - assertEquals(util.Set.copyOf(shrinkedIsr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(shrinkedIsr).asScala, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(shrinkedIsr), partition.partitionState.isr) + assertEquals(util.Set.copyOf(shrinkedIsr), partition.partitionState.maximalIsr) assertEquals(Set.empty, partition.getOutOfSyncReplicas(partition.replicaLagTimeMaxMs)) // In the case of unfenced, the HWM doesn't increase, otherwise the HWM increases because the @@ -1739,8 +1739,8 @@ class PartitionTest extends AbstractPartitionTest { .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.maximalIsr) markRemoteReplicaEligible(true) @@ -1756,16 +1756,16 @@ class PartitionTest extends AbstractPartitionTest { ) // Expansion is triggered. - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas), partition.partitionState.maximalIsr) assertEquals(1, alterPartitionManager.isrUpdates.size) // Controller rejects the expansion because the broker is fenced or offline. alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA) // The leader reverts back to the previous ISR. - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) @@ -1776,8 +1776,8 @@ class PartitionTest extends AbstractPartitionTest { fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) // Expansion is not triggered because the follower is fenced. - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) @@ -1788,8 +1788,8 @@ class PartitionTest extends AbstractPartitionTest { fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) // Expansion is triggered. - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas), partition.partitionState.maximalIsr) assertTrue(partition.partitionState.isInflight) assertEquals(1, alterPartitionManager.isrUpdates.size) @@ -1797,8 +1797,8 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 1) // ISR is committed. - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(replicas), partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas), partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) } @@ -1843,8 +1843,8 @@ class PartitionTest extends AbstractPartitionTest { .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.maximalIsr) // Fetch to let the follower catch up to the log end offset, but using a wrong broker epoch. The expansion should fail. addBrokerEpochToMockMetadataCache(metadataCache, util.List.of[Integer](brokerId, remoteBrokerId2)) @@ -1865,8 +1865,8 @@ class PartitionTest extends AbstractPartitionTest { ) // Expansion is not triggered. - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.maximalIsr) assertEquals(0, alterPartitionManager.isrUpdates.size) // Fetch again, this time with correct default broker epoch. @@ -1883,8 +1883,8 @@ class PartitionTest extends AbstractPartitionTest { ) // Expansion is triggered. - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas), partition.partitionState.maximalIsr) assertEquals(1, alterPartitionManager.isrUpdates.size) val isrUpdate = alterPartitionManager.isrUpdates.head isrUpdate.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState => @@ -1931,8 +1931,8 @@ class PartitionTest extends AbstractPartitionTest { .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.maximalIsr) val expectedReplicaEpoch = defaultBrokerEpoch(remoteBrokerId1) fetchFollower(partition, @@ -1993,8 +1993,8 @@ class PartitionTest extends AbstractPartitionTest { .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.maximalIsr) // Fetch to let the follower catch up to the log end offset and // to check if an expansion is possible. @@ -2008,16 +2008,16 @@ class PartitionTest extends AbstractPartitionTest { ) // Expansion is triggered. - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas), partition.partitionState.maximalIsr) assertEquals(1, alterPartitionManager.isrUpdates.size) // Controller rejects the expansion because the broker is in controlled shutdown. alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA) // The leader reverts back to the previous ISR. - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) @@ -2028,8 +2028,8 @@ class PartitionTest extends AbstractPartitionTest { fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) // Expansion is not triggered because the follower is fenced. - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) @@ -2040,8 +2040,8 @@ class PartitionTest extends AbstractPartitionTest { fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset) // Expansion is triggered. - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas), partition.partitionState.maximalIsr) assertTrue(partition.partitionState.isInflight) assertEquals(1, alterPartitionManager.isrUpdates.size) @@ -2049,8 +2049,8 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager.completeIsrUpdate(newPartitionEpoch= 1) // ISR is committed. - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(replicas), partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas), partition.partitionState.maximalIsr) assertFalse(partition.partitionState.isInflight) assertEquals(0, alterPartitionManager.isrUpdates.size) } @@ -2082,9 +2082,9 @@ class PartitionTest extends AbstractPartitionTest { // Try to shrink the ISR partition.maybeShrinkIsr() assertEquals(alterPartitionManager.isrUpdates.size, 1) - assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr, util.List.of[Integer](brokerId)) - assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr) - assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr) + assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr, util.Set.of[Integer](brokerId)) + assertEquals(util.Set.of(brokerId, remoteBrokerId), partition.partitionState.isr) + assertEquals(util.Set.of(brokerId, remoteBrokerId), partition.partitionState.maximalIsr) // The shrink fails and we retry alterPartitionManager.failIsrUpdate(Errors.NETWORK_EXCEPTION) @@ -2092,8 +2092,8 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(1, alterPartitionListener.failures.get) assertEquals(1, partition.getPartitionEpoch) assertEquals(alterPartitionManager.isrUpdates.size, 1) - assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr) - assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr) + assertEquals(util.Set.of(brokerId, remoteBrokerId), partition.partitionState.isr) + assertEquals(util.Set.of(brokerId, remoteBrokerId), partition.partitionState.maximalIsr) assertEquals(0L, partition.localLogOrException.highWatermark) // The shrink succeeds after retrying @@ -2101,8 +2101,8 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(1, alterPartitionListener.shrinks.get) assertEquals(2, partition.getPartitionEpoch) assertEquals(alterPartitionManager.isrUpdates.size, 0) - assertEquals(Set(brokerId), partition.partitionState.isr) - assertEquals(Set(brokerId), partition.partitionState.maximalIsr) + assertEquals(util.Set.of(brokerId), partition.partitionState.isr) + assertEquals(util.Set.of(brokerId), partition.partitionState.maximalIsr) assertEquals(log.logEndOffset, partition.localLogOrException.highWatermark) } @@ -2157,7 +2157,7 @@ class PartitionTest extends AbstractPartitionTest { // On initialization, the replica is considered caught up and should not be removed partition.maybeShrinkIsr() assertEquals(alterPartitionManager.isrUpdates.size, 0) - assertEquals(Set(brokerId, remoteBrokerId1, remoteBrokerId2), partition.partitionState.isr) + assertEquals(util.Set.of(brokerId, remoteBrokerId1, remoteBrokerId2), partition.partitionState.isr) // If enough time passes without a fetch update, the ISR should shrink after the following maybeShrinkIsr time.sleep(partition.replicaLagTimeMaxMs + 1) @@ -2166,13 +2166,13 @@ class PartitionTest extends AbstractPartitionTest { partition.maybeShrinkIsr() assertEquals(0, alterPartitionListener.shrinks.get) assertEquals(alterPartitionManager.isrUpdates.size, 1) - assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr, util.List.of[Integer](brokerId, remoteBrokerId1)) + assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr, util.Set.of[Integer](brokerId, remoteBrokerId1)) val isrUpdate = alterPartitionManager.isrUpdates.head isrUpdate.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState => assertEquals(defaultBrokerEpoch(brokerState.brokerId()), brokerState.brokerEpoch()) } - assertEquals(Set(brokerId, remoteBrokerId1, remoteBrokerId2), partition.partitionState.isr) - assertEquals(Set(brokerId, remoteBrokerId1, remoteBrokerId2), partition.partitionState.maximalIsr) + assertEquals(util.Set.of(brokerId, remoteBrokerId1, remoteBrokerId2), partition.partitionState.isr) + assertEquals(util.Set.of(brokerId, remoteBrokerId1, remoteBrokerId2), partition.partitionState.maximalIsr) assertEquals(0L, partition.localLogOrException.highWatermark) // After the ISR shrink completes, the ISR state should be updated and the @@ -2181,8 +2181,8 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(1, alterPartitionListener.shrinks.get) assertEquals(2, partition.getPartitionEpoch) assertEquals(alterPartitionManager.isrUpdates.size, 0) - assertEquals(Set(brokerId, remoteBrokerId1), partition.partitionState.isr) - assertEquals(Set(brokerId, remoteBrokerId1), partition.partitionState.maximalIsr) + assertEquals(util.Set.of(brokerId, remoteBrokerId1), partition.partitionState.isr) + assertEquals(util.Set.of(brokerId, remoteBrokerId1), partition.partitionState.maximalIsr) assertEquals(log.logEndOffset, partition.localLogOrException.highWatermark) } @@ -2351,7 +2351,7 @@ class PartitionTest extends AbstractPartitionTest { // The ISR should not be shrunk because the follower has caught up with the leader at the // time of the first fetch. partition.maybeShrinkIsr() - assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr) + assertEquals(util.Set.of(brokerId, remoteBrokerId), partition.partitionState.isr) assertEquals(alterPartitionManager.isrUpdates.size, 0) } @@ -2397,7 +2397,7 @@ class PartitionTest extends AbstractPartitionTest { // The ISR should not be shrunk because the follower is caught up to the leader's log end partition.maybeShrinkIsr() - assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr) + assertEquals(util.Set.of(brokerId, remoteBrokerId), partition.partitionState.isr) assertEquals(alterPartitionManager.isrUpdates.size, 0) } @@ -2450,8 +2450,8 @@ class PartitionTest extends AbstractPartitionTest { def testAlterIsrNewLeaderElected(): Unit = { handleAlterIsrFailure(Errors.NEW_LEADER_ELECTED, (brokerId: Int, remoteBrokerId: Int, partition: Partition) => { - assertEquals(partition.partitionState.isr, Set(brokerId)) - assertEquals(partition.partitionState.maximalIsr, Set(brokerId, remoteBrokerId)) + assertEquals(partition.partitionState.isr, util.Set.of(brokerId)) + assertEquals(partition.partitionState.maximalIsr, util.Set.of(brokerId, remoteBrokerId)) assertEquals(alterPartitionManager.isrUpdates.size, 0) }) } @@ -2460,8 +2460,8 @@ class PartitionTest extends AbstractPartitionTest { def testAlterIsrUnknownTopic(): Unit = { handleAlterIsrFailure(Errors.UNKNOWN_TOPIC_OR_PARTITION, (brokerId: Int, remoteBrokerId: Int, partition: Partition) => { - assertEquals(partition.partitionState.isr, Set(brokerId)) - assertEquals(partition.partitionState.maximalIsr, Set(brokerId, remoteBrokerId)) + assertEquals(partition.partitionState.isr, util.Set.of(brokerId)) + assertEquals(partition.partitionState.maximalIsr, util.Set.of(brokerId, remoteBrokerId)) assertEquals(alterPartitionManager.isrUpdates.size, 0) }) } @@ -2470,8 +2470,8 @@ class PartitionTest extends AbstractPartitionTest { def testAlterIsrInvalidVersion(): Unit = { handleAlterIsrFailure(Errors.INVALID_UPDATE_VERSION, (brokerId: Int, remoteBrokerId: Int, partition: Partition) => { - assertEquals(partition.partitionState.isr, Set(brokerId)) - assertEquals(partition.partitionState.maximalIsr, Set(brokerId, remoteBrokerId)) + assertEquals(partition.partitionState.isr, util.Set.of(brokerId)) + assertEquals(partition.partitionState.maximalIsr, util.Set.of(brokerId, remoteBrokerId)) assertEquals(alterPartitionManager.isrUpdates.size, 0) }) } @@ -2481,8 +2481,8 @@ class PartitionTest extends AbstractPartitionTest { handleAlterIsrFailure(Errors.UNKNOWN_SERVER_ERROR, (brokerId: Int, remoteBrokerId: Int, partition: Partition) => { // We retry these - assertEquals(partition.partitionState.isr, Set(brokerId)) - assertEquals(partition.partitionState.maximalIsr, Set(brokerId, remoteBrokerId)) + assertEquals(partition.partitionState.isr, util.Set.of(brokerId)) + assertEquals(partition.partitionState.maximalIsr, util.Set.of(brokerId, remoteBrokerId)) assertEquals(alterPartitionManager.isrUpdates.size, 1) }) } @@ -2519,7 +2519,7 @@ class PartitionTest extends AbstractPartitionTest { // Follower state is updated, but the ISR has not expanded assertEquals(Set(brokerId), partition.inSyncReplicaIds) - assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr) + assertEquals(util.Set.of(brokerId, remoteBrokerId), partition.partitionState.maximalIsr) assertEquals(alterPartitionManager.isrUpdates.size, 1) assertReplicaState(partition, remoteBrokerId, lastCaughtUpTimeMs = firstFetchTimeMs, @@ -2623,7 +2623,7 @@ class PartitionTest extends AbstractPartitionTest { // Expand ISR fetchFollower(partition, replicaId = follower3, fetchOffset = 10L) - assertEquals(Set(brokerId, follower1, follower2, follower3), partition.partitionState.isr) + assertEquals(util.Set.of(brokerId, follower1, follower2, follower3), partition.partitionState.isr) assertEquals(partitionEpoch + 1, partition.getPartitionEpoch) // Verify that the AlterPartition request was sent twice verify(mockChannelManager, times(2)).sendRequest(any(), any()) @@ -2658,8 +2658,8 @@ class PartitionTest extends AbstractPartitionTest { // Expand ISR fetchFollower(partition, replicaId = follower3, fetchOffset = 10L) - assertEquals(Set(brokerId, follower1, follower2), partition.partitionState.isr) - assertEquals(Set(brokerId, follower1, follower2, follower3), partition.partitionState.maximalIsr) + assertEquals(util.Set.of(brokerId, follower1, follower2), partition.partitionState.isr) + assertEquals(util.Set.of(brokerId, follower1, follower2, follower3), partition.partitionState.maximalIsr) // One AlterIsr request in-flight assertEquals(alterPartitionManager.isrUpdates.size, 1) @@ -2852,10 +2852,10 @@ class PartitionTest extends AbstractPartitionTest { ) assertTrue(partition.assignmentState.isInstanceOf[OngoingReassignmentState], "The assignmentState is not OngoingReassignmentState") - assertEquals(replicas, partition.assignmentState.replicas) - assertEquals(isr, partition.partitionState.isr) - assertEquals(adding, partition.assignmentState.asInstanceOf[OngoingReassignmentState].addingReplicas) - assertEquals(removing, partition.assignmentState.asInstanceOf[OngoingReassignmentState].removingReplicas) + assertEquals(replicas.map(Int.box).asJava, partition.assignmentState.replicas) + assertEquals(isr.map(Int.box).asJava, partition.partitionState.isr) + assertEquals(adding.map(Int.box).asJava, partition.assignmentState.asInstanceOf[OngoingReassignmentState].addingReplicas) + assertEquals(removing.map(Int.box).asJava, partition.assignmentState.asInstanceOf[OngoingReassignmentState].removingReplicas) assertEquals(followers, partition.remoteReplicas.map(_.brokerId)) // Test with simple assignment @@ -2872,8 +2872,8 @@ class PartitionTest extends AbstractPartitionTest { ) assertTrue(partition.assignmentState.isInstanceOf[SimpleAssignmentState], "The assignmentState is not SimpleAssignmentState") - assertEquals(replicas2, partition.assignmentState.replicas) - assertEquals(isr2, partition.partitionState.isr) + assertEquals(replicas2.map(Int.box).asJava, partition.assignmentState.replicas) + assertEquals(isr2.map(Int.box).asJava, partition.partitionState.isr) assertEquals(followers2, partition.remoteReplicas.map(_.brokerId)) // Test with no followers @@ -2888,8 +2888,8 @@ class PartitionTest extends AbstractPartitionTest { ) assertTrue(partition.assignmentState.isInstanceOf[SimpleAssignmentState], "The assignmentState is not SimpleAssignmentState") - assertEquals(replicas3, partition.assignmentState.replicas) - assertEquals(Set.empty, partition.partitionState.isr) + assertEquals(replicas3, partition.assignmentState.replicas.asScala) + assertEquals(util.Set.of(), partition.partitionState.isr) assertEquals(Seq.empty, partition.remoteReplicas.map(_.brokerId)) } @@ -3025,7 +3025,7 @@ class PartitionTest extends AbstractPartitionTest { assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId))) assertEquals(1, partition.getPartitionEpoch) assertEquals(leaderEpoch, partition.getLeaderEpoch) - assertEquals(Set(leaderId), partition.partitionState.isr) + assertEquals(util.Set.of(leaderId), partition.partitionState.isr) // Follower's state is initialized with unknown offset because it is not // in the ISR. @@ -3057,7 +3057,7 @@ class PartitionTest extends AbstractPartitionTest { assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, Some(topicId))) assertEquals(2, partition.getPartitionEpoch) assertEquals(leaderEpoch, partition.getLeaderEpoch) - assertEquals(Set(leaderId), partition.partitionState.isr) + assertEquals(util.Set.of(leaderId), partition.partitionState.isr) // Follower's state has not been reset. assertReplicaState(partition, followerId, @@ -3086,7 +3086,7 @@ class PartitionTest extends AbstractPartitionTest { assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId))) assertEquals(1, partition.getPartitionEpoch) assertEquals(leaderEpoch, partition.getLeaderEpoch) - assertEquals(Set(leaderId), partition.partitionState.isr) + assertEquals(util.Set.of(leaderId), partition.partitionState.isr) assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt) val leaderLog = partition.localLogOrException @@ -3111,7 +3111,7 @@ class PartitionTest extends AbstractPartitionTest { assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, Some(topicId))) assertEquals(2, partition.getPartitionEpoch) assertEquals(leaderEpoch, partition.getLeaderEpoch) - assertEquals(Set(leaderId), partition.partitionState.isr) + assertEquals(util.Set.of(leaderId), partition.partitionState.isr) assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt) assertEquals(Optional.of(new EpochEntry(leaderEpoch, 0L)), leaderLog.leaderEpochCache.latestEntry) } @@ -3206,9 +3206,9 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(1, partition.getPartitionEpoch) assertEquals(1, partition.getLeaderEpoch) assertEquals(Some(localReplica), partition.leaderReplicaIdOpt) - assertEquals(util.Set.copyOf(replicas).asScala, partition.partitionState.isr) + assertEquals(util.Set.copyOf(replicas), partition.partitionState.isr) assertEquals(Seq(remoteReplica1, remoteReplica2), partition.remoteReplicas.map(_.brokerId).toSeq) - assertEquals(replicas.asScala, partition.assignmentState.replicas) + assertEquals(replicas, partition.assignmentState.replicas) // The local replica becomes a follower. val updatedLeaderState = new JPartitionState() @@ -3223,9 +3223,9 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(2, partition.getPartitionEpoch) assertEquals(2, partition.getLeaderEpoch) assertEquals(Some(remoteReplica1), partition.leaderReplicaIdOpt) - assertEquals(Set.empty, partition.partitionState.isr) + assertEquals(util.Set.of(), partition.partitionState.isr) assertEquals(Seq.empty, partition.remoteReplicas.map(_.brokerId).toSeq) - assertEquals(replicas.asScala, partition.assignmentState.replicas) + assertEquals(replicas, partition.assignmentState.replicas) } @Test @@ -3567,8 +3567,8 @@ class PartitionTest extends AbstractPartitionTest { assertFalse(partition.partitionState.isInflight) assertEquals(topicId, partition.topicId) assertEquals(leaderEpoch, partition.getLeaderEpoch) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.isr) - assertEquals(util.Set.copyOf(isr).asScala, partition.partitionState.maximalIsr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.isr) + assertEquals(util.Set.copyOf(isr), partition.partitionState.maximalIsr) assertEquals(partitionEpoch, partition.getPartitionEpoch) newLeader } diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 03944faaefeec..33e85e30a7a6c 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -447,7 +447,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { private def waitForNoLeaderAndIsrHasOldLeaderId(metadataCache: MetadataCache, leaderId: Int): Unit = { waitUntilTrue(() => metadataCache.getLeaderAndIsr(topic, partitionId).isPresent() && metadataCache.getLeaderAndIsr(topic, partitionId).get.leader() == LeaderConstants.NO_LEADER && - util.List.of(leaderId).equals(metadataCache.getLeaderAndIsr(topic, partitionId).get.isr()), + util.Set.of(leaderId).equals(metadataCache.getLeaderAndIsr(topic, partitionId).get.isr()), "Timed out waiting for broker metadata cache updates the info for topic partition:" + topicPartition) } } diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 78a857a202f59..d252195789ac3 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -883,7 +883,7 @@ class MetadataCacheTest { val leaderAndIsr = cache.getLeaderAndIsr(topic, partitionIndex) assertEquals(util.Optional.of(leader), leaderAndIsr.map(_.leader())) assertEquals(util.Optional.of(leaderEpoch), leaderAndIsr.map(_.leaderEpoch())) - assertEquals(util.Optional.of(isr), leaderAndIsr.map(_.isr())) + assertEquals(util.Optional.of(util.Set.copyOf(isr)), leaderAndIsr.map(_.isr())) assertEquals(util.Optional.of(-1), leaderAndIsr.map(_.partitionEpoch())) assertEquals(util.Optional.of(LeaderRecoveryState.RECOVERED), leaderAndIsr.map(_.leaderRecoveryState())) } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala index 52dd464e5c3e0..0099223eba95d 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala @@ -447,7 +447,7 @@ class ReplicaManagerConcurrencyTest extends Logging { delta.replay(new PartitionChangeRecord() .setTopicId(topic.topicId) .setPartitionId(partitionId) - .setIsr(leaderAndIsr.isr) + .setIsr(util.List.copyOf(leaderAndIsr.isr)) .setLeader(leaderAndIsr.leader) ) this.registration = delta.topicsDelta diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java index 8dfc70ca7c58d..24f0d14dfd140 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java @@ -17,7 +17,6 @@ package org.apache.kafka.jmh.partition; -import kafka.cluster.AlterPartitionListener; import kafka.cluster.DelayedOperations; import kafka.cluster.Partition; import kafka.log.LogManager; @@ -36,6 +35,7 @@ import org.apache.kafka.metadata.MockConfigRepository; import org.apache.kafka.server.util.KafkaScheduler; import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints; +import org.apache.kafka.storage.internals.log.AlterPartitionListener; import org.apache.kafka.storage.internals.log.CleanerConfig; import org.apache.kafka.storage.internals.log.LogConfig; import org.apache.kafka.storage.internals.log.LogDirFailureChannel; diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java index 665fc5f459652..64b7f23230528 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java @@ -17,7 +17,6 @@ package org.apache.kafka.jmh.partition; -import kafka.cluster.AlterPartitionListener; import kafka.cluster.DelayedOperations; import kafka.cluster.Partition; import kafka.log.LogManager; @@ -33,6 +32,7 @@ import org.apache.kafka.server.replica.Replica; import org.apache.kafka.server.util.KafkaScheduler; import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints; +import org.apache.kafka.storage.internals.log.AlterPartitionListener; import org.apache.kafka.storage.internals.log.CleanerConfig; import org.apache.kafka.storage.internals.log.LogConfig; import org.apache.kafka.storage.internals.log.LogDirFailureChannel; diff --git a/metadata/src/main/java/org/apache/kafka/metadata/LeaderAndIsr.java b/metadata/src/main/java/org/apache/kafka/metadata/LeaderAndIsr.java index 1a818dfca58df..5639a2af93c9c 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/LeaderAndIsr.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/LeaderAndIsr.java @@ -21,6 +21,8 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; public class LeaderAndIsr { public static final int INITIAL_LEADER_EPOCH = 0; @@ -119,10 +121,10 @@ public Optional leaderOpt() { return leader == LeaderAndIsr.NO_LEADER ? Optional.empty() : Optional.of(leader); } - public List isr() { + public Set isr() { return isrWithBrokerEpoch.stream() .map(BrokerState::brokerId) - .toList(); + .collect(Collectors.toUnmodifiableSet()); } @Override diff --git a/metadata/src/test/java/org/apache/kafka/metadata/LeaderAndIsrTest.java b/metadata/src/test/java/org/apache/kafka/metadata/LeaderAndIsrTest.java index f6d7b66fba339..68a689cefb1ef 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/LeaderAndIsrTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/LeaderAndIsrTest.java @@ -20,6 +20,7 @@ import org.junit.jupiter.api.Test; import java.util.List; +import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -30,7 +31,7 @@ public void testRecoveringLeaderAndIsr() { LeaderAndIsr recoveringLeaderAndIsr = leaderAndIsr.newRecoveringLeaderAndIsr(3, List.of(3)); assertEquals(3, recoveringLeaderAndIsr.leader()); - assertEquals(List.of(3), recoveringLeaderAndIsr.isr()); + assertEquals(Set.of(3), recoveringLeaderAndIsr.isr()); assertEquals(LeaderRecoveryState.RECOVERING, recoveringLeaderAndIsr.leaderRecoveryState()); } @@ -40,7 +41,7 @@ public void testNewLeaderAndIsr() { LeaderAndIsr newLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(2, List.of(1, 2)); assertEquals(2, newLeaderAndIsr.leader()); - assertEquals(List.of(1, 2), newLeaderAndIsr.isr()); + assertEquals(Set.of(1, 2), newLeaderAndIsr.isr()); assertEquals(LeaderRecoveryState.RECOVERED, newLeaderAndIsr.leaderRecoveryState()); } @@ -49,12 +50,12 @@ public void testNewLeader() { LeaderAndIsr leaderAndIsr = new LeaderAndIsr(2, List.of(1, 2, 3)); assertEquals(2, leaderAndIsr.leader()); - assertEquals(List.of(1, 2, 3), leaderAndIsr.isr()); + assertEquals(Set.of(1, 2, 3), leaderAndIsr.isr()); LeaderAndIsr newLeaderAndIsr = leaderAndIsr.newLeader(3); assertEquals(3, newLeaderAndIsr.leader()); - assertEquals(List.of(1, 2, 3), newLeaderAndIsr.isr()); + assertEquals(Set.of(1, 2, 3), newLeaderAndIsr.isr()); } @Test diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AlterPartitionListener.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AlterPartitionListener.java new file mode 100644 index 0000000000000..6cd3192d1d60a --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/AlterPartitionListener.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +public interface AlterPartitionListener { + void markIsrExpand(); + void markIsrShrink(); + void markFailed(); +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AssignmentState.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AssignmentState.java new file mode 100644 index 0000000000000..ffb60c199d9e2 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/AssignmentState.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import java.util.List; + +public interface AssignmentState { + + List replicas(); + + int replicationFactor(); + + boolean isAddingReplica(int brokerId); +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/CommittedPartitionState.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/CommittedPartitionState.java new file mode 100644 index 0000000000000..cb9a605a9da65 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/CommittedPartitionState.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.metadata.LeaderRecoveryState; + +import java.util.Set; + +public record CommittedPartitionState(Set isr, LeaderRecoveryState leaderRecoveryState) implements PartitionState { + + public CommittedPartitionState { + isr = Set.copyOf(isr); + } + + @Override + public Set maximalIsr() { + return isr; + } + + @Override + public boolean isInflight() { + return false; + } + +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/OngoingReassignmentState.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/OngoingReassignmentState.java new file mode 100644 index 0000000000000..84c44430e98ad --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/OngoingReassignmentState.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import java.util.List; + +public record OngoingReassignmentState( + List addingReplicas, + List removingReplicas, + List replicas +) implements AssignmentState { + + public OngoingReassignmentState { + addingReplicas = List.copyOf(addingReplicas); + removingReplicas = List.copyOf(removingReplicas); + replicas = List.copyOf(replicas); + } + + @Override + public int replicationFactor() { + return (int) replicas.stream().filter(r -> !addingReplicas.contains(r)).count(); + } + + @Override + public boolean isAddingReplica(int brokerId) { + return addingReplicas.contains(brokerId); + } +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionListener.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionListener.java new file mode 100644 index 0000000000000..402d83374b2b8 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionListener.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.TopicPartition; + +/** + * Listener receives notification from an Online Partition. + * + * A listener can be (re-)registered to an Online partition only. The listener + * is notified as long as the partition remains Online. When the partition fails + * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further + * notifications are sent after this point on. + * + * Note that the callbacks are executed in the thread that triggers the change + * AND that locks may be held during their execution. They are meant to be used + * as notification mechanism only. + */ +public interface PartitionListener { + /** + * Called when the Log increments its high watermark. + * + * @param partition The topic partition for which the high watermark was updated. + * @param offset The new high watermark offset. + */ + default void onHighWatermarkUpdated(TopicPartition partition, long offset) {} + + /** + * Called when the Partition (or replica) on this broker has a failure (e.g. goes offline). + * + * @param partition The topic partition that failed. + */ + default void onFailed(TopicPartition partition) {} + + /** + * Called when the Partition (or replica) on this broker is deleted. Note that it does not mean + * that the partition was deleted but only that this broker does not host a replica of it any more. + * + * @param partition The topic partition that was deleted from this broker. + */ + default void onDeleted(TopicPartition partition) {} + + /** + * Called when the Partition on this broker is transitioned to follower. + * + * @param partition The topic partition that transitioned to follower role. + */ + default void onBecomingFollower(TopicPartition partition) {} +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionState.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionState.java new file mode 100644 index 0000000000000..124578760783a --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionState.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.metadata.LeaderRecoveryState; + +import java.util.Set; + +public interface PartitionState { + /** + * Includes only the in-sync replicas which have been committed to ZK/Controller. + */ + Set isr(); + + /** + * This set may include un-committed ISR members following an expansion. This "effective" ISR is used for advancing + * the high watermark as well as determining which replicas are required for acks=all produce requests.* + */ + Set maximalIsr(); + + /** + * The leader recovery state. See the description for LeaderRecoveryState for details on the different values. + */ + LeaderRecoveryState leaderRecoveryState(); + + /** + * Indicates if we have an AlterPartition request inflight. + */ + boolean isInflight(); + +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/PendingExpandIsr.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/PendingExpandIsr.java new file mode 100644 index 0000000000000..429e1540a99ba --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/PendingExpandIsr.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.metadata.LeaderAndIsr; +import org.apache.kafka.metadata.LeaderRecoveryState; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +public record PendingExpandIsr(int newInSyncReplicaId, + LeaderAndIsr sentLeaderAndIsr, + CommittedPartitionState lastCommittedState) implements PendingPartitionChange { + + @Override + public Set isr() { + return lastCommittedState.isr(); + } + + @Override + public Set maximalIsr() { + Set newIsr = new HashSet<>(lastCommittedState.isr()); + newIsr.add(newInSyncReplicaId); + return Collections.unmodifiableSet(newIsr); + } + + @Override + public LeaderRecoveryState leaderRecoveryState() { + return LeaderRecoveryState.RECOVERED; + } + + @Override + public boolean isInflight() { + return true; + } + + @Override + public String toString() { + return "PendingExpandIsr(newInSyncReplicaId=" + newInSyncReplicaId + + ", sentLeaderAndIsr=" + sentLeaderAndIsr + + ", leaderRecoveryState=" + leaderRecoveryState() + + ", lastCommittedState=" + lastCommittedState + + ")"; + } +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/PendingPartitionChange.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/PendingPartitionChange.java new file mode 100644 index 0000000000000..a732f117bcd12 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/PendingPartitionChange.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.metadata.LeaderAndIsr; + +public interface PendingPartitionChange extends PartitionState { + CommittedPartitionState lastCommittedState(); + LeaderAndIsr sentLeaderAndIsr(); +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/PendingShrinkIsr.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/PendingShrinkIsr.java new file mode 100644 index 0000000000000..17ec5da1245e1 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/PendingShrinkIsr.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.metadata.LeaderAndIsr; +import org.apache.kafka.metadata.LeaderRecoveryState; + +import java.util.Set; + +public record PendingShrinkIsr(Set outOfSyncReplicaIds, + LeaderAndIsr sentLeaderAndIsr, + CommittedPartitionState lastCommittedState) implements PendingPartitionChange { + + @Override + public Set isr() { + return lastCommittedState.isr(); + } + + @Override + public Set maximalIsr() { + return isr(); + } + + @Override + public LeaderRecoveryState leaderRecoveryState() { + return LeaderRecoveryState.RECOVERED; + } + + @Override + public boolean isInflight() { + return true; + } + + @Override + public String toString() { + return "PendingShrinkIsr(outOfSyncReplicaIds=" + outOfSyncReplicaIds + + ", sentLeaderAndIsr=" + sentLeaderAndIsr + + ", leaderRecoveryState=" + leaderRecoveryState() + + ", lastCommittedState=" + lastCommittedState + + ")"; + } +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/SimpleAssignmentState.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/SimpleAssignmentState.java new file mode 100644 index 0000000000000..186e6c689b3ea --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/SimpleAssignmentState.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import java.util.List; + +public record SimpleAssignmentState(List replicas) implements AssignmentState { + public SimpleAssignmentState { + replicas = List.copyOf(replicas); + } + + @Override + public int replicationFactor() { + return replicas().size(); + } + + @Override + public boolean isAddingReplica(int brokerId) { + return false; + } +}