Skip to content

Commit ef83978

Browse files
ijumaPatrick Druley
authored andcommitted
KAFKA-18648: Add back support for metadata version 0-3 (apache#18716)
During testing, we identified that kafka-python (and aiokafka) relies on metadata request v0 and hence we need to add these back to comply with the premise of KIP-896 - i.e. it should not break the clients listed within it. I reverted the changes from apache#18218 related to the removal of metadata versions 0-3. I will submit a separate PR to undeprecate these API versions on the relevant 3.x branches. kafka-python (and aiokafka) work correctly (produce & consume) with this change on top of the 4.0 branch. Reviewers: David Arthur <[email protected]>
1 parent 0716976 commit ef83978

File tree

6 files changed

+75
-29
lines changed

6 files changed

+75
-29
lines changed

clients/src/main/resources/common/message/MetadataRequest.json

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,13 @@
1818
"type": "request",
1919
"listeners": ["broker"],
2020
"name": "MetadataRequest",
21-
"validVersions": "4-13",
21+
"validVersions": "0-13",
2222
"flexibleVersions": "9+",
2323
"fields": [
24-
// Versions 0-3 were removed in Apache Kafka 4.0, Version 4 is the new baseline.
25-
//
2624
// In version 0, an empty array indicates "request metadata for all topics." In version 1 and
2725
// higher, an empty array indicates "request metadata for no topics," and a null array is used to
2826
// indicate "request metadata for all topics."
27+
//
2928
// Version 2 and 3 are the same as version 1.
3029
//
3130
// Version 4 adds AllowAutoTopicCreation.

clients/src/main/resources/common/message/MetadataResponse.json

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@
1717
"apiKey": 3,
1818
"type": "response",
1919
"name": "MetadataResponse",
20-
// Versions 0-3 were removed in Apache Kafka 4.0, Version 4 is the new baseline.
20+
// Version 1 adds fields for the rack of each broker, the controller id, and whether or not the topic is internal.
2121
//
22-
// Version 1 adds fields for the rack of each broker, the controller id, and
23-
// whether or not the topic is internal.
2422
// Version 2 adds the cluster ID field.
23+
//
2524
// Version 3 adds the throttle time.
2625
//
2726
// Version 4 is the same as version 3.
@@ -43,7 +42,7 @@
4342
// by the DescribeCluster API (KIP-700).
4443
// Version 12 supports topicId.
4544
// Version 13 supports top-level error code in the response.
46-
"validVersions": "4-13",
45+
"validVersions": "0-13",
4746
"flexibleVersions": "9+",
4847
"fields": [
4948
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,

clients/src/test/java/org/apache/kafka/common/message/MessageTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1127,6 +1127,8 @@ public void testWriteNullForNonNullableFieldRaisesException() {
11271127
for (short version : ApiKeys.CREATE_TOPICS.allVersions()) {
11281128
verifyWriteRaisesNpe(version, createTopics);
11291129
}
1130+
MetadataRequestData metadata = new MetadataRequestData().setTopics(null);
1131+
verifyWriteRaisesNpe((short) 0, metadata);
11301132
}
11311133

11321134
@Test

clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,23 @@
3030
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
3131
import static org.junit.jupiter.api.Assertions.assertEquals;
3232
import static org.junit.jupiter.api.Assertions.assertFalse;
33+
import static org.junit.jupiter.api.Assertions.assertNull;
3334
import static org.junit.jupiter.api.Assertions.assertThrows;
35+
import static org.junit.jupiter.api.Assertions.assertTrue;
3436

3537
public class MetadataRequestTest {
3638

3739
@Test
38-
public void testEmptyMeansEmptyForAllVersions() {
39-
for (int i = ApiKeys.METADATA.oldestVersion(); i < MetadataRequestData.SCHEMAS.length; i++) {
40+
public void testEmptyMeansAllTopicsV0() {
41+
MetadataRequestData data = new MetadataRequestData();
42+
MetadataRequest parsedRequest = new MetadataRequest(data, (short) 0);
43+
assertTrue(parsedRequest.isAllTopics());
44+
assertNull(parsedRequest.topics());
45+
}
46+
47+
@Test
48+
public void testEmptyMeansEmptyForVersionsAboveV0() {
49+
for (int i = 1; i < MetadataRequestData.SCHEMAS.length; i++) {
4050
MetadataRequestData data = new MetadataRequestData();
4151
data.setAllowAutoTopicCreation(true);
4252
MetadataRequest parsedRequest = new MetadataRequest(data, (short) i);

core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2297,7 +2297,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
22972297
def testMetadataClusterAuthorizedOperationsWithoutDescribeCluster(quorum: String): Unit = {
22982298
removeAllClientAcls()
22992299

2300-
for (version <- ApiKeys.METADATA.oldestVersion to ApiKeys.METADATA.latestVersion) {
2300+
// MetadataRequest versions older than 1 are not supported.
2301+
for (version <- 1 to ApiKeys.METADATA.latestVersion) {
23012302
testMetadataClusterClusterAuthorizedOperations(version.toShort, 0)
23022303
}
23032304
}
@@ -2317,7 +2318,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
23172318
val expectedClusterAuthorizedOperations = Utils.to32BitField(
23182319
acls.map(_.operation.code.asInstanceOf[JByte]).asJava)
23192320

2320-
for (version <- ApiKeys.METADATA.oldestVersion to ApiKeys.METADATA.latestVersion) {
2321+
// MetadataRequest versions older than 1 are not supported.
2322+
for (version <- 1 to ApiKeys.METADATA.latestVersion) {
23212323
testMetadataClusterClusterAuthorizedOperations(version.toShort, expectedClusterAuthorizedOperations)
23222324
}
23232325
}

core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala

Lines changed: 52 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package kafka.server
2020
import java.util.Optional
2121
import kafka.utils.TestUtils
2222
import org.apache.kafka.common.Uuid
23+
import org.apache.kafka.common.errors.UnsupportedVersionException
2324
import org.apache.kafka.common.internals.Topic
2425
import org.apache.kafka.common.protocol.Errors
2526
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
@@ -40,6 +41,14 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
4041
doSetup(testInfo, createOffsetsTopic = false)
4142
}
4243

44+
@ParameterizedTest
45+
@ValueSource(strings = Array("kraft"))
46+
def testClusterIdWithRequestVersion1(quorum: String): Unit = {
47+
val v1MetadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
48+
val v1ClusterId = v1MetadataResponse.clusterId
49+
assertNull(v1ClusterId, s"v1 clusterId should be null")
50+
}
51+
4352
@ParameterizedTest
4453
@ValueSource(strings = Array("kraft"))
4554
def testClusterIdIsValid(quorum: String): Unit = {
@@ -96,17 +105,27 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
96105
def testAutoTopicCreation(quorum: String): Unit = {
97106
val topic1 = "t1"
98107
val topic2 = "t2"
99-
val topic3 = "t4"
100-
val topic4 = "t5"
108+
val topic3 = "t3"
109+
val topic4 = "t4"
110+
val topic5 = "t5"
101111
createTopic(topic1)
102112

103113
val response1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true).build())
104114
assertNull(response1.errors.get(topic1))
105115
checkAutoCreatedTopic(topic2, response1)
106116

107-
val response2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic3, topic4).asJava, false, 4.toShort).build)
108-
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response2.errors.get(topic3))
109-
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response2.errors.get(topic4))
117+
// The default behavior in old versions of the metadata API is to allow topic creation, so
118+
// protocol downgrades should happen gracefully when auto-creation is explicitly requested.
119+
val response2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic3).asJava, true).build(1))
120+
checkAutoCreatedTopic(topic3, response2)
121+
122+
// V3 doesn't support a configurable allowAutoTopicCreation, so disabling auto-creation is not supported
123+
assertThrows(classOf[UnsupportedVersionException], () => sendMetadataRequest(new MetadataRequest(requestData(List(topic4), allowAutoTopicCreation = false), 3.toShort)))
124+
125+
// V4 and higher support a configurable allowAutoTopicCreation
126+
val response3 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic4, topic5).asJava, false, 4.toShort).build)
127+
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response3.errors.get(topic4))
128+
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response3.errors.get(topic5))
110129
}
111130

