Skip to content

Commit b31aa65

Browse files
authored
MINOR: Fix MessageFormatters (#18266)
While looking at the message formatters in #18261, I have noticed at few incorrect test cases. * We should not log anything when the record type is unknown because the formatters have clear goals. * We should not parse the value when the key is null or when the key cannot be parsed. While it works in the tests, in practice, this is wrong because we cannot assume that type of the value if the type of the key is not defined. The key drives the type of the entire record. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 64279d2 commit b31aa65

File tree

7 files changed

+62
-156
lines changed

7 files changed

+62
-156
lines changed

tools/src/main/java/org/apache/kafka/tools/consumer/ApiMessageFormatter.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
public abstract class ApiMessageFormatter implements MessageFormatter {
3535

36+
private static final String TYPE = "type";
3637
private static final String VERSION = "version";
3738
private static final String DATA = "data";
3839
private static final String KEY = "key";
@@ -46,22 +47,22 @@ public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream o
4647
byte[] key = consumerRecord.key();
4748
if (Objects.nonNull(key)) {
4849
short keyVersion = ByteBuffer.wrap(key).getShort();
49-
JsonNode dataNode = readToKeyJson(ByteBuffer.wrap(key), keyVersion);
50+
JsonNode dataNode = readToKeyJson(ByteBuffer.wrap(key));
5051

5152
if (dataNode instanceof NullNode) {
5253
return;
5354
}
5455
json.putObject(KEY)
55-
.put(VERSION, keyVersion)
56+
.put(TYPE, keyVersion)
5657
.set(DATA, dataNode);
5758
} else {
58-
json.set(KEY, NullNode.getInstance());
59+
return;
5960
}
6061

6162
byte[] value = consumerRecord.value();
6263
if (Objects.nonNull(value)) {
6364
short valueVersion = ByteBuffer.wrap(value).getShort();
64-
JsonNode dataNode = readToValueJson(ByteBuffer.wrap(value), valueVersion);
65+
JsonNode dataNode = readToValueJson(ByteBuffer.wrap(value));
6566

6667
json.putObject(VALUE)
6768
.put(VERSION, valueVersion)
@@ -77,6 +78,6 @@ public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream o
7778
}
7879
}
7980

80-
protected abstract JsonNode readToKeyJson(ByteBuffer byteBuffer, short version);
81-
protected abstract JsonNode readToValueJson(ByteBuffer byteBuffer, short version);
81+
protected abstract JsonNode readToKeyJson(ByteBuffer byteBuffer);
82+
protected abstract JsonNode readToValueJson(ByteBuffer byteBuffer);
8283
}

tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java

Lines changed: 9 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,66 +16,34 @@
1616
*/
1717
package org.apache.kafka.tools.consumer;
1818

19-
import org.apache.kafka.common.protocol.ApiMessage;
2019
import org.apache.kafka.common.protocol.ByteBufferAccessor;
2120
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
2221
import org.apache.kafka.coordinator.group.generated.GroupMetadataKeyJsonConverter;
2322
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
2423
import org.apache.kafka.coordinator.group.generated.GroupMetadataValueJsonConverter;
25-
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
2624

2725
import com.fasterxml.jackson.databind.JsonNode;
2826
import com.fasterxml.jackson.databind.node.NullNode;
2927
import com.fasterxml.jackson.databind.node.TextNode;
3028

3129
import java.nio.ByteBuffer;
32-
import java.util.Optional;
3330

