Skip to content

Commit 594d005

Browse files
garyrussellartembilan
authored andcommitted
DefaultKafkaHeaderMapper Improvement
String-valued headers are now mapped as raw strings by default instead of being encoded as JSON strings. This can be disabled until all applications have been upgraded to use 2.3 or later. * Fix use of `encodeStrings`; remove bogus `SuppressWarnings`.
1 parent ae711a8 commit 594d005

File tree

6 files changed

+90
-37
lines changed

6 files changed

+90
-37
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java

Lines changed: 58 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.nio.charset.StandardCharsets;
2121
import java.util.Arrays;
2222
import java.util.HashMap;
23-
import java.util.Iterator;
2423
import java.util.LinkedHashSet;
2524
import java.util.List;
2625
import java.util.Map;
@@ -60,6 +59,8 @@
6059
*/
6160
public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper {
6261

62+
private static final String JAVA_LANG_STRING = "java.lang.String";
63+
6364
private static final List<String> DEFAULT_TRUSTED_PACKAGES =
6465
Arrays.asList(
6566
"java.lang",
@@ -79,6 +80,8 @@ public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper {
7980

8081
private final Set<String> toStringClasses = new LinkedHashSet<>();
8182

83+
private boolean encodeStrings;
84+
8285
/**
8386
* Construct an instance with the default object mapper and default header patterns
8487
* for outbound headers; all inbound headers are mapped. The default pattern list is
@@ -171,6 +174,22 @@ protected Set<String> getToStringClasses() {
171174
return this.toStringClasses;
172175
}
173176

177+
protected boolean isEncodeStrings() {
178+
return this.encodeStrings;
179+
}
180+
181+
/**
182+
* Set to true to encode String-valued headers as JSON ("..."), by default just the
183+
* raw String value is converted to a byte array using the configured charset. Set to
184+
* true if a consumer of the outbound record is using Spring for Apache Kafka version
185+
* less than 2.3
186+
* @param encodeStrings true to encode (default false).
187+
* @since 2.3
188+
*/
189+
public void setEncodeStrings(boolean encodeStrings) {
190+
this.encodeStrings = encodeStrings;
191+
}
192+
174193
/**
175194
* Add packages to the trusted packages list (default {@code java.util, java.lang}) used
176195
* when constructing objects from JSON.
@@ -207,9 +226,9 @@ public void addToStringClasses(String... classNames) {
207226
public void fromHeaders(MessageHeaders headers, Headers target) {
208227
final Map<String, String> jsonHeaders = new HashMap<>();
209228
final ObjectMapper headerObjectMapper = getObjectMapper();
210-
headers.forEach((key, val) -> {
211-
if (matches(key, val)) {
212-
Object valueToAdd = headerValueToAddOut(key, val);
229+
headers.forEach((key, rawValue) -> {
230+
if (matches(key, rawValue)) {
231+
Object valueToAdd = headerValueToAddOut(key, rawValue);
213232
if (valueToAdd instanceof byte[]) {
214233
target.add(new RecordHeader(key, (byte[]) valueToAdd));
215234
}
@@ -218,14 +237,20 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
218237
Object value = valueToAdd;
219238
String className = valueToAdd.getClass().getName();
220239
if (this.toStringClasses.contains(className)) {
221-
value = valueToAdd.toString();
222-
className = "java.lang.String";
240+
valueToAdd = valueToAdd.toString();
241+
className = JAVA_LANG_STRING;
242+
}
243+
if (!this.encodeStrings && valueToAdd instanceof String) {
244+
target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(getCharset())));
245+
className = JAVA_LANG_STRING;
246+
}
247+
else {
248+
target.add(new RecordHeader(key, headerObjectMapper.writeValueAsBytes(value)));
223249
}
224-
target.add(new RecordHeader(key, headerObjectMapper.writeValueAsBytes(value)));
225250
jsonHeaders.put(key, className);
226251
}
227252
catch (Exception e) {
228-
logger.debug(e, () -> "Could not map " + key + " with type " + valueToAdd.getClass().getName());
253+
logger.debug(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName());
229254
}
230255
}
231256
}
@@ -258,20 +283,25 @@ public void toHeaders(Headers source, final Map<String, Object> headers) {
258283
catch (Exception e) {
259284
logger.error(e, () -> "Could not load class for header: " + header.key());
260285
}
261-
if (trusted) {
262-
try {
263-
Object value = decodeValue(header, type);
264-
headers.put(header.key(), value);
265-
}
266-
catch (IOException e) {
267-
logger.error(e, () ->
268-
"Could not decode json type: " + new String(header.value()) + " for key: "
269-
+ header.key());
270-
headers.put(header.key(), header.value());
271-
}
286+
if (String.class.equals(type) && header.value().length > 0 && header.value()[0] != '"') {
287+
headers.put(header.key(), new String(header.value(), getCharset()));
272288
}
273289
else {
274-
headers.put(header.key(), new NonTrustedHeaderType(header.value(), requestedType));
290+
if (trusted) {
291+
try {
292+
Object value = decodeValue(header, type);
293+
headers.put(header.key(), value);
294+
}
295+
catch (IOException e) {
296+
logger.error(e, () ->
297+
"Could not decode json type: " + new String(header.value()) + " for key: "
298+
+ header.key());
299+
headers.put(header.key(), header.value());
300+
}
301+
}
302+
else {
303+
headers.put(header.key(), new NonTrustedHeaderType(header.value(), requestedType));
304+
}
275305
}
276306
}
277307
else {
@@ -304,18 +334,14 @@ private Object decodeValue(Header h, Class<?> type) throws IOException, LinkageE
304334
@Nullable
305335
private Map<String, String> decodeJsonTypes(Headers source) {
306336
Map<String, String> types = null;
307-
Iterator<Header> iterator = source.iterator();
308-
ObjectMapper headerObjectMapper = getObjectMapper();
309-
while (iterator.hasNext()) {
310-
Header next = iterator.next();
311-
if (next.key().equals(JSON_TYPES)) {
312-
try {
313-
types = headerObjectMapper.readValue(next.value(), Map.class);
314-
}
315-
catch (IOException e) {
316-
logger.error(e, () -> "Could not decode json types: " + new String(next.value()));
317-
}
318-
break;
337+
Header jsonTypes = source.lastHeader(JSON_TYPES);
338+
if (jsonTypes != null) {
339+
ObjectMapper headerObjectMapper = getObjectMapper();
340+
try {
341+
types = headerObjectMapper.readValue(jsonTypes.value(), Map.class);
342+
}
343+
catch (IOException e) {
344+
logger.error(e, () -> "Could not decode json types: " + new String(jsonTypes.value()));
319345
}
320346
}
321347
return types;

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ public void testWithMessage() {
208208
assertThat(iterator.hasNext()).isTrue();
209209
Header next = iterator.next();
210210
assertThat(next.key()).isEqualTo("foo");
211-
assertThat(new String(next.value())).isEqualTo("\"bar\"");
211+
assertThat(new String(next.value())).isEqualTo("bar");
212212
assertThat(iterator.hasNext()).isTrue();
213213
next = iterator.next();
214214
assertThat(next.key()).isEqualTo(DefaultKafkaHeaderMapper.JSON_TYPES);

spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,15 +188,29 @@ public void testJsonStringConvert() {
188188
assertThat(target).containsExactlyInAnyOrder(
189189
new RecordHeader(DefaultKafkaHeaderMapper.JSON_TYPES,
190190
"{\"thisOnesAString\":\"java.lang.String\"}".getBytes()),
191-
new RecordHeader("thisOnesAString", "\"foo\"".getBytes()),
191+
new RecordHeader("thisOnesAString", "foo".getBytes()),
192192
new RecordHeader("alwaysRaw", "baz".getBytes()),
193193
new RecordHeader("thisOnesBytes", "bar".getBytes()));
194194
headersMap.clear();
195+
target.add(new RecordHeader(DefaultKafkaHeaderMapper.JSON_TYPES,
196+
("{\"thisOnesAString\":\"java.lang.String\","
197+
+ "\"backwardCompatible\":\"java.lang.String\"}").getBytes()));
198+
target.add(new RecordHeader("backwardCompatible", "\"qux\"".getBytes()));
195199
mapper.toHeaders(target, headersMap);
196200
assertThat(headersMap).contains(
197201
entry("thisOnesAString", "foo"),
198202
entry("thisOnesBytes", "bar".getBytes()),
199-
entry("alwaysRaw", "baz".getBytes()));
203+
entry("alwaysRaw", "baz".getBytes()),
204+
entry("backwardCompatible", "qux"));
205+
mapper.setEncodeStrings(true);
206+
target = new RecordHeaders();
207+
mapper.fromHeaders(headers, target);
208+
assertThat(target).containsExactlyInAnyOrder(
209+
new RecordHeader(DefaultKafkaHeaderMapper.JSON_TYPES,
210+
"{\"thisOnesAString\":\"java.lang.String\"}".getBytes()),
211+
new RecordHeader("thisOnesAString", "\"foo\"".getBytes()),
212+
new RecordHeader("alwaysRaw", "baz".getBytes()),
213+
new RecordHeader("thisOnesBytes", "bar".getBytes()));
200214
}
201215

202216
@Test

spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingSerializationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ private void doTest(DelegatingSerializer serializer, DelegatingDeserializer dese
9898
Collections.singletonMap(DelegatingSerializer.SERIALIZATION_SELECTOR, "string"));
9999
new DefaultKafkaHeaderMapper().fromHeaders(messageHeaders, headers);
100100
assertThat(headers.lastHeader(DelegatingSerializer.SERIALIZATION_SELECTOR).value())
101-
.isEqualTo(new byte[] { '"', 's', 't', 'r', 'i', 'n', 'g', '"' });
101+
.isEqualTo(new byte[] { 's', 't', 'r', 'i', 'n', 'g' });
102102
serialized = serializer.serialize("foo", headers, "bar");
103103
assertThat(serialized).isEqualTo(new byte[] { 'b', 'a', 'r' });
104104
assertThat(deserializer.deserialize("foo", headers, serialized)).isEqualTo("bar");

src/reference/asciidoc/kafka.adoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3221,6 +3221,14 @@ The `DefaultKafkaHeaderMapper` has a method called `addToStringClasses()` that l
32213221
During inbound mapping, they are mapped as `String`.
32223222
By default, only `org.springframework.util.MimeType` and `org.springframework.http.MediaType` are mapped this way.
32233223

3224+
NOTE: Starting with version 2.3, handling of String-valued headers is simplified.
3225+
Such headers are no longer JSON encoded, by default (i.e. they do not have enclosing `"..."` added).
3226+
The type is still added to the JSON_TYPES header so the receiving system can convert back to a String (from `byte[]`).
3227+
The mapper can handle (decode) headers produced by older versions (it checks for a leading `"`); in this way an application using 2.3 can consume records from older versions.
3228+
3229+
IMPORTANT: To be compatible with earlier versions, set `encodeStrings` to `true`, if records produced by a version using 2.3 might be consumed by applications using earlier versions.
3230+
When all applications are using 2.3 or higher, you can leave the property at its default value of `false`.
3231+
32243232
[[tombstones]]
32253233
==== Null Payloads and Log Compaction of 'Tombstone' Records
32263234

src/reference/asciidoc/whats-new.adoc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ Please submit GitHub issues and/or pull requests for additional entries in that
1212
[[kafka-client-2.2]]
1313
==== Kafka Client Version
1414

15-
This version requires the 2.2.0 `kafka-clients` or higher.
15+
This version requires the 2.3.0 `kafka-clients` or higher.
1616

1717
==== Class/Package Changes
1818

@@ -148,3 +148,8 @@ See <<kafka-testing-embeddedkafka-annotation>> for more information.
148148

149149
You can now customize the header names for correlation, reply topic and reply partition.
150150
See <<replying-template>> for more information.
151+
152+
==== Header Mapper Changes
153+
154+
The `DefaultKafkaHeaderMapper` no longer encodes simple String-valued headers as JSON.
155+
See <<header-mapping>> for more information.

0 commit comments

Comments
 (0)