|
33 | 33 | import java.util.Arrays;
|
34 | 34 | import java.util.BitSet;
|
35 | 35 | import java.util.Collection;
|
| 36 | +import java.util.Collections; |
36 | 37 | import java.util.HashMap;
|
37 | 38 | import java.util.List;
|
38 | 39 | import java.util.Map;
|
@@ -1612,7 +1613,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
|
1612 | 1613 | public void testAckModeCount() throws Exception {
|
1613 | 1614 | ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
|
1614 | 1615 | Consumer<Integer, String> consumer = mock(Consumer.class);
|
1615 |
| - given(cf.createConsumer(isNull(), eq("clientId"), isNull())).willReturn(consumer); |
| 1616 | + given(cf.createConsumer(isNull(), eq("clientId"))).willReturn(consumer); |
1616 | 1617 | TopicPartition topicPartition = new TopicPartition("foo", 0);
|
1617 | 1618 | final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records1 = new HashMap<>();
|
1618 | 1619 | records1.put(topicPartition, Arrays.asList(
|
@@ -1657,13 +1658,13 @@ public void testAckModeCount() throws Exception {
|
1657 | 1658 | ContainerProperties containerProps = new ContainerProperties(topicPartitionOffset);
|
1658 | 1659 | containerProps.setAckMode(AckMode.COUNT);
|
1659 | 1660 | containerProps.setAckCount(3);
|
1660 |
| - containerProps.setClientId("clientId"); |
1661 | 1661 | AtomicInteger recordCount = new AtomicInteger();
|
1662 | 1662 | containerProps.setMessageListener((MessageListener) r -> {
|
1663 | 1663 | recordCount.incrementAndGet();
|
1664 | 1664 | });
|
1665 | 1665 | KafkaMessageListenerContainer<Integer, String> container =
|
1666 | 1666 | new KafkaMessageListenerContainer<>(cf, containerProps);
|
| 1667 | + container.setClientIdSuffix("clientId"); |
1667 | 1668 | container.start();
|
1668 | 1669 | assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
|
1669 | 1670 | assertThat(recordCount.get()).isEqualTo(7);
|
|
0 commit comments