112131
@ParameterizedTest
@@ -132,10 +151,15 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
132151
createTopic("t1", 3, 2)
133152
createTopic("t2", 3, 2)
134153

135-
// v4, Null represents all topics
136-
val metadataResponseV1 = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(4.toShort))
137-
assertTrue(metadataResponseV1.errors.isEmpty, "V4 Response should have no errors")
138-
assertEquals(2, metadataResponseV1.topicMetadata.size(), "V4 Response should have 2 (all) topics")
154+
// v0, Empty list represents all topics
155+
val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(requestData(List(), allowAutoTopicCreation = true), 0.toShort))
156+
assertTrue(metadataResponseV0.errors.isEmpty, "V0 Response should have no errors")
157+
assertEquals(2, metadataResponseV0.topicMetadata.size(), "V0 Response should have 2 (all) topics")
158+
159+
// v1, Null represents all topics
160+
val metadataResponseV1 = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
161+
assertTrue(metadataResponseV1.errors.isEmpty, "V1 Response should have no errors")
162+
assertEquals(2, metadataResponseV1.topicMetadata.size(), "V1 Response should have 2 (all) topics")
139163
}
140164

141165
@ParameterizedTest
@@ -217,15 +241,25 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
217241
!response.brokers.asScala.exists(_.id == downNode.dataPlaneRequestProcessor.brokerId)
218242
}, "Replica was not found down", 50000)
219243

