
Commit 72a4dbf

Added tests for LocalLeaderEndPointTest

1 parent 117be10
File tree: 3 files changed, +279 −151 lines
core/src/test/scala/kafka/server/LocalLeaderEndPointRemoteTest.scala

Lines changed: 88 additions & 0 deletions
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.log.LogManager
+import kafka.utils.TestUtils
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.server.common.OffsetAndEpoch
+import org.apache.kafka.storage.internals.log.LogConfig
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import java.io.File
+import java.util.Properties
+import scala.collection.Map
+import scala.jdk.CollectionConverters._
+
+class LocalLeaderEndPointRemoteTest extends LocalLeaderEndPointTestBase {
+
+  override def createLogManager(config: KafkaConfig): LogManager = {
+    val logProps = new Properties()
+    logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+    // Keep cleanup.policy=delete (default), not compact, so remote storage is allowed
+    val defaultLogConfig = LogConfig.fromProps(Map.empty[String, Object].asJava, logProps)
+
+    TestUtils.createLogManager(
+      config.logDirs.asScala.map(new File(_)),
+      defaultConfig = defaultLogConfig,
+      remoteStorageSystemEnable = true
+    )
+  }
+
+  @Test
+  def testEarliestPendingUploadOffsetWhenNoSegmentsUploaded(): Unit = {
+    // Append some records; no remote upload happened yet
+    appendRecords(replicaManager, topicIdPartition, records)
+      .onFire(response => assertEquals(Errors.NONE, response.error))
+
+    val expected = endPoint.fetchEarliestOffset(topicPartition, 0)
+    val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 0)
+    assertEquals(expected, result)
+  }
+
+  @Test
+  def testEarliestPendingUploadOffsetWhenLocalStartGreaterThanStart(): Unit = {
+    appendRecords(replicaManager, topicIdPartition, records)
+      .onFire(response => assertEquals(Errors.NONE, response.error))
+
+    // Bump epoch and advance local log start offset without changing log start offset
+    bumpLeaderEpoch()
+    replicaManager.logManager.getLog(topicPartition).foreach(_.updateLocalLogStartOffset(3))
+
+    val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 1)
+    assertEquals(new OffsetAndEpoch(-1L, -1), result)
+  }
+
+  @Test
+  def testEarliestPendingUploadOffsetWhenHighestRemoteOffsetKnown(): Unit = {
+    appendRecords(replicaManager, topicIdPartition, records)
+      .onFire(response => assertEquals(Errors.NONE, response.error))
+
+    // Highest remote is 1 => earliest pending should be max(1+1, logStart)
+    val log = replicaManager.getPartitionOrException(topicPartition).localLogOrException
+    log.updateHighestOffsetInRemoteStorage(1)
+
+    val expectedOffset = Math.max(2L, log.logStartOffset())
+    val epoch = log.leaderEpochCache().epochForOffset(expectedOffset).orElse(0)
+
+    val result = endPoint.fetchEarliestPendingUploadOffset(topicPartition, 0)
+    assertEquals(new OffsetAndEpoch(expectedOffset, epoch), result)
+  }
+}
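
Taken together, the three tests pin down the behaviour of fetchEarliestPendingUploadOffset: with no segments uploaded it agrees with fetchEarliestOffset, and once the highest remote offset is known it returns the first offset past it, clamped to the log start offset, together with that offset's leader epoch. A minimal plain-Scala model of that offset rule, assuming this reading of the tests (the object and method names below are hypothetical, not broker API):

    // Hypothetical model of the rule the tests above assert; the real logic
    // lives in LocalLeaderEndPoint on the broker, not in this diff.
    object EarliestPendingUploadModel {
      def earliestPendingUpload(logStartOffset: Long, highestRemoteOffset: Option[Long]): Long =
        highestRemoteOffset match {
          case None          => logStartOffset                        // nothing uploaded: all pending
          case Some(highest) => math.max(highest + 1, logStartOffset) // first offset past remote
        }

      def main(args: Array[String]): Unit = {
        assert(earliestPendingUpload(0L, None) == 0L)     // first test: equals earliest offset
        assert(earliestPendingUpload(0L, Some(1L)) == 2L) // third test: max(1 + 1, 0) = 2
        assert(earliestPendingUpload(5L, Some(1L)) == 5L) // never below the log start offset
      }
    }

The second test is the odd one out: after the epoch bump, with the local log start offset moved ahead of the log start offset, the endpoint answers OffsetAndEpoch(-1, -1) rather than guessing, so the sketch above deliberately leaves that case unmodelled.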

core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala

Lines changed: 3 additions & 151 deletions
@@ -17,104 +17,18 @@
 
 package kafka.server
 
-import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.KRaftMetadataCache
-import kafka.utils.{CoreUtils, Logging, TestUtils}
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.{TopicIdPartition, Uuid}
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
-import org.apache.kafka.common.metadata.{FeatureLevelRecord, PartitionChangeRecord, PartitionRecord, TopicRecord}
-import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
-import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, OffsetAndEpoch}
-import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.LeaderEndPoint
-import org.apache.kafka.server.util.{MockScheduler, MockTime}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogDirFailureChannel}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.apache.kafka.server.common.OffsetAndEpoch
+import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions._
-import org.mockito.Mockito.mock
 
-import java.io.File
 import java.util.{Map => JMap}
 import scala.collection.Map
 import scala.jdk.CollectionConverters._
 
-class LocalLeaderEndPointTest extends Logging {
-
-  val time = new MockTime
-  val topicId = Uuid.randomUuid()
-  val topic = "test"
-  val partition = 5
-  val topicIdPartition = new TopicIdPartition(topicId, partition, topic)
-  val topicPartition = topicIdPartition.topicPartition()
-  val sourceBroker: BrokerEndPoint = new BrokerEndPoint(0, "localhost", 9092)
-  var replicaManager: ReplicaManager = _
-  var endPoint: LeaderEndPoint = _
-  var quotaManager: QuotaManagers = _
-  var image: MetadataImage = _
-
-  @BeforeEach
-  def setUp(): Unit = {
-    val props = TestUtils.createBrokerConfig(sourceBroker.id, port = sourceBroker.port)
-    val config = KafkaConfig.fromProps(props)
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
-    val alterPartitionManager = mock(classOf[AlterPartitionManager])
-    val metrics = new Metrics
-    quotaManager = QuotaFactory.instantiate(config, metrics, time, "", "")
-    replicaManager = new ReplicaManager(
-      metrics = metrics,
-      config = config,
-      time = time,
-      scheduler = new MockScheduler(time),
-      logManager = mockLogMgr,
-      quotaManagers = quotaManager,
-      metadataCache = new KRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
-      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterPartitionManager = alterPartitionManager
-    )
-
-    val delta = new MetadataDelta(MetadataImage.EMPTY)
-    delta.replay(new FeatureLevelRecord()
-      .setName(MetadataVersion.FEATURE_NAME)
-      .setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
-    )
-    delta.replay(new TopicRecord()
-      .setName(topic)
-      .setTopicId(topicId)
-    )
-    delta.replay(new PartitionRecord()
-      .setPartitionId(partition)
-      .setTopicId(topicId)
-      .setReplicas(java.util.List.of[Integer](sourceBroker.id))
-      .setIsr(java.util.List.of[Integer](sourceBroker.id))
-      .setLeader(sourceBroker.id)
-      .setLeaderEpoch(0)
-      .setPartitionEpoch(0)
-    )
-
-    image = delta.apply(MetadataProvenance.EMPTY)
-    replicaManager.applyDelta(delta.topicsDelta(), image)
-
-    replicaManager.getPartitionOrException(topicPartition)
-      .localLogOrException
-    endPoint = new LocalLeaderEndPoint(
-      sourceBroker,
-      config,
-      replicaManager,
-      QuotaFactory.UNBOUNDED_QUOTA
-    )
-  }
-
-  @AfterEach
-  def tearDown(): Unit = {
-    CoreUtils.swallow(replicaManager.shutdown(checkpointHW = false), this)
-    CoreUtils.swallow(quotaManager.shutdown(), this)
-  }
+class LocalLeaderEndPointTest extends LocalLeaderEndPointTestBase {
 
   @Test
   def testFetchLatestOffset(): Unit = {
@@ -233,66 +147,4 @@ class LocalLeaderEndPointTest extends Logging {
     assertEquals(expected, result)
   }
 
-  private class CallbackResult[T] {
-    private var value: Option[T] = None
-    private var fun: Option[T => Unit] = None
-
-    private def hasFired: Boolean = {
-      value.isDefined
-    }
-
-    def fire(value: T): Unit = {
-      this.value = Some(value)
-      fun.foreach(f => f(value))
-    }
-
-    def onFire(fun: T => Unit): CallbackResult[T] = {
-      this.fun = Some(fun)
-      if (this.hasFired) fire(value.get)
-      this
-    }
-  }
-
-  private def bumpLeaderEpoch(): Unit = {
-    val delta = new MetadataDelta(image)
-    delta.replay(new PartitionChangeRecord()
-      .setTopicId(topicId)
-      .setPartitionId(partition)
-      .setLeader(sourceBroker.id)
-    )
-
-    image = delta.apply(MetadataProvenance.EMPTY)
-    replicaManager.applyDelta(delta.topicsDelta, image)
-  }
-
-  private def appendRecords(replicaManager: ReplicaManager,
-                            partition: TopicIdPartition,
-                            records: MemoryRecords,
-                            origin: AppendOrigin = AppendOrigin.CLIENT,
-                            requiredAcks: Short = -1): CallbackResult[PartitionResponse] = {
-    val result = new CallbackResult[PartitionResponse]()
-    def appendCallback(responses: scala.collection.Map[TopicIdPartition, PartitionResponse]): Unit = {
-      val response = responses.get(partition)
-      assertTrue(response.isDefined)
-      result.fire(response.get)
-    }
-
-    replicaManager.appendRecords(
-      timeout = 1000,
-      requiredAcks = requiredAcks,
-      internalTopicsAllowed = false,
-      origin = origin,
-      entriesPerPartition = Map(partition -> records),
-      responseCallback = appendCallback)
-
-    result
-  }
-
-  private def records: MemoryRecords = {
-    MemoryRecords.withRecords(Compression.NONE,
-      new SimpleRecord("first message".getBytes()),
-      new SimpleRecord("second message".getBytes()),
-      new SimpleRecord("third message".getBytes()),
-    )
-  }
 }
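
The third changed file — presumably core/src/test/scala/kafka/server/LocalLeaderEndPointTestBase.scala, given the class name, though it is not shown in this view — would carry everything deleted above. A rough sketch of its likely shape, assuming the helpers keep their removed bodies verbatim (imports as in the removed block) and that createLogManager is the hook LocalLeaderEndPointRemoteTest overrides:

    // Sketch only: this file is not part of the rendered diff. The fixture
    // state and helpers are the members deleted from LocalLeaderEndPointTest.
    abstract class LocalLeaderEndPointTestBase extends Logging {

      val time = new MockTime
      val topicId = Uuid.randomUuid()
      val topic = "test"
      val partition = 5
      val topicIdPartition = new TopicIdPartition(topicId, partition, topic)
      val topicPartition = topicIdPartition.topicPartition()
      val sourceBroker: BrokerEndPoint = new BrokerEndPoint(0, "localhost", 9092)
      var replicaManager: ReplicaManager = _
      var endPoint: LeaderEndPoint = _
      var quotaManager: QuotaManagers = _
      var image: MetadataImage = _

      // Assumed extension hook: the remote test overrides this to enable
      // remote storage, so the default builds a plain log manager.
      def createLogManager(config: KafkaConfig): LogManager =
        TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))

      @BeforeEach
      def setUp(): Unit = {
        // Identical to the setUp removed above, except the log manager comes
        // from createLogManager(config) instead of being constructed inline.
      }

      @AfterEach
      def tearDown(): Unit = {
        CoreUtils.swallow(replicaManager.shutdown(checkpointHW = false), this)
        CoreUtils.swallow(quotaManager.shutdown(), this)
      }

      // bumpLeaderEpoch(), appendRecords(...), records and CallbackResult move
      // here with their removed bodies, widened from private so that both
      // LocalLeaderEndPointTest and LocalLeaderEndPointRemoteTest can use them.
    }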