3431
public class GroupMetadataMessageFormatter extends ApiMessageFormatter {
35-
3632
@Override
37-
protected JsonNode readToKeyJson(ByteBuffer byteBuffer, short version) {
38-
return readToGroupMetadataKey(byteBuffer)
39-
.map(logKey -> transferKeyMessageToJsonNode(logKey, version))
40-
.orElseGet(() -> new TextNode(UNKNOWN));
41-
}
42-
43-
@Override
44-
protected JsonNode readToValueJson(ByteBuffer byteBuffer, short version) {
45-
return readToGroupMetadataValue(byteBuffer)
46-
.map(logValue -> GroupMetadataValueJsonConverter.write(logValue, version))
47-
.orElseGet(() -> new TextNode(UNKNOWN));
48-
}
49-
50-
private Optional<ApiMessage> readToGroupMetadataKey(ByteBuffer byteBuffer) {
33+
protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
5134
short version = byteBuffer.getShort();
52-
if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION
53-
&& version <= OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) {
54-
return Optional.of(new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), version));
55-
} else if (version >= GroupMetadataKey.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataKey.HIGHEST_SUPPORTED_VERSION) {
56-
return Optional.of(new GroupMetadataKey(new ByteBufferAccessor(byteBuffer), version));
57-
} else {
58-
return Optional.empty();
35+
if (version >= GroupMetadataKey.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataKey.HIGHEST_SUPPORTED_VERSION) {
36+
return GroupMetadataKeyJsonConverter.write(new GroupMetadataKey(new ByteBufferAccessor(byteBuffer), version), version);
5937
}
38+
return NullNode.getInstance();
6039
}
6140

62-
private JsonNode transferKeyMessageToJsonNode(ApiMessage message, short version) {
63-
if (message instanceof OffsetCommitKey) {
64-
return NullNode.getInstance();
65-
} else if (message instanceof GroupMetadataKey) {
66-
return GroupMetadataKeyJsonConverter.write((GroupMetadataKey) message, version);
67-
} else {
68-
return new TextNode(UNKNOWN);
69-
}
70-
}
71-
72-
private Optional<GroupMetadataValue> readToGroupMetadataValue(ByteBuffer byteBuffer) {
41+
@Override
42+
protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
7343
short version = byteBuffer.getShort();
74-
if (version >= GroupMetadataValue.LOWEST_SUPPORTED_VERSION
75-
&& version <= GroupMetadataValue.HIGHEST_SUPPORTED_VERSION) {
76-
return Optional.of(new GroupMetadataValue(new ByteBufferAccessor(byteBuffer), version));
77-
} else {
78-
return Optional.empty();
44+
if (version >= GroupMetadataValue.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataValue.HIGHEST_SUPPORTED_VERSION) {
45+
return GroupMetadataValueJsonConverter.write(new GroupMetadataValue(new ByteBufferAccessor(byteBuffer), version), version);
7946
}
47+
return new TextNode(UNKNOWN);
8048
}
8149
}

tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java

Lines changed: 9 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
*/
1717
package org.apache.kafka.tools.consumer;
1818

19-
import org.apache.kafka.common.protocol.ApiMessage;
2019
import org.apache.kafka.common.protocol.ByteBufferAccessor;
21-
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
2220
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
2321
import org.apache.kafka.coordinator.group.generated.OffsetCommitKeyJsonConverter;
2422
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
@@ -29,56 +27,26 @@
2927
import com.fasterxml.jackson.databind.node.TextNode;
3028

3129
import java.nio.ByteBuffer;
32-
import java.util.Optional;
3330

3431
/**
3532
* Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
3633
*/
3734
public class OffsetsMessageFormatter extends ApiMessageFormatter {
38-
3935
@Override
40-
protected JsonNode readToKeyJson(ByteBuffer byteBuffer, short version) {
41-
return readToGroupMetadataKey(byteBuffer)
42-
.map(logKey -> transferKeyMessageToJsonNode(logKey, version))
43-
.orElseGet(() -> new TextNode(UNKNOWN));
44-
}
45-
46-
@Override
47-
protected JsonNode readToValueJson(ByteBuffer byteBuffer, short version) {
48-
return readToOffsetMessageValue(byteBuffer)
49-
.map(logValue -> OffsetCommitValueJsonConverter.write(logValue, version))
50-
.orElseGet(() -> new TextNode(UNKNOWN));
51-
}
52-
53-
private Optional<ApiMessage> readToGroupMetadataKey(ByteBuffer byteBuffer) {
36+
protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
5437
short version = byteBuffer.getShort();
55-
if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION
56-
&& version <= OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) {
57-
return Optional.of(new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), version));
58-
} else if (version >= GroupMetadataKey.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataKey.HIGHEST_SUPPORTED_VERSION) {
59-
return Optional.of(new GroupMetadataKey(new ByteBufferAccessor(byteBuffer), version));
60-
} else {
61-
return Optional.empty();
38+
if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION && version <= OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) {
39+
return OffsetCommitKeyJsonConverter.write(new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), version), version);
6240
}
41+
return NullNode.getInstance();
6342
}
6443

