fix(kafka): Add support for confluent message indices. #1902


Merged
merged 3 commits into from Jun 19, 2025
@@ -25,7 +25,7 @@
"timestamp": 1545084650988,
"timestampType": "CREATE_TIME",
"key": "NDI=",
"value": "COoHEgpTbWFydHBob25lGVK4HoXrv4JA",
"value": "AAjpBxIGTGFwdG9wGVK4HoXrP49A",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
@@ -39,7 +39,7 @@
"timestamp": 1545084650989,
"timestampType": "CREATE_TIME",
"key": null,
"value": "COsHEgpIZWFkcGhvbmVzGUjhehSuv2JA",
"value": "AgEACOkHEgZMYXB0b3AZUrgehes/j0A=",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
9 changes: 8 additions & 1 deletion examples/powertools-examples-kafka/tools/README.md
@@ -45,7 +45,7 @@ The tool will output base64-encoded values for Avro products that can be used in
mvn exec:java -Dexec.mainClass="org.demo.kafka.tools.GenerateProtobufSamples"
```

The tool will output base64-encoded values for Protobuf products that can be used in `../events/kafka-protobuf-event.json`.
The tool will output base64-encoded values for Protobuf products that can be used in `../events/kafka-protobuf-event.json`. This generator creates samples with and without Confluent message-indexes to test different serialization scenarios.

## Output

@@ -55,6 +55,13 @@ Each generator produces:
2. An integer key (42) and one entry with a nullish key to test for edge-cases
3. A complete sample event structure that can be used directly for testing

The Protobuf generators additionally create samples with different Confluent message-index formats:
- Standard protobuf (no message indexes)
- Simple message index (single 0 byte)
- Complex message index (length-prefixed array)

For more information about Confluent Schema Registry serialization formats and wire format specifications, see the [Confluent documentation](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format).
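
A minimal sketch of how these three layouts can be produced with the protobuf `CodedOutputStream` API, mirroring what `GenerateProtobufSamples` writes (it assumes the `ProtobufProduct` class generated by this example module):

```java
import java.io.ByteArrayOutputStream;
import java.util.Base64;

import org.demo.kafka.protobuf.ProtobufProduct;

import com.google.protobuf.CodedOutputStream;

public final class WireFormatSketch {
    public static void main(String[] args) throws Exception {
        ProtobufProduct product = ProtobufProduct.newBuilder()
                .setId(1001).setName("Laptop").setPrice(999.99).build();

        // 1. Standard protobuf: message bytes only, no prefix.
        byte[] standard = product.toByteArray();

        // 2. Simple message index: a single varint 0 before the message bytes.
        ByteArrayOutputStream simple = new ByteArrayOutputStream();
        CodedOutputStream simpleOut = CodedOutputStream.newInstance(simple);
        simpleOut.writeUInt32NoTag(0);
        product.writeTo(simpleOut);
        simpleOut.flush();

        // 3. Complex message index: varint array length, then each index value.
        ByteArrayOutputStream complex = new ByteArrayOutputStream();
        CodedOutputStream complexOut = CodedOutputStream.newInstance(complex);
        complexOut.writeUInt32NoTag(2); // array length
        complexOut.writeUInt32NoTag(1); // first index value
        complexOut.writeUInt32NoTag(0); // second index value
        product.writeTo(complexOut);
        complexOut.flush();

        System.out.println(Base64.getEncoder().encodeToString(standard));
        System.out.println(Base64.getEncoder().encodeToString(simple.toByteArray()));
        System.out.println(Base64.getEncoder().encodeToString(complex.toByteArray()));
    }
}
```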

## Example

After generating the samples, you can copy the output into the respective event files:
@@ -14,62 +14,68 @@
* Utility class to generate base64-encoded Avro serialized products
* for use in test events.
*/
public class GenerateAvroSamples {
public final class GenerateAvroSamples {

private GenerateAvroSamples() {
// Utility class
}

public static void main(String[] args) throws IOException {
// Create three different products
AvroProduct product1 = new AvroProduct(1001, "Laptop", 999.99);
AvroProduct product2 = new AvroProduct(1002, "Smartphone", 599.99);
AvroProduct product3 = new AvroProduct(1003, "Headphones", 149.99);

// Serialize and encode each product
String encodedProduct1 = serializeAndEncode(product1);
String encodedProduct2 = serializeAndEncode(product2);
String encodedProduct3 = serializeAndEncode(product3);

// Serialize and encode an integer key
String encodedKey = serializeAndEncodeInteger(42);

// Print the results
System.out.println("Base64 encoded Avro products for use in kafka-avro-event.json:");
System.out.println("\nProduct 1 (with key):");
System.out.println("key: \"" + encodedKey + "\",");
System.out.println("value: \"" + encodedProduct1 + "\",");

System.out.println("\nProduct 2 (with key):");
System.out.println("key: \"" + encodedKey + "\",");
System.out.println("value: \"" + encodedProduct2 + "\",");

System.out.println("\nProduct 3 (without key):");
System.out.println("key: null,");
System.out.println("value: \"" + encodedProduct3 + "\",");

// Print a sample event structure
System.out.println("\nSample event structure:");
printSampleEvent(encodedKey, encodedProduct1, encodedProduct2, encodedProduct3);
}

private static String serializeAndEncode(AvroProduct product) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
DatumWriter<AvroProduct> writer = new SpecificDatumWriter<>(AvroProduct.class);

writer.write(product, encoder);
encoder.flush();

return Base64.getEncoder().encodeToString(baos.toByteArray());
}

private static String serializeAndEncodeInteger(Integer value) throws IOException {
// For simple types like integers, we'll just convert to string and encode
return Base64.getEncoder().encodeToString(value.toString().getBytes());
}

private static void printSampleEvent(String key, String product1, String product2, String product3) {
System.out.println("{\n" +
" \"eventSource\": \"aws:kafka\",\n" +
" \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n" +
" \"bootstrapServers\": \"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092\",\n" +
" \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n"
+
" \"bootstrapServers\": \"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092\",\n"
+
" \"records\": {\n" +
" \"mytopic-0\": [\n" +
" {\n" +
@@ -11,7 +11,11 @@
* Utility class to generate base64-encoded JSON serialized products
* for use in test events.
*/
public class GenerateJsonSamples {
public final class GenerateJsonSamples {

private GenerateJsonSamples() {
// Utility class
}

public static void main(String[] args) throws IOException {
// Create three different products
@@ -1,73 +1,110 @@
package org.demo.kafka.tools;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Base64;

import org.demo.kafka.protobuf.ProtobufProduct;

import com.google.protobuf.CodedOutputStream;

/**
* Utility class to generate base64-encoded Protobuf serialized products
* for use in test events.
*/
public class GenerateProtobufSamples {
public final class GenerateProtobufSamples {

private GenerateProtobufSamples() {
// Utility class
}

public static void main(String[] args) throws IOException {
// Create three different products
ProtobufProduct product1 = ProtobufProduct.newBuilder()
// Create a single product that will be used for all three scenarios
ProtobufProduct product = ProtobufProduct.newBuilder()
.setId(1001)
.setName("Laptop")
.setPrice(999.99)
.build();

ProtobufProduct product2 = ProtobufProduct.newBuilder()
.setId(1002)
.setName("Smartphone")
.setPrice(599.99)
.build();

ProtobufProduct product3 = ProtobufProduct.newBuilder()
.setId(1003)
.setName("Headphones")
.setPrice(149.99)
.build();

// Serialize and encode each product
String encodedProduct1 = serializeAndEncode(product1);
String encodedProduct2 = serializeAndEncode(product2);
String encodedProduct3 = serializeAndEncode(product3);
// Create three different serializations of the same product
String standardProduct = serializeAndEncode(product);
String productWithSimpleIndex = serializeWithSimpleMessageIndex(product);
String productWithComplexIndex = serializeWithComplexMessageIndex(product);

// Serialize and encode an integer key
// Serialize and encode an integer key (same for all records)
String encodedKey = serializeAndEncodeInteger(42);

// Print the results
System.out.println("Base64 encoded Protobuf products for use in kafka-protobuf-event.json:");
System.out.println("\nProduct 1 (with key):");
System.out.println("key: \"" + encodedKey + "\",");
System.out.println("value: \"" + encodedProduct1 + "\",");

System.out.println("\nProduct 2 (with key):");
System.out.println("key: \"" + encodedKey + "\",");
System.out.println("value: \"" + encodedProduct2 + "\",");

System.out.println("\nProduct 3 (without key):");
System.out.println("key: null,");
System.out.println("value: \"" + encodedProduct3 + "\",");

// Print a sample event structure
System.out.println("\nSample event structure:");
printSampleEvent(encodedKey, encodedProduct1, encodedProduct2, encodedProduct3);
System.out.println("Base64 encoded Protobuf products with different message index scenarios:");
System.out.println("\n1. Standard Protobuf (no message index):");
System.out.println("value: \"" + standardProduct + "\"");

System.out.println("\n2. Simple Message Index (single 0):");
System.out.println("value: \"" + productWithSimpleIndex + "\"");

System.out.println("\n3. Complex Message Index (array [1,0]):");
System.out.println("value: \"" + productWithComplexIndex + "\"");

// Print the merged event structure
System.out.println("\n" + "=".repeat(80));
System.out.println("MERGED EVENT WITH ALL THREE SCENARIOS");
System.out.println("=".repeat(80));
printSampleEvent(encodedKey, standardProduct, productWithSimpleIndex, productWithComplexIndex);
}

private static String serializeAndEncode(ProtobufProduct product) {
return Base64.getEncoder().encodeToString(product.toByteArray());
}

/**
* Serializes a protobuf product with a simple Confluent message index (single 0).
* Format: [0][protobuf_data]
*
* @see {@link https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format}
*/
private static String serializeWithSimpleMessageIndex(ProtobufProduct product) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos);

// Write simple message index (single 0)
codedOutput.writeUInt32NoTag(0);

// Write the protobuf data
product.writeTo(codedOutput);

codedOutput.flush();
return Base64.getEncoder().encodeToString(baos.toByteArray());
}

/**
* Serializes a protobuf product with a complex Confluent message index (array [1,0]).
* Format: [2][1][0][protobuf_data] where 2 is the array length
*
* @see {@link https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format}
*/
private static String serializeWithComplexMessageIndex(ProtobufProduct product) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos);

// Write complex message index array [1,0]
codedOutput.writeUInt32NoTag(2); // Array length
codedOutput.writeUInt32NoTag(1); // First index value
codedOutput.writeUInt32NoTag(0); // Second index value

// Write the protobuf data
product.writeTo(codedOutput);

codedOutput.flush();
return Base64.getEncoder().encodeToString(baos.toByteArray());
}

private static String serializeAndEncodeInteger(Integer value) {
// For simple types like integers, we'll just convert to string and encode
return Base64.getEncoder().encodeToString(value.toString().getBytes());
}

private static void printSampleEvent(String key, String product1, String product2, String product3) {
private static void printSampleEvent(String key, String standardProduct, String simpleIndexProduct,
String complexIndexProduct) {
System.out.println("{\n" +
" \"eventSource\": \"aws:kafka\",\n" +
" \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n"
@@ -83,7 +120,7 @@ private static void printSampleEvent(String key, String product1, String product
" \"timestamp\": 1545084650987,\n" +
" \"timestampType\": \"CREATE_TIME\",\n" +
" \"key\": \"" + key + "\",\n" +
" \"value\": \"" + product1 + "\",\n" +
" \"value\": \"" + standardProduct + "\",\n" +
" \"headers\": [\n" +
" {\n" +
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
@@ -97,7 +134,7 @@ private static void printSampleEvent(String key, String product1, String product
" \"timestamp\": 1545084650988,\n" +
" \"timestampType\": \"CREATE_TIME\",\n" +
" \"key\": \"" + key + "\",\n" +
" \"value\": \"" + product2 + "\",\n" +
" \"value\": \"" + simpleIndexProduct + "\",\n" +
" \"headers\": [\n" +
" {\n" +
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
@@ -111,7 +148,7 @@ private static void printSampleEvent(String key, String product1, String product
" \"timestamp\": 1545084650989,\n" +
" \"timestampType\": \"CREATE_TIME\",\n" +
" \"key\": null,\n" +
" \"value\": \"" + product3 + "\",\n" +
" \"value\": \"" + complexIndexProduct + "\",\n" +
" \"headers\": [\n" +
" {\n" +
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
@@ -13,14 +13,27 @@
package software.amazon.lambda.powertools.kafka.serializers;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.CodedInputStream;
import com.google.protobuf.Message;
import com.google.protobuf.Parser;

/**
* Deserializer for Kafka records using Protocol Buffers format.
* Supports both standard protobuf serialization and Confluent Schema Registry serialization using message indices.
*
* For Confluent-serialized data, assumes the magic byte and schema ID have already been stripped
* by the Kafka ESM, leaving only the message index (if present) and protobuf data.
*
* @see {@link https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format}
*/
public class KafkaProtobufDeserializer extends AbstractKafkaDeserializer {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProtobufDeserializer.class);

@Override
@SuppressWarnings("unchecked")
protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException {
@@ -29,7 +42,9 @@ protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException
try {
// Get the parser from the generated Protobuf class
Parser<Message> parser = (Parser<Message>) type.getMethod("parser").invoke(null);
Message message = parser.parseFrom(data);

// Try to deserialize the data, handling potential Confluent message indices
Message message = deserializeWithMessageIndexHandling(data, parser);
return type.cast(message);
} catch (Exception e) {
throw new IOException("Failed to deserialize Protobuf data.", e);
@@ -40,4 +55,45 @@ protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException
+ "Consider using an alternative Deserializer.");
}
}

private Message deserializeWithMessageIndexHandling(byte[] data, Parser<Message> parser) throws IOException {
try {
LOGGER.debug("Attempting to deserialize as standard protobuf data");
return parser.parseFrom(data);
} catch (Exception e) {
LOGGER.debug("Standard protobuf parsing failed, attempting Confluent message-index handling");
return deserializeWithMessageIndex(data, parser);
}
}

private Message deserializeWithMessageIndex(byte[] data, Parser<Message> parser) throws IOException {
CodedInputStream codedInputStream = CodedInputStream.newInstance(data);

try {
// https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
// Read the first varint - this could be:
// 1. A single 0 (simple case - first message type)
// 2. The length of the message index array (complex case)
int firstValue = codedInputStream.readUInt32();

if (firstValue == 0) {
// Simple case: Single 0 byte means first message type
LOGGER.debug("Found simple message-index case (single 0), parsing remaining data as protobuf");
return parser.parseFrom(codedInputStream);
} else {
// Complex case: firstValue is the length of the message index array
LOGGER.debug("Found complex message-index case with array length: {}, skipping {} message index values",
firstValue, firstValue);
for (int i = 0; i < firstValue; i++) {
codedInputStream.readUInt32(); // Skip each message index value
}
Comment on lines +87 to +89

Reviewer: This will clip multiple bytes which might be part of the actual data as well. We have seen cases for Confluent-serialized proto messages where the first byte is not the length but the index of the message.

Contributor Author: Thanks. I will update to reading varint and also handle Glue bytes as an additional edge-case.

// Now the remaining data should be the actual protobuf message
LOGGER.debug("Finished skipping message indexes, parsing remaining data as protobuf");
return parser.parseFrom(codedInputStream);
}

} catch (Exception e) {
throw new IOException("Failed to parse protobuf data with or without message index", e);
}
}
}
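
As a rough round-trip sketch, the skip-then-parse approach above can be exercised with the protobuf runtime alone; this reproduces the varint handling rather than calling the deserializer directly, and assumes the `ProtobufProduct` class from the example module:

```java
import java.io.ByteArrayOutputStream;

import org.demo.kafka.protobuf.ProtobufProduct;

import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;

public final class MessageIndexRoundTrip {
    public static void main(String[] args) throws Exception {
        ProtobufProduct product = ProtobufProduct.newBuilder()
                .setId(1001).setName("Laptop").setPrice(999.99).build();

        // Build a payload with the complex Confluent prefix: array length 2, then indexes 1 and 0.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        CodedOutputStream out = CodedOutputStream.newInstance(baos);
        out.writeUInt32NoTag(2);
        out.writeUInt32NoTag(1);
        out.writeUInt32NoTag(0);
        product.writeTo(out);
        out.flush();
        byte[] payload = baos.toByteArray();

        // Same idea as deserializeWithMessageIndex: read the first varint,
        // skip that many index values (none when it is 0), then parse the rest.
        CodedInputStream in = CodedInputStream.newInstance(payload);
        int first = in.readUInt32();
        for (int i = 0; i < first; i++) {
            in.readUInt32(); // skip each message index value
        }
        ProtobufProduct parsed = ProtobufProduct.parseFrom(in);
        System.out.println(parsed.equals(product)); // expected: true
    }
}
```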