Skip to content

Commit 2cca235

Browse files
sobychacko authored and
artembilan committed
GH-3786: Remove duplicated trace header
Fixes: #3786 Issue link: #3786 When tracing is enabled, the KafkaRecordSenderContext was adding a new trace header without removing existing ones, resulting in multiple headers in the same record. This commit fixes the issue by updating KafkaRecordSenderContext to remove existing trace headers before adding new ones. # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java
1 parent 829bf5e commit 2cca235

File tree

2 files changed

+93
-18
lines changed

2 files changed

+93
-18
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2024 the original author or authors.
2+
* Copyright 2022-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,13 +21,15 @@
2121

2222
import io.micrometer.observation.transport.SenderContext;
2323
import org.apache.kafka.clients.producer.ProducerRecord;
24+
import org.apache.kafka.common.header.Headers;
2425

2526
/**
2627
* {@link SenderContext} for {@link ProducerRecord}s.
2728
*
2829
* @author Gary Russell
2930
* @author Christian Mergenthaler
3031
* @author Wang Zhiyang
32+
* @author Soby Chacko
3133
*
3234
* @since 3.0
3335
*
@@ -39,8 +41,12 @@ public class KafkaRecordSenderContext extends SenderContext<ProducerRecord<?, ?>
3941
private final ProducerRecord<?, ?> record;
4042

4143
public KafkaRecordSenderContext(ProducerRecord<?, ?> record, String beanName, Supplier<String> clusterId) {
42-
super((carrier, key, value) -> record.headers().add(key,
43-
value == null ? null : value.getBytes(StandardCharsets.UTF_8)));
44+
super((carrier, key, value) -> {
45+
Headers headers = record.headers();
46+
headers.remove(key);
47+
headers.add(key, value == null ? null : value.getBytes(StandardCharsets.UTF_8));
48+
});
49+
4450
setCarrier(record);
4551
this.beanName = beanName;
4652
this.record = record;

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

Lines changed: 84 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.support.micrometer;
1818

19+
import java.nio.charset.StandardCharsets;
1920
import java.util.Arrays;
2021
import java.util.Deque;
2122
import java.util.List;
@@ -25,6 +26,7 @@
2526
import java.util.concurrent.TimeUnit;
2627
import java.util.concurrent.TimeoutException;
2728
import java.util.concurrent.atomic.AtomicReference;
29+
import java.util.stream.StreamSupport;
2830

2931
import io.micrometer.common.KeyValues;
3032
import io.micrometer.core.instrument.MeterRegistry;
@@ -53,6 +55,7 @@
5355
import org.apache.kafka.common.errors.InvalidTopicException;
5456
import org.apache.kafka.common.header.Header;
5557
import org.apache.kafka.common.header.Headers;
58+
import org.apache.kafka.common.header.internals.RecordHeader;
5659
import org.junit.jupiter.api.Test;
5760

5861
import org.springframework.beans.factory.annotation.Autowired;
@@ -72,6 +75,7 @@
7275
import org.springframework.kafka.core.KafkaTemplate;
7376
import org.springframework.kafka.core.ProducerFactory;
7477
import org.springframework.kafka.listener.MessageListenerContainer;
78+
import org.springframework.kafka.support.ProducerListener;
7579
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
7680
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
7781
import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -97,9 +101,9 @@
97101
* @since 3.0
98102
*/
99103
@SpringJUnitConfig
100-
@EmbeddedKafka(topics = { ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2,
104+
@EmbeddedKafka(topics = {ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2,
101105
ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_RUNTIME_EXCEPTION,
102-
ObservationTests.OBSERVATION_ERROR }, partitions = 1)
106+
ObservationTests.OBSERVATION_ERROR, ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE}, partitions = 1)
103107
@DirtiesContext
104108
public class ObservationTests {
105109

@@ -113,18 +117,21 @@ public class ObservationTests {
113117

114118
public final static String OBSERVATION_ERROR = "observation.error";
115119

120+
public final static String OBSERVATION_TRACEPARENT_DUPLICATE = "observation.traceparent.duplicate";
121+
116122
@Test
117123
void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
118124
@Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler,
119125
@Autowired MeterRegistry meterRegistry, @Autowired EmbeddedKafkaBroker broker,
120126
@Autowired KafkaListenerEndpointRegistry endpointRegistry, @Autowired KafkaAdmin admin,
121127
@Autowired @Qualifier("customTemplate") KafkaTemplate<Integer, String> customTemplate,
122128
@Autowired Config config)
123-
throws InterruptedException, ExecutionException, TimeoutException {
129+
throws InterruptedException, ExecutionException, TimeoutException {
124130

125131
AtomicReference<SimpleSpan> spanFromCallback = new AtomicReference<>();
126132

127133
template.setProducerInterceptor(new ProducerInterceptor<>() {
134+
128135
@Override
129136
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
130137
tracer.currentSpanCustomizer().tag("key", "value");
@@ -309,10 +316,10 @@ private void assertThatTemplateHasTimerWithNameAndTags(MeterRegistryAssert meter
309316

310317
meterRegistryAssert.hasTimerWithNameAndTags("spring.kafka.template",
311318
KeyValues.of("spring.kafka.template.name", "template",
312-
"messaging.operation", "publish",
313-
"messaging.system", "kafka",
314-
"messaging.destination.kind", "topic",
315-
"messaging.destination.name", destName)
319+
"messaging.operation", "publish",
320+
"messaging.system", "kafka",
321+
"messaging.destination.kind", "topic",
322+
"messaging.destination.name", destName)
316323
.and(keyValues));
317324
}
318325

@@ -321,12 +328,12 @@ private void assertThatListenerHasTimerWithNameAndTags(MeterRegistryAssert meter
321328

322329
meterRegistryAssert.hasTimerWithNameAndTags("spring.kafka.listener",
323330
KeyValues.of(
324-
"messaging.kafka.consumer.group", consumerGroup,
325-
"messaging.operation", "receive",
326-
"messaging.source.kind", "topic",
327-
"messaging.source.name", destName,
328-
"messaging.system", "kafka",
329-
"spring.kafka.listener.id", listenerId)
331+
"messaging.kafka.consumer.group", consumerGroup,
332+
"messaging.operation", "receive",
333+
"messaging.source.kind", "topic",
334+
"messaging.source.name", destName,
335+
"messaging.system", "kafka",
336+
"spring.kafka.listener.id", listenerId)
330337
.and(keyValues));
331338
}
332339

@@ -369,7 +376,7 @@ void observationRuntimeException(@Autowired ExceptionListener listener, @Autowir
369376
void observationErrorException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
370377
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> errorTemplate,
371378
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
372-
throws ExecutionException, InterruptedException, TimeoutException {
379+
throws ExecutionException, InterruptedException, TimeoutException {
373380

374381
errorTemplate.send(OBSERVATION_ERROR, "testError").get(10, TimeUnit.SECONDS);
375382
assertThat(listener.latch5.await(10, TimeUnit.SECONDS)).isTrue();
@@ -394,6 +401,63 @@ void kafkaAdminNotRecreatedIfBootstrapServersSameInProducerAndAdminConfig(
394401
assertThat(template.getKafkaAdmin()).isSameAs(kafkaAdmin);
395402
}
396403

404+
@Test
405+
void verifyKafkaRecordSenderContextTraceParentHandling() {
406+
String initialTraceParent = "traceparent-from-previous";
407+
String updatedTraceParent = "traceparent-current";
408+
ProducerRecord<Integer, String> record = new ProducerRecord<>("test-topic", "test-value");
409+
record.headers().add("traceparent", initialTraceParent.getBytes(StandardCharsets.UTF_8));
410+
411+
// Create the context and update the traceparent
412+
KafkaRecordSenderContext context = new KafkaRecordSenderContext(
413+
record,
414+
"test-bean",
415+
() -> "test-cluster"
416+
);
417+
context.getSetter().set(record, "traceparent", updatedTraceParent);
418+
419+
Iterable<Header> traceparentHeaders = record.headers().headers("traceparent");
420+
421+
List<String> headerValues = StreamSupport.stream(traceparentHeaders.spliterator(), false)
422+
.map(header -> new String(header.value(), StandardCharsets.UTF_8))
423+
.toList();
424+
425+
// Verify there's only one traceparent header and it contains the updated value
426+
assertThat(headerValues).containsExactly(updatedTraceParent);
427+
}
428+
429+
@Test
430+
void verifyTraceParentHeader(@Autowired KafkaTemplate<Integer, String> template,
431+
@Autowired SimpleTracer tracer) throws Exception {
432+
CompletableFuture<ProducerRecord<Integer, String>> producerRecordFuture = new CompletableFuture<>();
433+
template.setProducerListener(new ProducerListener<>() {
434+
435+
@Override
436+
public void onSuccess(ProducerRecord<Integer, String> producerRecord, RecordMetadata recordMetadata) {
437+
producerRecordFuture.complete(producerRecord);
438+
}
439+
});
440+
String initialTraceParent = "traceparent-from-previous";
441+
Header header = new RecordHeader("traceparent", initialTraceParent.getBytes(StandardCharsets.UTF_8));
442+
ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(
443+
OBSERVATION_TRACEPARENT_DUPLICATE,
444+
null, null, null,
445+
"test-value",
446+
List.of(header)
447+
);
448+
449+
template.send(producerRecord).get(10, TimeUnit.SECONDS);
450+
ProducerRecord<Integer, String> recordResult = producerRecordFuture.get(10, TimeUnit.SECONDS);
451+
452+
Iterable<Header> traceparentHeaders = recordResult.headers().headers("traceparent");
453+
assertThat(traceparentHeaders).hasSize(1);
454+
455+
String traceparentValue = new String(traceparentHeaders.iterator().next().value(), StandardCharsets.UTF_8);
456+
assertThat(traceparentValue).isEqualTo("traceparent-from-propagator");
457+
458+
tracer.getSpans().clear();
459+
}
460+
397461
@Configuration
398462
@EnableKafka
399463
public static class Config {
@@ -523,6 +587,9 @@ public List<String> fields() {
523587
public <C> void inject(TraceContext context, @Nullable C carrier, Setter<C> setter) {
524588
setter.set(carrier, "foo", "some foo value");
525589
setter.set(carrier, "bar", "some bar value");
590+
591+
// Add a traceparent header to simulate W3C trace context
592+
setter.set(carrier, "traceparent", "traceparent-from-propagator");
526593
}
527594

528595
// This is called on the consumer side when the message is consumed
@@ -531,7 +598,9 @@ public <C> void inject(TraceContext context, @Nullable C carrier, Setter<C> sett
531598
public <C> Span.Builder extract(C carrier, Getter<C> getter) {
532599
String foo = getter.get(carrier, "foo");
533600
String bar = getter.get(carrier, "bar");
534-
return tracer.spanBuilder().tag("foo", foo).tag("bar", bar);
601+
return tracer.spanBuilder()
602+
.tag("foo", foo)
603+
.tag("bar", bar);
535604
}
536605
};
537606
}

0 commit comments

Comments (0)