Skip to content

Commit 1d0ea10

Browse files
garyrussellartembilan
authored andcommitted
GH-1139: Json Serialization: Add Fluent API
Resolves #1139
1 parent 5ce4df9 commit 1d0ea10

File tree

8 files changed

+243
-5
lines changed

8 files changed

+243
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonDeserializer.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,4 +463,51 @@ public void close() {
463463
// No-op
464464
}
465465

466+
// Fluent API
467+
468+
/**
469+
* Designate this deserializer for deserializing keys (default is values); only
470+
* applies if the default type mapper is used.
471+
* @return the deserializer.
472+
* @since 2.3
473+
*/
474+
public JsonDeserializer<T> forKeys() {
475+
setUseTypeMapperForKey(true);
476+
return this;
477+
}
478+
479+
/**
480+
* Don't remove type information headers.
481+
* @return the deserializer.
482+
* @since 2.3
483+
* @see #setRemoveTypeHeaders(boolean)
484+
*/
485+
public JsonDeserializer<T> dontRemoveTypeHeaders() {
486+
setRemoveTypeHeaders(false);
487+
return this;
488+
}
489+
490+
/**
491+
* Ignore type information headers and use the configured target class.
492+
* @return the deserializer.
493+
* @since 2.3
494+
* @see #setUseTypeHeaders(boolean)
495+
*/
496+
public JsonDeserializer<T> ignoreTypeHeaders() {
497+
setUseTypeHeaders(false);
498+
return this;
499+
}
500+
501+
/**
502+
* Use the supplied {@link Jackson2JavaTypeMapper}.
503+
* @param mapper the mapper.
504+
* @return the deserializer.
505+
* @since 2.3
506+
* @see #setTypeMapper(Jackson2JavaTypeMapper)
507+
*/
508+
public JsonDeserializer<T> typeMapper(Jackson2JavaTypeMapper mapper) {
509+
setTypeMapper(mapper);
510+
return this;
511+
}
512+
466513
}

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerde.java

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import org.springframework.core.ResolvableType;
2626
import org.springframework.kafka.support.JacksonUtils;
27+
import org.springframework.kafka.support.converter.Jackson2JavaTypeMapper;
2728
import org.springframework.lang.Nullable;
2829
import org.springframework.util.Assert;
2930

