docs(kafka): provide example payloads #4094

Merged: 1 commit, merged on Jun 30, 2025

46 changes: 42 additions & 4 deletions docs/features/kafka.md
@@ -69,22 +69,60 @@ Depending on the schema types you want to use, install the library and the corre

Additionally, if you want to use output parsing with [Standard Schema](https://github.com/standard-schema/standard-schema), you can install [any of the supported libraries](https://standardschema.dev/#what-schema-libraries-implement-the-spec), for example: Zod, Valibot, or ArkType.

<!-- ### Required resources
### Required resources

To use the Kafka consumer utility, you need an AWS Lambda function configured with a Kafka event source. This can be Amazon MSK, MSK Serverless, or a self-hosted Kafka cluster.

=== "gettingStartedWithMsk.yaml"

```yaml
--8<-- "examples/snippets/kafka/templates/gettingStartedWithMsk.yaml"
``` -->
```
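
If you prefer defining the event source in code rather than the SAM template above, a rough CDK sketch of the same prerequisite is shown below. The cluster ARN, topic name, and entry path are placeholders, and the `ManagedKafkaEventSource` construct from `aws-cdk-lib` is an assumption on our part; adapt it to your own stack.

```typescript
import { Stack, type StackProps } from 'aws-cdk-lib';
import { Runtime, StartingPosition } from 'aws-cdk-lib/aws-lambda';
import { ManagedKafkaEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';
import type { Construct } from 'constructs';

export class KafkaConsumerStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    const fn = new NodejsFunction(this, 'KafkaConsumerFn', {
      entry: 'src/index.ts', // placeholder path to your handler
      runtime: Runtime.NODEJS_20_X,
    });

    // Attach an MSK event source mapping; ARN and topic are placeholders
    fn.addEventSource(
      new ManagedKafkaEventSource({
        clusterArn:
          'arn:aws:kafka:eu-west-3:123456789012:cluster/powertools-kafka-esm/example-cluster-id',
        topic: 'python-with-avro-doc',
        startingPosition: StartingPosition.TRIM_HORIZON,
      })
    );
  }
}
```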

### Using ESM with Schema Registry

The Event Source Mapping configuration determines which mode is used. With `JSON` mode, Lambda converts all messages to JSON before invoking your function. With `SOURCE` mode, Lambda preserves the original format, requiring your function to handle the appropriate deserialization.

Powertools for AWS supports both Schema Registry integration modes in your Event Source Mapping configuration.

For simplicity, most of our examples use a small schema containing only `name` and `age`. You can also copy the payload examples, which contain the expected Kafka events, to test your code; a quick decoding sketch follows the tabs below.

=== "JSON"

```json
--8<-- "examples/snippets/kafka/samples/user.json"
```

=== "Payload JSON"

```json
--8<-- "examples/snippets/kafka/samples/kafkaEventJson.json"
```

=== "Avro Schema"

```json
--8<-- "examples/snippets/kafka/samples/user.avsc"
```

=== "Payload Avro"

```json
--8<-- "examples/snippets/kafka/samples/kafkaEventAvro.json"
```

=== "Protobuf Schema"

```typescript
--8<-- "examples/snippets/kafka/samples/user.proto"
```

=== "Payload Protobuf"

```json
--8<-- "examples/snippets/kafka/samples/kafkaEventProtobuf.json"
```
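
If you want to sanity-check these sample events before using them in a test, note that the `key` and `value` fields are base64-encoded. A small Node.js sketch (no Powertools required) shows how the JSON payload maps back to the example schema:

```typescript
import { Buffer } from 'node:buffer';

// Base64-encoded `value` from kafkaEventJson.json
const rawValue = 'eyJuYW1lIjogIlBvd2VydG9vbHMiLCAiYWdlIjogNX0=';
const value = JSON.parse(Buffer.from(rawValue, 'base64').toString('utf8'));
console.log(value); // { name: 'Powertools', age: 5 }

// Base64-encoded `key` from the same event
const key = Buffer.from('MTIz', 'base64').toString('utf8');
console.log(key); // '123'
```

The Avro and Protobuf sample payloads are binary-encoded, so they are best exercised through the `kafkaConsumer` utility with the matching schema rather than decoded by hand.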

### Processing Kafka events

The Kafka consumer utility transforms raw Kafka events into an intuitive format for processing. To handle messages effectively, you'll need to configure a schema that matches your data format.
@@ -110,9 +148,9 @@ The Kafka consumer utility transforms raw Kafka events into an intuitive format
--8<-- "examples/snippets/kafka/gettingStartedJson.ts"
```
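
The tab above pulls its code from the examples directory; for readers skimming this PR, a minimal sketch of a JSON-configured handler, based on the `kafkaConsumer` and `SchemaType` APIs used in the snippets changed below, could look roughly like this (handler body and types are illustrative only):

```typescript
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';

const logger = new Logger({ serviceName: 'kafka-consumer' });

// JSON values need no companion schema file; a Standard Schema could be
// added here for output parsing, but that is optional.
const schemaConfig = {
  value: {
    type: SchemaType.JSON,
  },
} satisfies SchemaConfig;

export const handler = kafkaConsumer<unknown, { name: string; age: number }>(
  async (event, _context) => {
    for (const record of event.records) {
      // `record.value` is already deserialized into a plain object
      logger.info('received value', {
        name: record.value.name,
        age: record.value.age,
      });
    }
  },
  schemaConfig
);
```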

### Deserializing keys and values
### Deserializing key and value

The `kafkaConsumer` function can deserialize both keys and values independently based on your schema configuration. This flexibility allows you to work with different data formats in the same message.
The `kafkaConsumer` function can deserialize both key and value independently based on your schema configuration. This flexibility allows you to work with different data formats in the same message.

=== "index.ts"

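The index.ts tab above is collapsed in this view; as a rough sketch, configuring independent deserialization for the key and the value (both JSON here, and with a hypothetical key shape, purely for illustration) might look like:

```typescript
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';

// Hypothetical key shape, used only to show that key and value
// are configured (and typed) independently.
type OrderKey = { orderId: string };
type User = { name: string; age: number };

const schemaConfig = {
  key: {
    type: SchemaType.JSON,
  },
  value: {
    type: SchemaType.JSON,
  },
} satisfies SchemaConfig;

export const handler = kafkaConsumer<OrderKey, User>(
  async (event, _context) => {
    for (const record of event.records) {
      // Both sides arrive deserialized according to their own schema entry
      console.log(record.key?.orderId, record.value.name, record.value.age);
    }
  },
  schemaConfig
);
```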
8 changes: 4 additions & 4 deletions examples/snippets/kafka/advancedWorkingWithIdempotency.ts
@@ -6,7 +6,7 @@ import { DynamoDBPersistenceLayer } from '@aws-lambda-powertools/idempotency/dyn
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { User } from './samples/user.es6.generated.js'; // protobuf generated class
import { com } from './samples/user.generated.js'; // protobuf generated class

const logger = new Logger({ serviceName: 'kafka-consumer' });
const persistenceStore = new DynamoDBPersistenceLayer({
@@ -16,14 +16,14 @@ const persistenceStore = new DynamoDBPersistenceLayer({
const schemaConfig = {
value: {
type: SchemaType.PROTOBUF,
schema: User,
schema: com.example.User,
},
} satisfies SchemaConfig;

const processRecord = makeIdempotent(
async (user, topic, partition, offset) => {
logger.info('processing user', {
userId: user.id,
user,
meta: {
topic,
partition,
@@ -35,7 +35,7 @@

return {
success: true,
userId: user.id,
user,
};
},
{
8 changes: 4 additions & 4 deletions examples/snippets/kafka/advancedWorkingWithRecordMetadata.ts
@@ -1,10 +1,10 @@
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import { Logger } from '@aws-lambda-powertools/logger';
import { type IUser, User } from './samples/user.es6.generated.js'; // protobuf generated class
import { com } from './samples/user.generated.js'; // protobuf generated class

const logger = new Logger({ serviceName: 'kafka-consumer' });

export const handler = kafkaConsumer<unknown, IUser>(
export const handler = kafkaConsumer<unknown, com.example.IUser>(
async (event, _context) => {
for (const record of event.records) {
const { value, topic, partition, offset, timestamp, headers } = record;
@@ -24,15 +24,15 @@ export const handler = kafkaConsumer<unknown, IUser>(

// Process the deserialized value
logger.info('User data', {
userId: value.id,
userName: value.name,
userAge: value.age,
});
}
},
{
value: {
type: SchemaType.PROTOBUF,
schema: User,
schema: com.example.User,
},
}
);
4 changes: 2 additions & 2 deletions examples/snippets/kafka/gettingStartedPrimitiveValues.ts
@@ -3,7 +3,7 @@ import { Logger } from '@aws-lambda-powertools/logger';

const logger = new Logger({ serviceName: 'kafka-consumer' });

export const handler = kafkaConsumer<string, { id: number; name: string }>(
export const handler = kafkaConsumer<string, { name: string; age: number }>(
async (event, _context) => {
for (const record of event.records) {
// Key is automatically decoded as UTF-8 string
@@ -14,8 +14,8 @@ export const handler = kafkaConsumer<string, { id: number; name: string }>(
logger.info('received value', {
key,
product: {
id: value.id,
name: value.name,
age: value.age,
},
});
}
4 changes: 2 additions & 2 deletions examples/snippets/kafka/gettingStartedProtobuf.ts
@@ -1,14 +1,14 @@
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { User } from './samples/user.es6.generated.js'; // protobuf generated class
import { com } from './samples/user.generated.js'; // protobuf generated class

const logger = new Logger({ serviceName: 'kafka-consumer' });

const schemaConfig = {
value: {
type: SchemaType.PROTOBUF,
schema: User,
schema: com.example.User,
},
} satisfies SchemaConfig;

19 changes: 19 additions & 0 deletions examples/snippets/kafka/samples/kafkaEventAvro.json
@@ -0,0 +1,19 @@
{
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:eu-west-3:123456789012:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3",
"bootstrapServers": "boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098",
"records": {
"python-with-avro-doc-3": [
{
"topic": "python-with-avro-doc",
"partition": 3,
"offset": 0,
"timestamp": 1750547105187,
"timestampType": "CREATE_TIME",
"key": "MTIz",
"value": "AwBXT2qalUhN6oaj2CwEeaEWFFBvd2VydG9vbHMK",
"headers": []
}
]
}
}
19 changes: 19 additions & 0 deletions examples/snippets/kafka/samples/kafkaEventJson.json
@@ -0,0 +1,19 @@
{
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:eu-west-3:123456789012:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3",
"bootstrapServers": "boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098",
"records": {
"python-with-avro-doc-5": [
{
"topic": "python-with-avro-doc",
"partition": 5,
"offset": 0,
"timestamp": 1750547462087,
"timestampType": "CREATE_TIME",
"key": "MTIz",
"value": "eyJuYW1lIjogIlBvd2VydG9vbHMiLCAiYWdlIjogNX0=",
"headers": []
}
]
}
}
19 changes: 19 additions & 0 deletions examples/snippets/kafka/samples/kafkaEventProtobuf.json
@@ -0,0 +1,19 @@
{
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:eu-west-3:992382490249:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3",
"bootstrapServers": "boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098",
"records": {
"python-with-avro-doc-5": [
{
"topic": "python-with-avro-doc",
"partition": 5,
"offset": 1,
"timestamp": 1750624373324,
"timestampType": "CREATE_TIME",
"key": "MTIz",
"value": "Cgpwb3dlcnRvb2xzEAU=",
"headers": []
}
]
}
}
9 changes: 9 additions & 0 deletions examples/snippets/kafka/samples/user.avsc
@@ -0,0 +1,9 @@
{
"type": "record",
"name": "User",
"namespace": "com.example",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}
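
As a quick way to see this schema in action outside Lambda, the sketch below round-trips a record shaped like the one carried by the sample events. It assumes the `avsc` npm package (any Avro library works), and note that the sample event's `value` additionally carries schema registry framing bytes in front of the Avro body.

```typescript
import avro from 'avsc';

// Build an Avro type from the same definition as user.avsc
const userType = avro.Type.forSchema({
  type: 'record',
  name: 'User',
  namespace: 'com.example',
  fields: [
    { name: 'name', type: 'string' },
    { name: 'age', type: 'int' },
  ],
});

// Encode a record like the one in the sample events, then decode it back
const encoded = userType.toBuffer({ name: 'Powertools', age: 5 });
const decoded = userType.fromBuffer(encoded);
console.log(decoded); // User { name: 'Powertools', age: 5 }
```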
120 changes: 0 additions & 120 deletions examples/snippets/kafka/samples/user.es6.generated.d.ts

This file was deleted.