65-
private JsonNode transferKeyMessageToJsonNode(ApiMessage logKey, short keyVersion) {
66-
if (logKey instanceof OffsetCommitKey) {
67-
return OffsetCommitKeyJsonConverter.write((OffsetCommitKey) logKey, keyVersion);
68-
} else if (logKey instanceof GroupMetadataKey) {
69-
return NullNode.getInstance();
70-
} else {
71-
return new TextNode(UNKNOWN);
72-
}
73-
}
74-
75-
private Optional<OffsetCommitValue> readToOffsetMessageValue(ByteBuffer byteBuffer) {
44+
@Override
45+
protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
7646
short version = byteBuffer.getShort();
77-
if (version >= OffsetCommitValue.LOWEST_SUPPORTED_VERSION
78-
&& version <= OffsetCommitValue.HIGHEST_SUPPORTED_VERSION) {
79-
return Optional.of(new OffsetCommitValue(new ByteBufferAccessor(byteBuffer), version));
80-
} else {
81-
return Optional.empty();
47+
if (version >= OffsetCommitValue.LOWEST_SUPPORTED_VERSION && version <= OffsetCommitValue.HIGHEST_SUPPORTED_VERSION) {
48+
return OffsetCommitValueJsonConverter.write(new OffsetCommitValue(new ByteBufferAccessor(byteBuffer), version), version);
8249
}
50+
return new TextNode(UNKNOWN);
8351
}
8452
}

tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,44 +23,27 @@
2323
import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
2424

2525
import com.fasterxml.jackson.databind.JsonNode;
26+
import com.fasterxml.jackson.databind.node.NullNode;
2627
import com.fasterxml.jackson.databind.node.TextNode;
2728

2829
import java.nio.ByteBuffer;
29-
import java.util.Optional;
3030

3131
public class TransactionLogMessageFormatter extends ApiMessageFormatter {
32-
33-
@Override
34-
protected JsonNode readToKeyJson(ByteBuffer byteBuffer, short version) {
35-
return readToTransactionLogKey(byteBuffer)
36-
.map(logKey -> TransactionLogKeyJsonConverter.write(logKey, version))
37-
.orElseGet(() -> new TextNode(UNKNOWN));
38-
}
39-
4032
@Override
41-
protected JsonNode readToValueJson(ByteBuffer byteBuffer, short version) {
42-
return readToTransactionLogValue(byteBuffer)
43-
.map(logValue -> TransactionLogValueJsonConverter.write(logValue, version))
44-
.orElseGet(() -> new TextNode(UNKNOWN));
45-
}
46-
47-
private Optional<TransactionLogKey> readToTransactionLogKey(ByteBuffer byteBuffer) {
33+
protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
4834
short version = byteBuffer.getShort();
49-
if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION
50-
&& version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
51-
return Optional.of(new TransactionLogKey(new ByteBufferAccessor(byteBuffer), version));
52-
} else {
53-
return Optional.empty();
35+
if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION && version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
36+
return TransactionLogKeyJsonConverter.write(new TransactionLogKey(new ByteBufferAccessor(byteBuffer), version), version);
5437
}
38+
return NullNode.getInstance();
5539
}
5640

57-
private Optional<TransactionLogValue> readToTransactionLogValue(ByteBuffer byteBuffer) {
41+
@Override
42+
protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
5843
short version = byteBuffer.getShort();
59-
if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION
60-
&& version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
61-
return Optional.of(new TransactionLogValue(new ByteBufferAccessor(byteBuffer), version));
62-
} else {
63-
return Optional.empty();
44+
if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
45+
return TransactionLogValueJsonConverter.write(new TransactionLogValue(new ByteBufferAccessor(byteBuffer), version), version);
6446
}
47+
return new TextNode(UNKNOWN);
6548
}
6649
}

