fix(kafka): Add support for confluent message indices. #1902


Merged · 3 commits · Jun 19, 2025

Conversation


@phipag (Contributor) commented Jun 19, 2025

Issue #, if available: #1901

Description of changes:

This is an important fix to support customers using the Confluent schema integration on the producer side for Protobuf messages. It applies only to Protobuf, not to Avro.

According to the wire format of the Confluent serializers, message indexes are injected as additional bytes after the magic byte and schema ID:

magic-byte, schema-id, message-indexes, protobuf-payload
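To make that layout concrete, here is a minimal sketch that builds a frame in this format. This is illustrative only, not Confluent's serializer code: Confluent encodes the index count and each index as zigzag varints, while this sketch uses plain unsigned varints for simplicity.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Illustrative sketch of the Confluent Protobuf wire format:
// [magic byte 0x00][4-byte schema id][varint index count][varint indexes...][payload]
// NOTE: Confluent's real serializer zigzag-encodes the varints; plain unsigned
// varints are used here for simplicity.
public final class ConfluentFrame {

    // Writes v as an unsigned LEB128 varint.
    static void writeVarint(ByteArrayOutputStream out, int v) {
        while ((v & ~0x7F) != 0) {
            out.write((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write(v);
    }

    static byte[] frame(int schemaId, int[] messageIndexes, byte[] protobufPayload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0); // magic byte
        out.writeBytes(ByteBuffer.allocate(4).putInt(schemaId).array()); // big-endian schema id
        writeVarint(out, messageIndexes.length); // number of message indexes
        for (int idx : messageIndexes) {
            writeVarint(out, idx);
        }
        out.writeBytes(protobufPayload);
        return out.toByteArray();
    }
}
```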

We need to strip these message indexes before attempting to parse the data with the plain Google Protobuf library. This PR does the following:

  1. Attempt to parse using plain Protobuf.
  2. If that fails, attempt to strip the message indexes and parse again.
  3. If it still fails, the data is corrupted or there is a schema mismatch, and we raise an exception.
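The three steps above can be sketched generically as follows. The names here are hypothetical; the real implementation lives in the Kafka utility's deserializer, where `parse` would be a Protobuf `Parser#parseFrom` call and the failure would surface as `InvalidProtocolBufferException`.

```java
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical sketch of the parse-then-fallback strategy. `parse` stands in
// for a Protobuf Parser#parseFrom call and throws IllegalArgumentException on
// failure (where the real code would see InvalidProtocolBufferException).
public final class FallbackParser {

    static <T> T parseWithFallback(Function<byte[], T> parse,
                                   UnaryOperator<byte[]> stripMessageIndexes,
                                   byte[] data) {
        try {
            return parse.apply(data); // 1. attempt plain protobuf parse
        } catch (IllegalArgumentException firstFailure) {
            try {
                // 2. strip the Confluent message indexes and parse again
                return parse.apply(stripMessageIndexes.apply(data));
            } catch (IllegalArgumentException secondFailure) {
                // 3. corrupted data or schema mismatch
                throw new IllegalStateException("Corrupted data or schema mismatch", secondFailure);
            }
        }
    }
}
```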

Added unit tests covering 100% of branches.



@phipag linked an issue Jun 19, 2025 that may be closed by this pull request
@phipag merged commit f563d23 into main Jun 19, 2025
14 checks passed
@phipag deleted the feature/kafka-protobuf-confluent-compatibility branch June 19, 2025 13:09
@github-project-automation bot moved this from Pending review to Coming soon in Powertools for AWS Lambda (Java) Jun 19, 2025
Comment on lines +87 to +89
for (int i = 0; i < firstValue; i++) {
codedInputStream.readUInt32(); // Skip each message index value
}


This will clip multiple bytes that might be part of the actual data as well. We have seen Confluent-serialized proto messages where the first byte is not the length of the index list but the index of the message itself.

@phipag (Contributor, Author) replied:

Thanks. I will update this to read a varint and also handle Glue bytes as an additional edge case.
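The fix described in this exchange, reading the indexes as varints rather than assuming a raw count byte, and handling Confluent's optimized encoding of the common index list [0] as a single zero byte, might look roughly like this. This is a sketch with a hand-rolled varint reader standing in for protobuf's CodedInputStream; names are illustrative, not the merged code.

```java
// Hypothetical sketch of skipping the message-index block with varints.
// Confluent collapses the common index list [0] to a single 0 byte; otherwise
// the first varint is the count, followed by that many index varints.
public final class MessageIndexSkipper {

    // Reads an unsigned LEB128 varint starting at pos[0] and advances pos[0].
    static int readVarint(byte[] buf, int[] pos) {
        int result = 0;
        int shift = 0;
        while (true) {
            byte b = buf[pos[0]++];
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
    }

    // Returns the offset just past the message-index block that starts at `offset`.
    static int skipMessageIndexes(byte[] buf, int offset) {
        int[] pos = {offset};
        int first = readVarint(buf, pos);
        if (first == 0) {
            return pos[0]; // optimized encoding of the index list [0]
        }
        for (int i = 0; i < first; i++) {
            readVarint(buf, pos); // skip each message index value
        }
        return pos[0];
    }
}
```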

Status: Coming soon

Successfully merging this pull request may close these issues.

Feature: Support Confluent producers in new Kafka utility
3 participants