@@ -111,10 +112,65 @@ public Deserializer<T> deserializer() {
111112
* @param isKey Use key type headers if true
112113
* @return the JsonSerde
113114
* @since 2.1.3
115+
* @deprecated in favor of {@link #forKeys()}.
114116
*/
117+
@Deprecated
115118
public JsonSerde<T> setUseTypeMapperForKey(boolean isKey) {
116-
this.jsonSerializer.setUseTypeMapperForKey(isKey);
117-
this.jsonDeserializer.setUseTypeMapperForKey(isKey);
119+
return forKeys();
120+
}
121+
122+
// Fluent API
123+
124+
/**
125+
* Designate this Serde for serializing/deserializing keys (default is values).
126+
* @return the serde.
127+
* @since 2.3
128+
*/
129+
public JsonSerde<T> forKeys() {
130+
this.jsonSerializer.forKeys();
131+
this.jsonDeserializer.forKeys();
132+
return this;
133+
}
134+
135+
/**
136+
* Configure the serializer to not add type information.
137+
* @return the serde.
138+
* @since 2.3
139+
*/
140+
public JsonSerde<T> noTypeInfo() {
141+
this.jsonSerializer.noTypeInfo();
142+
return this;
143+
}
144+
145+
/**
146+
* Don't remove type information headers after deserialization.
147+
* @return the serde.
148+
* @since 2.3
149+
*/
150+
public JsonSerde<T> dontRemoveTypeHeaders() {
151+
this.jsonDeserializer.dontRemoveTypeHeaders();
152+
return this;
153+
}
154+
155+
/**
156+
* Ignore type information headers and use the configured target class.
157+
* @return the serde.
158+
* @since 2.3
159+
*/
160+
public JsonSerde<T> ignoreTypeHeaders() {
161+
this.jsonDeserializer.ignoreTypeHeaders();
162+
return this;
163+
}
164+
165+
/**
166+
* Use the supplied {@link Jackson2JavaTypeMapper}.
167+
* @param mapper the mapper.
168+
* @return the serde.
169+
* @since 2.3
170+
*/
171+
public JsonSerde<T> typeMapper(Jackson2JavaTypeMapper mapper) {
172+
this.jsonSerializer.setTypeMapper(mapper);
173+
this.jsonDeserializer.setTypeMapper(mapper);
118174
return this;
119175
}
120176

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerializer.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,4 +186,41 @@ public void close() {
186186
// No-op
187187
}
188188

189+
// Fluent API
190+
191+
/**
192+
* Designate this serializer for serializing keys (default is values); only applies if
193+
* the default type mapper is used.
194+
* @return the serializer.
195+
* @since 2.3
196+
* @see #setUseTypeMapperForKey(boolean)
197+
*/
198+
public JsonSerializer<T> forKeys() {
199+
setUseTypeMapperForKey(true);
200+
return this;
201+
}
202+
203+
/**
204+
* Do not include type info headers.
205+
* @return the serializer.
206+
* @since 2.3
207+
* @see #setAddTypeInfo(boolean)
208+
*/
209+
public JsonSerializer<T> noTypeInfo() {
210+
setAddTypeInfo(false);
211+
return this;
212+
}
213+
214+
/**
215+
* Use the supplied {@link Jackson2JavaTypeMapper}.
216+
* @param mapper the mapper.
217+
* @return the serializer.
218+
* @since 2.3
219+
* @see #setTypeMapper(Jackson2JavaTypeMapper)
220+
*/
221+
public JsonSerializer<T> typeMapper(Jackson2JavaTypeMapper mapper) {
222+
setTypeMapper(mapper);
223+
return this;
224+
}
225+
189226
}

spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsJsonSerializationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public class KafkaStreamsJsonSerializationTests {
7979
public static final String OBJECT_OUTPUT_TOPIC = "object-output-topic";
8080

8181
public static final JsonSerde<JsonObjectKey> jsonObjectKeySerde =
82-
new JsonSerde<>(JsonObjectKey.class).setUseTypeMapperForKey(true);
82+
new JsonSerde<>(JsonObjectKey.class).forKeys();
8383

8484
public static final JsonSerde<JsonObjectValue> jsonObjectValueSerde = new JsonSerde<>(JsonObjectValue.class);
8585

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.serializer;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import org.junit.jupiter.api.Test;
22+
23+
import org.springframework.kafka.support.converter.AbstractJavaTypeMapper;
24+
import org.springframework.kafka.test.utils.KafkaTestUtils;
25+
26+
/**
27+
* @author Gary Russell
28+
* @since 2.3
29+
*
30+
*/
31+
public class JsonSerdeTests {
32+
33+
@Test
34+
void noTypeInfo() {
35+
JsonSerde<String> serde = new JsonSerde<>(String.class)
36+
.forKeys()
37+
.noTypeInfo()
38+
.ignoreTypeHeaders()
39+
.dontRemoveTypeHeaders();
40+
assertThat(KafkaTestUtils.getPropertyValue(serde, "jsonSerializer.typeMapper.classIdFieldName"))
41+
.isEqualTo(AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME);
42+
assertThat(KafkaTestUtils.getPropertyValue(serde, "jsonDeserializer.typeMapper.classIdFieldName"))
43+
.isEqualTo(AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME);
44+
assertThat(KafkaTestUtils.getPropertyValue(serde, "jsonSerializer.addTypeInfo", Boolean.class)).isFalse();
45+
assertThat(KafkaTestUtils.getPropertyValue(serde, "jsonDeserializer.useTypeHeaders", Boolean.class)).isFalse();
46+
assertThat(KafkaTestUtils.getPropertyValue(serde, "jsonDeserializer.removeTypeHeaders", Boolean.class))
47+
.isFalse();
48+
serde.close();
49+
}
50+
51+
}

src/reference/asciidoc/kafka.adoc

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2471,6 +2471,37 @@ In addition, you can configure the serializer and deserializer by using the foll
24712471
Starting with version 2.2, the type information headers (if added by the serializer) are removed by the deserializer.
24722472
You can revert to the previous behavior by setting the `removeTypeHeaders` property to `false`, either directly on the deserializer or with the configuration property described earlier.
24732473

2474+
When constructing the serializer/deserializer programmatically for use in the producer/consumer factory, since version 2.3, you can use the fluent API, which simplifies configuration.
2475+
2476+
The following example assumes you are using Spring Boot:
2477+
2478+
====
2479+
[source, java]
2480+
----
2481+
@Bean
2482+
public DefaultKafkaProducerFactory pf(KafkaProperties properties) {
2483+
Map<String, Object> props = properties.buildProducerProperties();
2484+
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(props,
2485+
new JsonSerializer<>(MyKeyType.class)
2486+
.forKeys()
2487+
.noTypeInfo(),
2488+
new JsonSerializer<>(MyValueType.class)
2489+
.noTypeInfo());
2490+
}
2491+
2492+
@Bean
2493+
public DefaultKafkaConsumerFactory pf(KafkaProperties properties) {
2494+
Map<String, Object> props = properties.buildConsumerProperties();
2495+
DefaultKafkaConsumerFactory pf = new DefaultKafkaConsumerFactory(props,
2496+
new JsonDeserializer<>(MyKeyType.class)
2497+
.forKeys()
2498+
.ignoreTypeHeaders(),
2499+
new JsonSerializer<>(MyValueType.class)
2500+
.ignoreTypeHeaders());
2501+
}
2502+
----
2503+
====
2504+
24742505
[[serdes-mapping-types]]
24752506
===== Mapping Types
24762507

src/reference/asciidoc/streams.adoc

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
121121
----
122122
====
123123

124-
==== JSON Serialization and Deserialization
124+
[[serde]]
125+
==== Streams JSON Serialization and Deserialization
125126

126127
For serializing and deserializing data when reading or writing to topics or state stores in JSON format, Spring Kafka provides a `JsonSerde` implementation that uses JSON, delegating to the `JsonSerializer` and `JsonDeserializer` described in <<serdes>>.
127128
The `JsonSerde` implementation provides the same configuration options through its constructor (target type or `ObjectMapper`).
@@ -134,7 +135,19 @@ stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
134135
----
135136
====
136137

137-
IMPORTANT: Since Kafka Streams do not support headers, the `addTypeInfo` property on the `JsonSerializer` is ignored.
138+
When constructing the serializer/deserializer programmatically for use in the producer/consumer factory, since version 2.3, you can use the fluent API, which simplifies configuration.
139+
140+
====
141+
[source, java]
142+
----
143+
stream.through(new JsonSerde<>(MyKeyType.class)
144+
.forKeys()
145+
.noTypeInfo(),
146+
new JsonSerde<>(MyValueType.class)
147+
.noTypeInfo(),
148+
"myTypes");
149+
----
150+
====
138151

139152
==== Using `KafkaStreamsBrancher`
140153

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ A `ByteArrayJsonMessageConverter` has been provided as well as a new super class
7878
Also, a `StringOrBytesSerializer` is now available; it can serialize `byte[]`, `Bytes` and `String` values in `ProducerRecord` s.
7979
See <<messaging-message-conversion>> for more information.
8080

81+
The `JsonSerializer`, `JsonDeserializer` and `JsonSerde` now have fluent APIs to make programmatic configuration simpler.
82+
See the javadocs, <<serdes>>, and <<serde>> for more informaion.
83+
8184
==== ReplyingKafkaTemplate
8285

8386
When a reply times out, the future is completed exceptionally with a `KafkaReplyTimeoutException` instead of a `KafkaException`.

0 commit comments

Comments
 (0)