Skip to content

Cherry-pick KAFKA-18583 fix (#18635) to 3.9 #18657

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ class KRaftMetadataCache(
val image = _currentImage
val result = new mutable.HashMap[Int, Node]()
Option(image.topics().getTopic(tp.topic())).foreach { topic =>
topic.partitions().values().forEach { partition =>
Option(topic.partitions().get(tp.partition())).foreach { partition =>
partition.replicas.foreach { replicaId =>
val broker = image.cluster().broker(replicaId)
if (broker != null && !broker.fenced()) {
Expand Down
109 changes: 109 additions & 0 deletions core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import org.junit.jupiter.params.provider.MethodSource
import java.util
import java.util.Arrays.asList
import java.util.Collections
import java.util.stream.Collectors
import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -645,6 +646,114 @@ class MetadataCacheTest {
assertEquals(Seq(expectedNode1), partitionInfo.offlineReplicas.toSeq)
}

@ParameterizedTest
@MethodSource(Array("cacheProvider"))
def testGetPartitionReplicaEndpoints(cache: MetadataCache): Unit = {
val securityProtocol = SecurityProtocol.PLAINTEXT
val listenerName = ListenerName.forSecurityProtocol(securityProtocol)

// Set up broker data for the metadata cache
val numBrokers = 10
// Set only the last broker in the list to be offline in order to allow easy
// indexing of brokers in the brokerStates list - the index in the list will
// be the same as the brokerId of the broker at that position.
val offlineBrokerId = numBrokers - 1
val brokerStates = (0 until numBrokers - 1).map { brokerId =>
new UpdateMetadataBroker()
.setId(brokerId)
.setRack("rack" + (brokerId % 3))
.setEndpoints(
Seq(new UpdateMetadataEndpoint()
.setHost("foo" + brokerId)
.setPort(9092)
.setSecurityProtocol(securityProtocol.id)
.setListener(listenerName.value)
).asJava)
}

val topic = "many-partitions-topic"
val topicId = Uuid.randomUuid()

// Set up a number of partitions such that each different combination of
// $replicationFactor brokers is made a replica set for exactly one partition
val replicationFactor = 3
val replicaSets = getAllReplicaSets(numBrokers, replicationFactor)
val numPartitions = replicaSets.length
val partitionStates = (0 until numPartitions).map { partitionId =>
val replicas = replicaSets(partitionId)
val onlineReplicas = replicas.stream().filter(id => id != offlineBrokerId).collect(Collectors.toList())
new UpdateMetadataPartitionState()
.setTopicName(topic)
.setPartitionIndex(partitionId)
.setReplicas(replicas)
.setLeader(onlineReplicas.get(0))
.setIsr(onlineReplicas)
.setOfflineReplicas(Collections.singletonList(offlineBrokerId))
}

// Load the prepared data in the metadata cache
val version = ApiKeys.UPDATE_METADATA.latestVersion
val controllerId = 0
val controllerEpoch = 123
val updateMetadataRequest = new UpdateMetadataRequest.Builder(
version,
controllerId,
controllerEpoch,
brokerEpoch,
partitionStates.asJava,
brokerStates.asJava,
Collections.singletonMap(topic, topicId)).build()
MetadataCacheTest.updateCache(cache, updateMetadataRequest)

(0 until numPartitions).foreach { partitionId =>
val tp = new TopicPartition(topic, partitionId)
val brokerIdToNodeMap = cache.getPartitionReplicaEndpoints(tp, listenerName)
val replicaSet = brokerIdToNodeMap.keySet
val expectedReplicaSet = partitionStates(partitionId).replicas().asScala.toSet
// Verify that we have endpoints for exactly the non-fenced brokers of the replica set
if (expectedReplicaSet.contains(offlineBrokerId)) {
assertEquals(expectedReplicaSet,
replicaSet + offlineBrokerId,
s"Unexpected partial replica set for partition $partitionId")
Comment on lines +714 to +717
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This condition should also have asserted that replicaSet doesn't contain offlineBrokerId - right now the existing assertEquals will pass even if that's not the case. Also applies to the original trunk change.

} else {
assertEquals(expectedReplicaSet,
replicaSet,
s"Unexpected replica set for partition $partitionId")
}
// Verify that the endpoint data for each non-fenced replica is as expected
replicaSet.foreach { brokerId =>
val brokerNode =
brokerIdToNodeMap.getOrElse(
brokerId, fail(s"No brokerNode for broker $brokerId and partition $partitionId"))
val expectedBroker = brokerStates(brokerId)
val expectedEndpoint = expectedBroker.endpoints().get(0)
assertEquals(securityProtocol.id, expectedEndpoint.securityProtocol())
assertEquals(listenerName.value(), expectedEndpoint.listener())
assertEquals(expectedEndpoint.host(),
brokerNode.host(),
s"Unexpected host for broker $brokerId and partition $partitionId")
assertEquals(expectedEndpoint.port(),
brokerNode.port(),
s"Unexpected port for broker $brokerId and partition $partitionId")
assertEquals(expectedBroker.rack(),
brokerNode.rack(),
s"Unexpected rack for broker $brokerId and partition $partitionId")
}
}

val tp = new TopicPartition(topic, numPartitions)
val brokerIdToNodeMap = cache.getPartitionReplicaEndpoints(tp, listenerName)
assertTrue(brokerIdToNodeMap.isEmpty)
}

private def getAllReplicaSets(numBrokers: Int,
replicationFactor: Int): Array[util.List[Integer]] = {
(0 until numBrokers)
.combinations(replicationFactor)
.map(replicaSet => replicaSet.map(Integer.valueOf).toList.asJava)
.toArray
}

@Test
def testIsBrokerFenced(): Unit = {
val metadataCache = MetadataCache.kRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_0)
Expand Down