220-
// Validate version 4 returns unavailable replicas with no error
221-
val v4MetadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build(4))
222-
val v4BrokerIds = v4MetadataResponse.brokers().asScala.map(_.id).toSeq
223-
assertTrue(v4MetadataResponse.errors.isEmpty, "Response should have no errors")
224-
assertFalse(v4BrokerIds.contains(downNode.config.brokerId), s"The downed broker should not be in the brokers list")
225-
assertEquals(1, v4MetadataResponse.topicMetadata.size, "Response should have one topic")
226-
val v4PartitionMetadata = v4MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
227-
assertEquals(Errors.NONE, v4PartitionMetadata.error, "PartitionMetadata should have no errors")
228-
assertEquals(replicaCount, v4PartitionMetadata.replicaIds.size, s"Response should have $replicaCount replicas")
244+
// Validate version 0 still filters unavailable replicas and contains error
245+
val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(requestData(List(replicaDownTopic), allowAutoTopicCreation = true), 0.toShort))
246+
val v0BrokerIds = v0MetadataResponse.brokers().asScala.map(_.id).toSeq
247+
assertTrue(v0MetadataResponse.errors.isEmpty, "Response should have no errors")
248+
assertFalse(v0BrokerIds.contains(downNode.config.brokerId), s"The downed broker should not be in the brokers list")
249+
assertTrue(v0MetadataResponse.topicMetadata.size == 1, "Response should have one topic")
250+
val v0PartitionMetadata = v0MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
251+
assertTrue(v0PartitionMetadata.error == Errors.REPLICA_NOT_AVAILABLE, "PartitionMetadata should have an error")
252+
assertTrue(v0PartitionMetadata.replicaIds.size == replicaCount - 1, s"Response should have ${replicaCount - 1} replicas")
253+
254+
// Validate version 1 returns unavailable replicas with no error
255+
val v1MetadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build(1))
256+
val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq
257+
assertTrue(v1MetadataResponse.errors.isEmpty, "Response should have no errors")
258+
assertFalse(v1BrokerIds.contains(downNode.config.brokerId), s"The downed broker should not be in the brokers list")
259+
assertEquals(1, v1MetadataResponse.topicMetadata.size, "Response should have one topic")
260+
val v1PartitionMetadata = v1MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
261+
assertEquals(Errors.NONE, v1PartitionMetadata.error, "PartitionMetadata should have no errors")
262+
assertEquals(replicaCount, v1PartitionMetadata.replicaIds.size, s"Response should have $replicaCount replicas")
229263
}
230264

231265
@ParameterizedTest

0 commit comments

Comments
 (0)