@@ -17,104 +17,18 @@
 
 package kafka.server
 
-import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.KRaftMetadataCache
-import kafka.utils.{CoreUtils, Logging, TestUtils}
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.{TopicIdPartition, Uuid}
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
-import org.apache.kafka.common.metadata.{FeatureLevelRecord, PartitionChangeRecord, PartitionRecord, TopicRecord}
-import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
-import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, OffsetAndEpoch}
-import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.LeaderEndPoint
-import org.apache.kafka.server.util.{MockScheduler, MockTime}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogDirFailureChannel}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.apache.kafka.server.common.OffsetAndEpoch
+import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions._
-import org.mockito.Mockito.mock
 
-import java.io.File
 import java.util.{Map => JMap}
 import scala.collection.Map
 import scala.jdk.CollectionConverters._
 
-class LocalLeaderEndPointTest extends Logging {
-
-  val time = new MockTime
-  val topicId = Uuid.randomUuid()
-  val topic = "test"
-  val partition = 5
-  val topicIdPartition = new TopicIdPartition(topicId, partition, topic)
-  val topicPartition = topicIdPartition.topicPartition()
-  val sourceBroker: BrokerEndPoint = new BrokerEndPoint(0, "localhost", 9092)
-  var replicaManager: ReplicaManager = _
-  var endPoint: LeaderEndPoint = _
-  var quotaManager: QuotaManagers = _
-  var image: MetadataImage = _
-
-  @BeforeEach
-  def setUp(): Unit = {
-    val props = TestUtils.createBrokerConfig(sourceBroker.id, port = sourceBroker.port)
-    val config = KafkaConfig.fromProps(props)
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
-    val alterPartitionManager = mock(classOf[AlterPartitionManager])
-    val metrics = new Metrics
-    quotaManager = QuotaFactory.instantiate(config, metrics, time, "", "")
-    replicaManager = new ReplicaManager(
-      metrics = metrics,
-      config = config,
-      time = time,
-      scheduler = new MockScheduler(time),
-      logManager = mockLogMgr,
-      quotaManagers = quotaManager,
-      metadataCache = new KRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
-      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterPartitionManager = alterPartitionManager
-    )
-
-    val delta = new MetadataDelta(MetadataImage.EMPTY)
-    delta.replay(new FeatureLevelRecord()
-      .setName(MetadataVersion.FEATURE_NAME)
-      .setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
-    )
-    delta.replay(new TopicRecord()
-      .setName(topic)
-      .setTopicId(topicId)
-    )
-    delta.replay(new PartitionRecord()
-      .setPartitionId(partition)
-      .setTopicId(topicId)
-      .setReplicas(java.util.List.of[Integer](sourceBroker.id))
-      .setIsr(java.util.List.of[Integer](sourceBroker.id))
-      .setLeader(sourceBroker.id)
-      .setLeaderEpoch(0)
-      .setPartitionEpoch(0)
-    )
-
-    image = delta.apply(MetadataProvenance.EMPTY)
-    replicaManager.applyDelta(delta.topicsDelta(), image)
-
-    replicaManager.getPartitionOrException(topicPartition)
-      .localLogOrException
-    endPoint = new LocalLeaderEndPoint(
-      sourceBroker,
-      config,
-      replicaManager,
-      QuotaFactory.UNBOUNDED_QUOTA
-    )
-  }
-
-  @AfterEach
-  def tearDown(): Unit = {
-    CoreUtils.swallow(replicaManager.shutdown(checkpointHW = false), this)
-    CoreUtils.swallow(quotaManager.shutdown(), this)
-  }
+class LocalLeaderEndPointTest extends LocalLeaderEndPointTestBase {
 
   @Test
   def testFetchLatestOffset(): Unit = {
@@ -233,66 +147,4 @@ class LocalLeaderEndPointTest extends Logging {
     assertEquals(expected, result)
   }
 
-  private class CallbackResult[T] {
-    private var value: Option[T] = None
-    private var fun: Option[T => Unit] = None
-
-    private def hasFired: Boolean = {
-      value.isDefined
-    }
-
-    def fire(value: T): Unit = {
-      this.value = Some(value)
-      fun.foreach(f => f(value))
-    }
-
-    def onFire(fun: T => Unit): CallbackResult[T] = {
-      this.fun = Some(fun)
-      if (this.hasFired) fire(value.get)
-      this
-    }
-  }
-
-  private def bumpLeaderEpoch(): Unit = {
-    val delta = new MetadataDelta(image)
-    delta.replay(new PartitionChangeRecord()
-      .setTopicId(topicId)
-      .setPartitionId(partition)
-      .setLeader(sourceBroker.id)
-    )
-
-    image = delta.apply(MetadataProvenance.EMPTY)
-    replicaManager.applyDelta(delta.topicsDelta, image)
-  }
-
-  private def appendRecords(replicaManager: ReplicaManager,
-                            partition: TopicIdPartition,
-                            records: MemoryRecords,
-                            origin: AppendOrigin = AppendOrigin.CLIENT,
-                            requiredAcks: Short = -1): CallbackResult[PartitionResponse] = {
-    val result = new CallbackResult[PartitionResponse]()
-    def appendCallback(responses: scala.collection.Map[TopicIdPartition, PartitionResponse]): Unit = {
-      val response = responses.get(partition)
-      assertTrue(response.isDefined)
-      result.fire(response.get)
-    }
-
-    replicaManager.appendRecords(
-      timeout = 1000,
-      requiredAcks = requiredAcks,
-      internalTopicsAllowed = false,
-      origin = origin,
-      entriesPerPartition = Map(partition -> records),
-      responseCallback = appendCallback)
-
-    result
-  }
-
-  private def records: MemoryRecords = {
-    MemoryRecords.withRecords(Compression.NONE,
-      new SimpleRecord("first message".getBytes()),
-      new SimpleRecord("second message".getBytes()),
-      new SimpleRecord("third message".getBytes()),
-    )
-  }
 }
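The refactor moves the shared fixture into a new parent class, LocalLeaderEndPointTestBase, which the `+class` line references but this diff does not include. Below is a rough sketch of what that base class presumably contains, assembled from the members deleted above; the class name comes from the diff itself, while the `abstract` modifier and member visibility are assumptions.

package kafka.server

import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.KRaftMetadataCache
import kafka.utils.{CoreUtils, Logging, TestUtils}
import org.apache.kafka.common.{TopicIdPartition, Uuid}
import org.apache.kafka.common.metadata.{FeatureLevelRecord, PartitionRecord, TopicRecord}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.server.LeaderEndPoint
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion}
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.util.{MockScheduler, MockTime}
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.mockito.Mockito.mock

import java.io.File
import scala.jdk.CollectionConverters._

// Hypothetical reconstruction: the real LocalLeaderEndPointTestBase is not
// shown in this diff. Everything below is lifted from the members the diff
// removes from LocalLeaderEndPointTest.
abstract class LocalLeaderEndPointTestBase extends Logging {

  val time = new MockTime
  val topicId = Uuid.randomUuid()
  val topic = "test"
  val partition = 5
  val topicIdPartition = new TopicIdPartition(topicId, partition, topic)
  val topicPartition = topicIdPartition.topicPartition()
  val sourceBroker: BrokerEndPoint = new BrokerEndPoint(0, "localhost", 9092)
  var replicaManager: ReplicaManager = _
  var endPoint: LeaderEndPoint = _
  var quotaManager: QuotaManagers = _
  var image: MetadataImage = _

  @BeforeEach
  def setUp(): Unit = {
    // Build a ReplicaManager backed by a KRaft metadata image containing a
    // single one-replica partition led by sourceBroker.
    val props = TestUtils.createBrokerConfig(sourceBroker.id, port = sourceBroker.port)
    val config = KafkaConfig.fromProps(props)
    val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
    val alterPartitionManager = mock(classOf[AlterPartitionManager])
    val metrics = new Metrics
    quotaManager = QuotaFactory.instantiate(config, metrics, time, "", "")
    replicaManager = new ReplicaManager(
      metrics = metrics,
      config = config,
      time = time,
      scheduler = new MockScheduler(time),
      logManager = mockLogMgr,
      quotaManagers = quotaManager,
      metadataCache = new KRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
      alterPartitionManager = alterPartitionManager
    )

    val delta = new MetadataDelta(MetadataImage.EMPTY)
    delta.replay(new FeatureLevelRecord()
      .setName(MetadataVersion.FEATURE_NAME)
      .setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()))
    delta.replay(new TopicRecord().setName(topic).setTopicId(topicId))
    delta.replay(new PartitionRecord()
      .setPartitionId(partition)
      .setTopicId(topicId)
      .setReplicas(java.util.List.of[Integer](sourceBroker.id))
      .setIsr(java.util.List.of[Integer](sourceBroker.id))
      .setLeader(sourceBroker.id)
      .setLeaderEpoch(0)
      .setPartitionEpoch(0))

    image = delta.apply(MetadataProvenance.EMPTY)
    replicaManager.applyDelta(delta.topicsDelta(), image)

    // Force creation of the local log before wiring up the endpoint under test.
    replicaManager.getPartitionOrException(topicPartition).localLogOrException
    endPoint = new LocalLeaderEndPoint(sourceBroker, config, replicaManager, QuotaFactory.UNBOUNDED_QUOTA)
  }

  @AfterEach
  def tearDown(): Unit = {
    CoreUtils.swallow(replicaManager.shutdown(checkpointHW = false), this)
    CoreUtils.swallow(quotaManager.shutdown(), this)
  }

  // The CallbackResult helper, bumpLeaderEpoch(), appendRecords(...) and the
  // records fixture removed in the second hunk would presumably move here too,
  // widened from private to protected so subclasses can call them.
}

The likely motivation is reuse: extracting the fixture lets other LeaderEndPoint test suites share the same single-partition KRaft setup instead of duplicating the ReplicaManager wiring in each file.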
|