diff --git a/examples/powertools-examples-kafka/events/kafka-protobuf-event.json b/examples/powertools-examples-kafka/events/kafka-protobuf-event.json
index b3e0139e3..e0547ad88 100644
--- a/examples/powertools-examples-kafka/events/kafka-protobuf-event.json
+++ b/examples/powertools-examples-kafka/events/kafka-protobuf-event.json
@@ -25,7 +25,7 @@
         "timestamp": 1545084650988,
         "timestampType": "CREATE_TIME",
         "key": "NDI=",
-        "value": "COoHEgpTbWFydHBob25lGVK4HoXrv4JA",
+        "value": "AAjpBxIGTGFwdG9wGVK4HoXrP49A",
         "headers": [
           {
             "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
@@ -39,7 +39,7 @@
         "timestamp": 1545084650989,
         "timestampType": "CREATE_TIME",
         "key": null,
-        "value": "COsHEgpIZWFkcGhvbmVzGUjhehSuv2JA",
+        "value": "AgEACOkHEgZMYXB0b3AZUrgehes/j0A=",
         "headers": [
           {
             "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
diff --git a/examples/powertools-examples-kafka/tools/README.md b/examples/powertools-examples-kafka/tools/README.md
index 53d07b0c4..02e8dde9b 100644
--- a/examples/powertools-examples-kafka/tools/README.md
+++ b/examples/powertools-examples-kafka/tools/README.md
@@ -45,7 +45,7 @@ The tool will output base64-encoded values for Avro products that can be used in
 mvn exec:java -Dexec.mainClass="org.demo.kafka.tools.GenerateProtobufSamples"
 ```
 
-The tool will output base64-encoded values for Protobuf products that can be used in `../events/kafka-protobuf-event.json`.
+The tool will output base64-encoded values for Protobuf products that can be used in `../events/kafka-protobuf-event.json`. This generator creates samples with and without Confluent message indexes to test different serialization scenarios.
 
 ## Output
 
@@ -55,6 +55,13 @@ Each generator produces:
 2. An integer key (42) and one entry with a nullish key to test for edge-cases
 3. A complete sample event structure that can be used directly for testing
 
+The Protobuf generator additionally creates samples with different Confluent message-index formats:
+- Standard protobuf (no message indexes)
+- Simple message index (single 0 byte)
+- Complex message index (length-prefixed array)
+
+For more information about Confluent Schema Registry serialization formats and wire format specifications, see the [Confluent documentation](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format).
+
 ## Example
 
 After generating the samples, you can copy the output into the respective event files:
diff --git a/examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateAvroSamples.java b/examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateAvroSamples.java
index 4bd6ebd13..e6f4d38fd 100644
--- a/examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateAvroSamples.java
+++ b/examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateAvroSamples.java
@@ -14,62 +14,68 @@
  * Utility class to generate base64-encoded Avro serialized products
  * for use in test events.
  */
-public class GenerateAvroSamples {
+public final class GenerateAvroSamples {
+
+    private GenerateAvroSamples() {
+        // Utility class
+    }
 
     public static void main(String[] args) throws IOException {
         // Create three different products
         AvroProduct product1 = new AvroProduct(1001, "Laptop", 999.99);
         AvroProduct product2 = new AvroProduct(1002, "Smartphone", 599.99);
         AvroProduct product3 = new AvroProduct(1003, "Headphones", 149.99);
-        
+
         // Serialize and encode each product
         String encodedProduct1 = serializeAndEncode(product1);
         String encodedProduct2 = serializeAndEncode(product2);
         String encodedProduct3 = serializeAndEncode(product3);
-        
+
         // Serialize and encode an integer key
         String encodedKey = serializeAndEncodeInteger(42);
-        
+
         // Print the results
         System.out.println("Base64 encoded Avro products for use in kafka-avro-event.json:");
         System.out.println("\nProduct 1 (with key):");
         System.out.println("key: \"" + encodedKey + "\",");
         System.out.println("value: \"" + encodedProduct1 + "\",");
-        
+
         System.out.println("\nProduct 2 (with key):");
         System.out.println("key: \"" + encodedKey + "\",");
         System.out.println("value: \"" + encodedProduct2 + "\",");
-        
+
         System.out.println("\nProduct 3 (without key):");
         System.out.println("key: null,");
         System.out.println("value: \"" + encodedProduct3 + "\",");
-        
+
         // Print a sample event structure
         System.out.println("\nSample event structure:");
        printSampleEvent(encodedKey, encodedProduct1, encodedProduct2, encodedProduct3);
     }
-    
+
     private static String serializeAndEncode(AvroProduct product) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
         DatumWriter<AvroProduct> writer = new SpecificDatumWriter<>(AvroProduct.class);
-        
+
         writer.write(product, encoder);
         encoder.flush();
-        
+
         return Base64.getEncoder().encodeToString(baos.toByteArray());
     }
-    
+
     private static String serializeAndEncodeInteger(Integer value) throws IOException {
         // For simple types like integers, we'll just convert to string and encode
         return Base64.getEncoder().encodeToString(value.toString().getBytes());
     }
-    
+
     private static void printSampleEvent(String key, String product1, String product2, String product3) {
         System.out.println("{\n" +
                 "  \"eventSource\": \"aws:kafka\",\n" +
-                "  \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n" +
-                "  \"bootstrapServers\": \"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092\",\n" +
+                "  \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n"
+                +
+                "  \"bootstrapServers\": \"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092\",\n"
+                +
                 "  \"records\": {\n" +
                 "    \"mytopic-0\": [\n" +
                 "      {\n" +
diff --git a/examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateJsonSamples.java b/examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateJsonSamples.java
index a4fd6565a..d0ef7cb55 100644
--- a/examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateJsonSamples.java
+++ b/examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateJsonSamples.java
@@ -11,7 +11,11 @@
 * Utility class to generate base64-encoded JSON serialized products
 * for use in test events.
 */
-public class GenerateJsonSamples {
+public final class GenerateJsonSamples {
+
+    private GenerateJsonSamples() {
+        // Utility class
+    }
 
     public static void main(String[] args) throws IOException {
         // Create three different products
diff --git a/examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java b/examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java
index ae078a28a..eecd3e1cc 100644
--- a/examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java
+++ b/examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java
@@ -1,73 +1,110 @@
 package org.demo.kafka.tools;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Base64;
 
 import org.demo.kafka.protobuf.ProtobufProduct;
 
+import com.google.protobuf.CodedOutputStream;
+
 /**
  * Utility class to generate base64-encoded Protobuf serialized products
  * for use in test events.
  */
-public class GenerateProtobufSamples {
+public final class GenerateProtobufSamples {
+
+    private GenerateProtobufSamples() {
+        // Utility class
+    }
 
     public static void main(String[] args) throws IOException {
-        // Create three different products
-        ProtobufProduct product1 = ProtobufProduct.newBuilder()
+        // Create a single product that will be used for all three scenarios
+        ProtobufProduct product = ProtobufProduct.newBuilder()
                 .setId(1001)
                 .setName("Laptop")
                 .setPrice(999.99)
                 .build();
 
-        ProtobufProduct product2 = ProtobufProduct.newBuilder()
-                .setId(1002)
-                .setName("Smartphone")
-                .setPrice(599.99)
-                .build();
-
-        ProtobufProduct product3 = ProtobufProduct.newBuilder()
-                .setId(1003)
-                .setName("Headphones")
-                .setPrice(149.99)
-                .build();
-
-        // Serialize and encode each product
-        String encodedProduct1 = serializeAndEncode(product1);
-        String encodedProduct2 = serializeAndEncode(product2);
-        String encodedProduct3 = serializeAndEncode(product3);
+        // Create three different serializations of the same product
+        String standardProduct = serializeAndEncode(product);
+        String productWithSimpleIndex = serializeWithSimpleMessageIndex(product);
+        String productWithComplexIndex = serializeWithComplexMessageIndex(product);
 
-        // Serialize and encode an integer key
+        // Serialize and encode an integer key (same for all records)
         String encodedKey = serializeAndEncodeInteger(42);
 
         // Print the results
-        System.out.println("Base64 encoded Protobuf products for use in kafka-protobuf-event.json:");
-        System.out.println("\nProduct 1 (with key):");
-        System.out.println("key: \"" + encodedKey + "\",");
-        System.out.println("value: \"" + encodedProduct1 + "\",");
-
-        System.out.println("\nProduct 2 (with key):");
-        System.out.println("key: \"" + encodedKey + "\",");
-        System.out.println("value: \"" + encodedProduct2 + "\",");
-
-        System.out.println("\nProduct 3 (without key):");
-        System.out.println("key: null,");
-        System.out.println("value: \"" + encodedProduct3 + "\",");
-
-        // Print a sample event structure
-        System.out.println("\nSample event structure:");
-        printSampleEvent(encodedKey, encodedProduct1, encodedProduct2, encodedProduct3);
+        System.out.println("Base64 encoded Protobuf products with different message index scenarios:");
+        System.out.println("\n1. Standard Protobuf (no message index):");
+        System.out.println("value: \"" + standardProduct + "\"");
+
+        System.out.println("\n2. Simple Message Index (single 0):");
+        System.out.println("value: \"" + productWithSimpleIndex + "\"");
+
+        System.out.println("\n3. Complex Message Index (array [1,0]):");
+        System.out.println("value: \"" + productWithComplexIndex + "\"");
+
+        // Print the merged event structure
+        System.out.println("\n" + "=".repeat(80));
+        System.out.println("MERGED EVENT WITH ALL THREE SCENARIOS");
+        System.out.println("=".repeat(80));
+        printSampleEvent(encodedKey, standardProduct, productWithSimpleIndex, productWithComplexIndex);
     }
 
     private static String serializeAndEncode(ProtobufProduct product) {
         return Base64.getEncoder().encodeToString(product.toByteArray());
     }
 
+    /**
+     * Serializes a protobuf product with a simple Confluent message index (single 0).
+     * Format: [0][protobuf_data]
+     *
+     * @see <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent Schema Registry wire format</a>
+     */
+    private static String serializeWithSimpleMessageIndex(ProtobufProduct product) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos);
+
+        // Write simple message index (single 0)
+        codedOutput.writeUInt32NoTag(0);
+
+        // Write the protobuf data
+        product.writeTo(codedOutput);
+
+        codedOutput.flush();
+        return Base64.getEncoder().encodeToString(baos.toByteArray());
+    }
+
+    /**
+     * Serializes a protobuf product with a complex Confluent message index (array [1,0]).
+     * Format: [2][1][0][protobuf_data] where 2 is the array length
+     *
+     * @see <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent Schema Registry wire format</a>
+     */
+    private static String serializeWithComplexMessageIndex(ProtobufProduct product) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos);
+
+        // Write complex message index array [1,0]
+        codedOutput.writeUInt32NoTag(2); // Array length
+        codedOutput.writeUInt32NoTag(1); // First index value
+        codedOutput.writeUInt32NoTag(0); // Second index value
+
+        // Write the protobuf data
+        product.writeTo(codedOutput);
+
+        codedOutput.flush();
+        return Base64.getEncoder().encodeToString(baos.toByteArray());
+    }
+
     private static String serializeAndEncodeInteger(Integer value) {
         // For simple types like integers, we'll just convert to string and encode
         return Base64.getEncoder().encodeToString(value.toString().getBytes());
     }
 
-    private static void printSampleEvent(String key, String product1, String product2, String product3) {
+    private static void printSampleEvent(String key, String standardProduct, String simpleIndexProduct,
+            String complexIndexProduct) {
         System.out.println("{\n" +
                 "  \"eventSource\": \"aws:kafka\",\n" +
                 "  \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n"
@@ -83,7 +120,7 @@ private static void printSampleEvent(String key, String product1, String produc
                 "        \"timestamp\": 1545084650987,\n" +
                 "        \"timestampType\": \"CREATE_TIME\",\n" +
                 "        \"key\": \"" + key + "\",\n" +
-                "        \"value\": \"" + product1 + "\",\n" +
+                "        \"value\": \"" + standardProduct + "\",\n" +
                 "        \"headers\": [\n" +
                 "          {\n" +
                 "            \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
@@ -97,7 +134,7 @@ private static void printSampleEvent(String key, String product1, String produc
                 "        \"timestamp\": 1545084650988,\n" +
                 "        \"timestampType\": \"CREATE_TIME\",\n" +
                 "        \"key\": \"" + key + "\",\n" +
-                "        \"value\": \"" + product2 + "\",\n" +
+                "        \"value\": \"" + simpleIndexProduct + "\",\n" +
                 "        \"headers\": [\n" +
                 "          {\n" +
                 "            \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
@@ -111,7 +148,7 @@ private static void printSampleEvent(String key, String product1, String produc
                 "        \"timestamp\": 1545084650989,\n" +
                 "        \"timestampType\": \"CREATE_TIME\",\n" +
                 "        \"key\": null,\n" +
-                "        \"value\": \"" + product3 + "\",\n" +
+                "        \"value\": \"" + complexIndexProduct + "\",\n" +
                 "        \"headers\": [\n" +
                 "          {\n" +
                 "            \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
diff --git a/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializer.java b/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializer.java
index 025f203c4..c15be552f 100644
--- a/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializer.java
+++ b/powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializer.java
@@ -13,14 +13,27 @@
 package software.amazon.lambda.powertools.kafka.serializers;
 
 import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.Message;
 import com.google.protobuf.Parser;
 
 /**
  * Deserializer for Kafka records using Protocol Buffers format.
+ * Supports both standard protobuf serialization and Confluent Schema Registry serialization using message indices.
+ *
+ * For Confluent-serialized data, assumes the magic byte and schema ID have already been stripped
+ * by the Kafka ESM, leaving only the message index (if present) and protobuf data.
+ *
+ * @see <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent Schema Registry wire format</a>
  */
 public class KafkaProtobufDeserializer extends AbstractKafkaDeserializer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProtobufDeserializer.class);
+
     @Override
     @SuppressWarnings("unchecked")
     protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException {
@@ -29,7 +42,9 @@ protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException
             try {
                 // Get the parser from the generated Protobuf class
                 Parser<Message> parser = (Parser<Message>) type.getMethod("parser").invoke(null);
-                Message message = parser.parseFrom(data);
+
+                // Try to deserialize the data, handling potential Confluent message indices
+                Message message = deserializeWithMessageIndexHandling(data, parser);
                 return type.cast(message);
             } catch (Exception e) {
                 throw new IOException("Failed to deserialize Protobuf data.", e);
@@ -40,4 +55,45 @@ protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException
                     + "Consider using an alternative Deserializer.");
         }
     }
+
+    private Message deserializeWithMessageIndexHandling(byte[] data, Parser<Message> parser) throws IOException {
+        try {
+            LOGGER.debug("Attempting to deserialize as standard protobuf data");
+            return parser.parseFrom(data);
+        } catch (Exception e) {
+            LOGGER.debug("Standard protobuf parsing failed, attempting Confluent message-index handling");
+            return deserializeWithMessageIndex(data, parser);
+        }
+    }
+
+    private Message deserializeWithMessageIndex(byte[] data, Parser<Message> parser) throws IOException {
+        CodedInputStream codedInputStream = CodedInputStream.newInstance(data);
+
+        try {
+            // https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
+            // Read the first varint - this could be:
+            // 1. A single 0 (simple case - first message type)
+            // 2. The length of the message index array (complex case)
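+            //
+            // As a sketch, the payload shapes this method distinguishes (the Kafka ESM has
+            // already stripped the magic byte and schema ID upstream, see the class javadoc):
+            //   [0][protobuf_data]                    simple index: a single 0 varint
+            //   [n][idx_1]...[idx_n][protobuf_data]   complex index: varint array length n, then n varints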
+            int firstValue = codedInputStream.readUInt32();
+
+            if (firstValue == 0) {
+                // Simple case: Single 0 byte means first message type
+                LOGGER.debug("Found simple message-index case (single 0), parsing remaining data as protobuf");
+                return parser.parseFrom(codedInputStream);
+            } else {
+                // Complex case: firstValue is the length of the message index array
+                LOGGER.debug("Found complex message-index case with array length: {}, skipping {} message index values",
+                        firstValue, firstValue);
+                for (int i = 0; i < firstValue; i++) {
+                    codedInputStream.readUInt32(); // Skip each message index value
+                }
+                // Now the remaining data should be the actual protobuf message
+                LOGGER.debug("Finished skipping message indexes, parsing remaining data as protobuf");
+                return parser.parseFrom(codedInputStream);
+            }
+
+        } catch (Exception e) {
+            throw new IOException("Failed to parse protobuf data with or without message index", e);
+        }
+    }
 }
diff --git a/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializerTest.java b/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializerTest.java
index 2d506de4b..3315e1172 100644
--- a/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializerTest.java
+++ b/powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializerTest.java
@@ -15,11 +15,14 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import com.google.protobuf.CodedOutputStream;
+
 import software.amazon.lambda.powertools.kafka.serializers.test.protobuf.TestProduct;
 
 class KafkaProtobufDeserializerTest {
@@ -72,4 +75,78 @@ void shouldThrowExceptionWhenDeserializingInvalidProtobufData() {
                 .isInstanceOf(IOException.class)
                 .hasMessageContaining("Failed to deserialize Protobuf data");
     }
+
+    @Test
+    void shouldDeserializeProtobufDataWithSimpleMessageIndex() throws IOException {
+        // Given
+        TestProduct product = TestProduct.newBuilder()
+                .setId(456)
+                .setName("Simple Index Product")
+                .setPrice(199.99)
+                .build();
+
+        // Create protobuf data with simple message index (single 0)
+        byte[] protobufDataWithSimpleIndex = createProtobufDataWithSimpleMessageIndex(product);
+
+        // When
+        TestProduct result = deserializer.deserializeObject(protobufDataWithSimpleIndex, TestProduct.class);
+
+        // Then
+        assertThat(result).isNotNull();
+        assertThat(result.getId()).isEqualTo(456);
+        assertThat(result.getName()).isEqualTo("Simple Index Product");
+        assertThat(result.getPrice()).isEqualTo(199.99);
+    }
+
+    @Test
+    void shouldDeserializeProtobufDataWithComplexMessageIndex() throws IOException {
+        // Given
+        TestProduct product = TestProduct.newBuilder()
+                .setId(789)
+                .setName("Complex Index Product")
+                .setPrice(299.99)
+                .build();
+
+        // Create protobuf data with complex message index (array [1,0])
+        byte[] protobufDataWithComplexIndex = createProtobufDataWithComplexMessageIndex(product);
+
+        // When
+        TestProduct result = deserializer.deserializeObject(protobufDataWithComplexIndex, TestProduct.class);
+
+        // Then
+        assertThat(result).isNotNull();
+        assertThat(result.getId()).isEqualTo(789);
+        assertThat(result.getName()).isEqualTo("Complex Index Product");
+        assertThat(result.getPrice()).isEqualTo(299.99);
+    }
+
+    private byte[] createProtobufDataWithSimpleMessageIndex(TestProduct product) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos);
+
+        // Write simple message index (single 0)
+        codedOutput.writeUInt32NoTag(0);
+
+        // Write the protobuf data
+        product.writeTo(codedOutput);
+
+        codedOutput.flush();
+        return baos.toByteArray();
+    }
+
+    private byte[] createProtobufDataWithComplexMessageIndex(TestProduct product) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos);
+
+        // Write complex message index array [1,0]
+        codedOutput.writeUInt32NoTag(2); // Array length
+        codedOutput.writeUInt32NoTag(1); // First index value
+        codedOutput.writeUInt32NoTag(0); // Second index value
+
+        // Write the protobuf data
+        product.writeTo(codedOutput);
+
+        codedOutput.flush();
+        return baos.toByteArray();
+    }
 }