Skip to content

Commit 8126e4b

Browse files
GH-3879: Add cache to optimize header match
Fixes: #3879 Issue link: #3879 What? Add an LRU cache for the pattern-match results of KafkaHeaderMapper. Why? To reduce the CPU time spent on pattern matching in KafkaHeaderMapper. Commonly, many Kafka records in the same topic have the same header names. Currently, each pattern match has O(M*N) time complexity, where M is the pattern length and N is the length of the string being matched. If the pattern-match results are cached and reused by KafkaHeaderMapper, CPU usage is expected to improve. * Remove useless import Signed-off-by: Sanghyeok An <[email protected]>
1 parent f8eb136 commit 8126e4b

File tree

2 files changed

+119
-2
lines changed

2 files changed

+119
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.springframework.core.log.LogAccessor;
3737
import org.springframework.messaging.MessageHeaders;
3838
import org.springframework.util.Assert;
39+
import org.springframework.util.ConcurrentLruCache;
3940
import org.springframework.util.ObjectUtils;
4041
import org.springframework.util.PatternMatchUtils;
4142

@@ -65,8 +66,14 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper {
6566

6667
private final List<HeaderMatcher> matchers = new ArrayList<>();
6768

69+
private final ConcurrentLruCache<String, Boolean> matcherResultCache =
70+
new ConcurrentLruCache<>(1000, this::doesMatchInternal);
71+
6872
private final List<HeaderMatcher> multiValueHeaderMatchers = new ArrayList<>();
6973

74+
private final ConcurrentLruCache<String, Boolean> multiValueMatcherResultCache =
75+
new ConcurrentLruCache<>(1000, this::doesMatchMultiValueHeaderInternal);
76+
7077
private final Map<String, Boolean> rawMappedHeaders = new HashMap<>();
7178

7279
{
@@ -240,13 +247,17 @@ protected boolean matchesForInbound(String header) {
240247
}
241248

242249
// Returns the mapping decision for the given header name by looking it up in matcherResultCache,
// which is constructed with this::doesMatchInternal as its value function.
// NOTE(review): nothing visible in this diff clears the cache when this.matchers changes, so a
// decision cached before a matcher is added would be stale - confirm that matchers are fully
// populated before the first lookup.
private boolean doesMatch(String header) {
250+
return this.matcherResultCache.get(header);
251+
}
252+
253+
// Uncached evaluation of the header name against this.matchers, in list order.
// The first matcher that matches decides the result; later matchers are not consulted.
// NOTE(review): as the value function of matcherResultCache this presumably runs only on a
// cache miss, so the debug message below would not repeat for cached names - confirm against
// ConcurrentLruCache.
private boolean doesMatchInternal(String header) {
243254
for (HeaderMatcher matcher : this.matchers) {
244255
if (matcher.matchHeader(header)) {
245256
// A matching negated matcher yields false; a matching non-negated matcher yields true.
return !matcher.isNegated();
246257
}
247258
}
248259
// No matcher matched: log at debug level and report that the header is not mapped.
this.logger.debug(() -> MessageFormat.format("headerName=[{0}] WILL NOT be mapped; matched no patterns",
249-
header));
260+
header));
250261
return false;
251262
}
252263

@@ -266,12 +277,20 @@ protected Object headerValueToAddOut(String key, Object value) {
266277
}
267278