tools/src/test/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatterTest.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,12 @@ private static Stream<Arguments> parameters() {
7676
Arguments.of(
7777
MessageUtil.toVersionPrefixedByteBuffer((short) 10, GROUP_METADATA_KEY).array(),
7878
MessageUtil.toVersionPrefixedByteBuffer((short) 10, GROUP_METADATA_VALUE).array(),
79-
"{\"key\":{\"version\":10,\"data\":\"unknown\"},\"value\":{\"version\":10,\"data\":\"unknown\"}}"
79+
""
8080
),
8181
Arguments.of(
8282
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
8383
MessageUtil.toVersionPrefixedByteBuffer((short) 0, GROUP_METADATA_VALUE).array(),
84-
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":0," +
84+
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":0," +
8585
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
8686
"\"leader\":\"leader\",\"members\":[{\"memberId\":\"member-1\",\"clientId\":\"client-1\"," +
8787
"\"clientHost\":\"host-1\",\"sessionTimeout\":1500,\"subscription\":\"AAE=\"," +
@@ -90,7 +90,7 @@ private static Stream<Arguments> parameters() {
9090
Arguments.of(
9191
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
9292
MessageUtil.toVersionPrefixedByteBuffer((short) 1, GROUP_METADATA_VALUE).array(),
93-
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":1," +
93+
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":1," +
9494
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
9595
"\"leader\":\"leader\",\"members\":[{\"memberId\":\"member-1\",\"clientId\":\"client-1\"," +
9696
"\"clientHost\":\"host-1\",\"rebalanceTimeout\":1000,\"sessionTimeout\":1500," +
@@ -99,7 +99,7 @@ private static Stream<Arguments> parameters() {
9999
Arguments.of(
100100
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
101101
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_VALUE).array(),
102-
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":2," +
102+
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":2," +
103103
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
104104
"\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\"," +
105105
"\"clientId\":\"client-1\",\"clientHost\":\"host-1\",\"rebalanceTimeout\":1000," +
@@ -108,7 +108,7 @@ private static Stream<Arguments> parameters() {
108108
Arguments.of(
109109
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
110110
MessageUtil.toVersionPrefixedByteBuffer((short) 3, GROUP_METADATA_VALUE).array(),
111-
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":3," +
111+
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":3," +
112112
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
113113
"\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\"," +
114114
"\"groupInstanceId\":\"group-instance-1\",\"clientId\":\"client-1\",\"clientHost\":\"host-1\"," +
@@ -117,7 +117,7 @@ private static Stream<Arguments> parameters() {
117117
Arguments.of(
118118
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
119119
MessageUtil.toVersionPrefixedByteBuffer((short) 4, GROUP_METADATA_VALUE).array(),
120-
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":4," +
120+
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":{\"version\":4," +
121121
"\"data\":{\"protocolType\":\"consumer\",\"generation\":1,\"protocol\":\"range\"," +
122122
"\"leader\":\"leader\",\"currentStateTimestamp\":1234,\"members\":[{\"memberId\":\"member-1\"," +
123123
"\"groupInstanceId\":\"group-instance-1\",\"clientId\":\"client-1\",\"clientHost\":\"host-1\"," +
@@ -126,16 +126,12 @@ private static Stream<Arguments> parameters() {
126126
Arguments.of(
127127
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
128128
null,
129-
"{\"key\":{\"version\":2,\"data\":{\"group\":\"group-id\"}},\"value\":null}"),
129+
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":null}"),
130130
Arguments.of(
131131
null,
132132
MessageUtil.toVersionPrefixedByteBuffer((short) 4, GROUP_METADATA_VALUE).array(),
133-
"{\"key\":null,\"value\":{\"version\":4,\"data\":{\"protocolType\":\"consumer\",\"generation\":1," +
134-
"\"protocol\":\"range\",\"leader\":\"leader\",\"currentStateTimestamp\":1234," +
135-
"\"members\":[{\"memberId\":\"member-1\",\"groupInstanceId\":\"group-instance-1\"," +
136-
"\"clientId\":\"client-1\",\"clientHost\":\"host-1\",\"rebalanceTimeout\":1000," +
137-
"\"sessionTimeout\":1500,\"subscription\":\"AAE=\",\"assignment\":\"AQI=\"}]}}}"),
138-
Arguments.of(null, null, "{\"key\":null,\"value\":null}"),
133+
""),
134+
Arguments.of(null, null, ""),
139135
Arguments.of(
140136
MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
141137
MessageUtil.toVersionPrefixedByteBuffer((short) 4, OFFSET_COMMIT_VALUE).array(),

0 commit comments

Comments
 (0)