268279
/**
269-
* Check whether the header value should be mapped to multiple values.
280+
* Determine whether the given header name should be mapped to multiple values.
281+
* This method first checks if the mapping result is already cached.
282+
* If a cached result exists, it is returned immediately.
283+
* If not, {@code doesMatchMultiValueHeaderInternal(headerName)} is called to compute the result,
284+
* which is then cached and returned.
270285
* @param headerName the header name.
271286
* @return True for multiple values at the same key.
272287
* @since 4.0
273288
*/
274289
protected boolean doesMatchMultiValueHeader(String headerName) {
290+
return this.multiValueMatcherResultCache.get(headerName);
291+
}
292+
293+
private boolean doesMatchMultiValueHeaderInternal(String headerName) {
275294
for (HeaderMatcher headerMatcher : this.multiValueHeaderMatchers) {
276295
if (headerMatcher.matchHeader(headerName)) {
277296
return true;

spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.apache.kafka.common.header.internals.RecordHeaders;
3232
import org.assertj.core.api.InstanceOfAssertFactories;
3333
import org.junit.jupiter.api.Test;
34+
import org.junit.jupiter.params.ParameterizedTest;
35+
import org.junit.jupiter.params.provider.ValueSource;
3436

3537
import org.springframework.core.log.LogAccessor;
3638
import org.springframework.kafka.retrytopic.RetryTopicHeaders;
@@ -413,6 +415,102 @@ void multiValueHeaderToTest() {
413415
.containsExactly(multiValueWildCardHeader2Value1, multiValueWildCardHeader2Value2);
414416
}
415417

418+
// Verifies that toHeaders() maps every header when the record carries a large number of distinct
// single-value header names (500, 1000 and 2000), alongside one multi-value header and three
// framework headers.
// NOTE(review): the counts are presumably chosen to sit below, at and above the size of 1000
// passed to the ConcurrentLruCache instances in AbstractKafkaHeaderMapper - confirm.
@ParameterizedTest
419+
@ValueSource(ints = {500, 1000, 2000})
420+
void hugeNumberOfSingleValueHeaderToTest(int numberOfSingleValueHeaderCount) {
421+
// GIVEN
422+
Headers rawHeaders = new RecordHeaders();
423+
424+
String multiValueHeader1 = "test-multi-value1";
425+
byte[] multiValueHeader1Value1 = { 0, 0, 0, 0 };
426+
byte[] multiValueHeader1Value2 = { 0, 0, 0, 1 };
427+
428+
rawHeaders.add(multiValueHeader1, multiValueHeader1Value1);
429+
rawHeaders.add(multiValueHeader1, multiValueHeader1Value2);
430+
431+
byte[] deliveryAttemptHeaderValue = { 0, 0, 0, 1 };
432+
byte[] originalOffsetHeaderValue = { 0, 0, 0, 2 };
433+
byte[] defaultHeaderAttemptsValues = { 0, 0, 0, 5 };
434+
435+
rawHeaders.add(KafkaHeaders.DELIVERY_ATTEMPT, deliveryAttemptHeaderValue);
436+
rawHeaders.add(KafkaHeaders.ORIGINAL_OFFSET, originalOffsetHeaderValue);
437+
rawHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, defaultHeaderAttemptsValues);
438+
439+
// Every generated header has a unique name ("test-single-value" + i) and shares one value array.
byte[] singleValueHeaderValue = { 0, 0, 0, 6 };
440+
for (int i = 0; i < numberOfSingleValueHeaderCount; i++) {
441+
String singleValueHeader = "test-single-value" + i;
442+
rawHeaders.add(singleValueHeader, singleValueHeaderValue);
443+
}
444+
445+
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
446+
mapper.setMultiValueHeaderPatterns(multiValueHeader1);
447+
448+
// WHEN
449+
Map<String, Object> mappedHeaders = new HashMap<>();
450+
mapper.toHeaders(rawHeaders, mappedHeaders);
451+
452+
// THEN
453+
// DELIVERY_ATTEMPT is asserted as the int 1; the other headers are compared to their raw byte arrays.
assertThat(mappedHeaders.get(KafkaHeaders.DELIVERY_ATTEMPT)).isEqualTo(1);
454+
assertThat(mappedHeaders.get(KafkaHeaders.ORIGINAL_OFFSET)).isEqualTo(originalOffsetHeaderValue);
455+
assertThat(mappedHeaders.get(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS)).isEqualTo(defaultHeaderAttemptsValues);
456+
457+
for (int i = 0; i < numberOfSingleValueHeaderCount; i++) {
458+
String singleValueHeader = "test-single-value" + i;
459+
assertThat(mappedHeaders.get(singleValueHeader)).isEqualTo(singleValueHeaderValue);
460+
}
461+
462+
// The only header registered as multi-value must come back as a list of both values, in insertion order.
assertThat(mappedHeaders)
463+
.extractingByKey(multiValueHeader1, InstanceOfAssertFactories.list(byte[].class))
464+
.containsExactly(multiValueHeader1Value1, multiValueHeader1Value2);
465+
}
466+
467+
// Verifies that toHeaders() maps every header when a large number of distinct multi-value header
// names (500, 1000 and 2000) are registered and present, alongside three framework headers and
// one single-value header.
// NOTE(review): the final assertions only hold if setMultiValueHeaderPatterns() accumulates
// patterns across calls instead of replacing them - confirm against its implementation.
@ParameterizedTest
468+
@ValueSource(ints = {500, 1000, 2000})
469+
void hugeNumberOfMultiValueHeaderToTest(int numberOfMultiValueHeaderCount) {
470+
// GIVEN
471+
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
472+
Headers rawHeaders = new RecordHeaders();
473+
474+
byte[] multiValueHeader1Value1 = { 0, 0, 0, 0 };
475+
byte[] multiValueHeader1Value2 = { 0, 0, 0, 1 };
476+
477+
// Each iteration registers one exact header name as a multi-value pattern and adds two values under it.
for (int i = 0; i < numberOfMultiValueHeaderCount; i++) {
478+
String multiValueHeader = "test-multi-value" + i;
479+
mapper.setMultiValueHeaderPatterns(multiValueHeader);
480+
rawHeaders.add(multiValueHeader, multiValueHeader1Value1);
481+
rawHeaders.add(multiValueHeader, multiValueHeader1Value2);
482+
}
483+
484+
byte[] deliveryAttemptHeaderValue = { 0, 0, 0, 1 };
485+
byte[] originalOffsetHeaderValue = { 0, 0, 0, 2 };
486+
byte[] defaultHeaderAttemptsValues = { 0, 0, 0, 5 };
487+
488+
rawHeaders.add(KafkaHeaders.DELIVERY_ATTEMPT, deliveryAttemptHeaderValue);
489+
rawHeaders.add(KafkaHeaders.ORIGINAL_OFFSET, originalOffsetHeaderValue);
490+
rawHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, defaultHeaderAttemptsValues);
491+
492+
String singleValueHeader = "test-single-value";
493+
byte[] singleValueHeaderValue = { 0, 0, 0, 6 };
494+
rawHeaders.add(singleValueHeader, singleValueHeaderValue);
495+
496+
// WHEN
497+
Map<String, Object> mappedHeaders = new HashMap<>();
498+
mapper.toHeaders(rawHeaders, mappedHeaders);
499+
500+
// THEN
501+
// DELIVERY_ATTEMPT is asserted as the int 1; the other headers are compared to their raw byte arrays.
assertThat(mappedHeaders.get(KafkaHeaders.DELIVERY_ATTEMPT)).isEqualTo(1);
502+
assertThat(mappedHeaders.get(KafkaHeaders.ORIGINAL_OFFSET)).isEqualTo(originalOffsetHeaderValue);
503+
assertThat(mappedHeaders.get(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS)).isEqualTo(defaultHeaderAttemptsValues);
504+
assertThat(mappedHeaders.get(singleValueHeader)).isEqualTo(singleValueHeaderValue);
505+
506+
// Every registered multi-value header must come back as a list of both values, in insertion order.
for (int i = 0; i < numberOfMultiValueHeaderCount; i++) {
507+
String multiValueHeader = "test-multi-value" + i;
508+
assertThat(mappedHeaders)
509+
.extractingByKey(multiValueHeader, InstanceOfAssertFactories.list(byte[].class))
510+
.containsExactly(multiValueHeader1Value1, multiValueHeader1Value2);
511+
}
512+
}
513+
416514
@Test
417515
void multiValueHeaderFromTest() {
418516
// GIVEN

0 commit comments

Comments
 (0)