Commit 19b2eb0

docs(kafka): provide example payloads (#4094)
1 parent a2e6eca commit 19b2eb0

19 files changed: +511 -403 lines changed

docs/features/kafka.md

Lines changed: 42 additions & 4 deletions
@@ -69,22 +69,60 @@ Depending on the schema types you want to use, install the library and the corre

 Additionally, if you want to use output parsing with [Standard Schema](https://github.com/standard-schema/standard-schema), you can install [any of the supported libraries](https://standardschema.dev/#what-schema-libraries-implement-the-spec), for example: Zod, Valibot, or ArkType.

-<!-- ### Required resources
+### Required resources

 To use the Kafka consumer utility, you need an AWS Lambda function configured with a Kafka event source. This can be Amazon MSK, MSK Serverless, or a self-hosted Kafka cluster.

 === "gettingStartedWithMsk.yaml"

     ```yaml
     --8<-- "examples/snippets/kafka/templates/gettingStartedWithMsk.yaml"
-    ``` -->
+    ```

 ### Using ESM with Schema Registry

 The Event Source Mapping configuration determines which mode is used. With `JSON` mode, Lambda converts all messages to JSON before invoking your function. With `SOURCE` mode, Lambda preserves the original format, requiring your function to handle the appropriate deserialization.

 Powertools for AWS supports both Schema Registry integration modes in your Event Source Mapping configuration.

+For simplicity, most of our examples use a schema containing `name` and `age`. You can also copy the example payload with the expected Kafka event to test your code.
+
+=== "JSON"
+
+    ```json
+    --8<-- "examples/snippets/kafka/samples/user.json"
+    ```
+
+=== "Payload JSON"
+
+    ```json
+    --8<-- "examples/snippets/kafka/samples/kafkaEventJson.json"
+    ```
+
+=== "Avro Schema"
+
+    ```json
+    --8<-- "examples/snippets/kafka/samples/user.avsc"
+    ```
+
+=== "Payload Avro"
+
+    ```json
+    --8<-- "examples/snippets/kafka/samples/kafkaEventAvro.json"
+    ```
+
+=== "Protobuf Schema"
+
+    ```typescript
+    --8<-- "examples/snippets/kafka/samples/user.proto"
+    ```
+
+=== "Payload Protobuf"
+
+    ```json
+    --8<-- "examples/snippets/kafka/samples/kafkaEventProtobuf.json"
+    ```
+
 ### Processing Kafka events

 The Kafka consumer utility transforms raw Kafka events into an intuitive format for processing. To handle messages effectively, you'll need to configure a schema that matches your data format.
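
The two Schema Registry modes above map to different schema configurations in the handler. A minimal sketch, assuming `SchemaType.JSON` is available alongside the `PROTOBUF` type used in the snippets below, and reusing the `name`/`age` schema:

```typescript
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { com } from './samples/user.generated.js'; // protobuf generated class

// JSON mode: Lambda has already converted the record to JSON before
// invoking the function, so only JSON parsing is needed (assumption).
const jsonModeConfig = {
  value: { type: SchemaType.JSON },
} satisfies SchemaConfig;

// SOURCE mode: the original Protobuf bytes are preserved, so the handler
// deserializes them with the generated message class.
const sourceModeConfig = {
  value: {
    type: SchemaType.PROTOBUF,
    schema: com.example.User,
  },
} satisfies SchemaConfig;

export const handler = kafkaConsumer<unknown, { name: string; age: number }>(
  async (event, _context) => {
    for (const record of event.records) {
      // record.value is a { name, age } object under either configuration
      console.log(record.value.name, record.value.age);
    }
  },
  sourceModeConfig
);
```

Only one of the two configurations would be passed to `kafkaConsumer`, matching the mode configured on the Event Source Mapping.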
@@ -110,9 +148,9 @@ The Kafka consumer utility transforms raw Kafka events into an intuitive format

     --8<-- "examples/snippets/kafka/gettingStartedJson.ts"
     ```

-### Deserializing keys and values
+### Deserializing key and value

-The `kafkaConsumer` function can deserialize both keys and values independently based on your schema configuration. This flexibility allows you to work with different data formats in the same message.
+The `kafkaConsumer` function can deserialize both key and value independently based on your schema configuration. This flexibility allows you to work with different data formats in the same message.

 === "index.ts"
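
A minimal sketch of the key/value case described above, assuming both key and value are JSON-encoded; the key shape here is hypothetical:

```typescript
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';

const logger = new Logger({ serviceName: 'kafka-consumer' });

// Key and value each get their own schema entry and are deserialized independently.
const schemaConfig = {
  key: { type: SchemaType.JSON },
  value: { type: SchemaType.JSON },
} satisfies SchemaConfig;

// The key type below is a hypothetical example shape.
export const handler = kafkaConsumer<{ orderId: string }, { name: string; age: number }>(
  async (event, _context) => {
    for (const record of event.records) {
      // Both key and value arrive already deserialized according to schemaConfig
      logger.info('received record', { key: record.key, value: record.value });
    }
  },
  schemaConfig
);
```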

examples/snippets/kafka/advancedWorkingWithIdempotency.ts

Lines changed: 4 additions & 4 deletions
@@ -6,7 +6,7 @@ import { DynamoDBPersistenceLayer } from '@aws-lambda-powertools/idempotency/dyn
 import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
 import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
 import { Logger } from '@aws-lambda-powertools/logger';
-import { User } from './samples/user.es6.generated.js'; // protobuf generated class
+import { com } from './samples/user.generated.js'; // protobuf generated class

 const logger = new Logger({ serviceName: 'kafka-consumer' });
 const persistenceStore = new DynamoDBPersistenceLayer({
@@ -16,14 +16,14 @@ const persistenceStore = new DynamoDBPersistenceLayer({
 const schemaConfig = {
   value: {
     type: SchemaType.PROTOBUF,
-    schema: User,
+    schema: com.example.User,
   },
 } satisfies SchemaConfig;

 const processRecord = makeIdempotent(
   async (user, topic, partition, offset) => {
     logger.info('processing user', {
-      userId: user.id,
+      user,
       meta: {
         topic,
         partition,
@@ -35,7 +35,7 @@ const processRecord = makeIdempotent(

     return {
       success: true,
-      userId: user.id,
+      user,
     };
   },
   {

examples/snippets/kafka/advancedWorkingWithRecordMetadata.ts

Lines changed: 4 additions & 4 deletions
@@ -1,10 +1,10 @@
 import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
 import { Logger } from '@aws-lambda-powertools/logger';
-import { type IUser, User } from './samples/user.es6.generated.js'; // protobuf generated class
+import { com } from './samples/user.generated.js'; // protobuf generated class

 const logger = new Logger({ serviceName: 'kafka-consumer' });

-export const handler = kafkaConsumer<unknown, IUser>(
+export const handler = kafkaConsumer<unknown, com.example.IUser>(
   async (event, _context) => {
     for (const record of event.records) {
       const { value, topic, partition, offset, timestamp, headers } = record;
@@ -24,15 +24,15 @@ export const handler = kafkaConsumer<unknown, IUser>(

       // Process the deserialized value
       logger.info('User data', {
-        userId: value.id,
         userName: value.name,
+        userAge: value.age,
       });
     }
   },
   {
     value: {
       type: SchemaType.PROTOBUF,
-      schema: User,
+      schema: com.example.User,
     },
   }
 );

examples/snippets/kafka/gettingStartedPrimitiveValues.ts

Lines changed: 2 additions & 2 deletions
@@ -3,7 +3,7 @@ import { Logger } from '@aws-lambda-powertools/logger';

 const logger = new Logger({ serviceName: 'kafka-consumer' });

-export const handler = kafkaConsumer<string, { id: number; name: string }>(
+export const handler = kafkaConsumer<string, { name: string; age: number }>(
   async (event, _context) => {
     for (const record of event.records) {
       // Key is automatically decoded as UTF-8 string
@@ -14,8 +14,8 @@ export const handler = kafkaConsumer<string, { id: number; name: string }>(
       logger.info('received value', {
         key,
         product: {
-          id: value.id,
           name: value.name,
+          age: value.age,
         },
       });
     }

examples/snippets/kafka/gettingStartedProtobuf.ts

Lines changed: 2 additions & 2 deletions
@@ -1,14 +1,14 @@
 import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
 import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
 import { Logger } from '@aws-lambda-powertools/logger';
-import { User } from './samples/user.es6.generated.js'; // protobuf generated class
+import { com } from './samples/user.generated.js'; // protobuf generated class

 const logger = new Logger({ serviceName: 'kafka-consumer' });

 const schemaConfig = {
   value: {
     type: SchemaType.PROTOBUF,
-    schema: User,
+    schema: com.example.User,
   },
 } satisfies SchemaConfig;

examples/snippets/kafka/gettingStartedValueOnly.ts

Whitespace-only changes.

examples/snippets/kafka/samples/kafkaEventAvro.json

Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+{
+  "eventSource": "aws:kafka",
+  "eventSourceArn": "arn:aws:kafka:eu-west-3:123456789012:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3",
+  "bootstrapServers": "boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098",
+  "records": {
+    "python-with-avro-doc-3": [
+      {
+        "topic": "python-with-avro-doc",
+        "partition": 3,
+        "offset": 0,
+        "timestamp": 1750547105187,
+        "timestampType": "CREATE_TIME",
+        "key": "MTIz",
+        "value": "AwBXT2qalUhN6oaj2CwEeaEWFFBvd2VydG9vbHMK",
+        "headers": []
+      }
+    ]
+  }
+}

examples/snippets/kafka/samples/kafkaEventJson.json

Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+{
+  "eventSource": "aws:kafka",
+  "eventSourceArn": "arn:aws:kafka:eu-west-3:123456789012:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3",
+  "bootstrapServers": "boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098",
+  "records": {
+    "python-with-avro-doc-5": [
+      {
+        "topic": "python-with-avro-doc",
+        "partition": 5,
+        "offset": 0,
+        "timestamp": 1750547462087,
+        "timestampType": "CREATE_TIME",
+        "key": "MTIz",
+        "value": "eyJuYW1lIjogIlBvd2VydG9vbHMiLCAiYWdlIjogNX0=",
+        "headers": []
+      }
+    ]
+  }
+}
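
For reference, the base64-encoded `key` and `value` in this sample decode to plain text; a quick Node.js check:

```typescript
// Values taken from the sample event above.
const key = Buffer.from('MTIz', 'base64').toString('utf-8');
const value = Buffer.from(
  'eyJuYW1lIjogIlBvd2VydG9vbHMiLCAiYWdlIjogNX0=',
  'base64'
).toString('utf-8');

console.log(key); // 123
console.log(value); // {"name": "Powertools", "age": 5}
```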

examples/snippets/kafka/samples/kafkaEventProtobuf.json

Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+{
+  "eventSource": "aws:kafka",
+  "eventSourceArn": "arn:aws:kafka:eu-west-3:992382490249:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3",
+  "bootstrapServers": "boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098",
+  "records": {
+    "python-with-avro-doc-5": [
+      {
+        "topic": "python-with-avro-doc",
+        "partition": 5,
+        "offset": 1,
+        "timestamp": 1750624373324,
+        "timestampType": "CREATE_TIME",
+        "key": "MTIz",
+        "value": "Cgpwb3dlcnRvb2xzEAU=",
+        "headers": []
+      }
+    ]
+  }
+}
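
The `value` in this sample is raw Protobuf for the same `name`/`age` message. A minimal decoding sketch, assuming a protobufjs-style static module at `./samples/user.generated.js` as used in the snippets above:

```typescript
import { com } from './samples/user.generated.js'; // protobuf generated class

// Value taken from the sample event above.
const bytes = Buffer.from('Cgpwb3dlcnRvb2xzEAU=', 'base64');
const user = com.example.User.decode(bytes);

console.log(user.name); // powertools
console.log(user.age); // 5
```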

examples/snippets/kafka/samples/user.avsc

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+{
+  "type": "record",
+  "name": "User",
+  "namespace": "com.example",
+  "fields": [
+    {"name": "name", "type": "string"},
+    {"name": "age", "type": "int"}
+  ]
+}
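
This Avro schema is what the consumer needs when values arrive as Avro in `SOURCE` mode. A minimal sketch, assuming the schema definition is passed as a string via `SchemaType.AVRO`:

```typescript
import { readFileSync } from 'node:fs';
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';

const schemaConfig = {
  value: {
    type: SchemaType.AVRO,
    // Assumption: the Avro schema definition is provided as a string.
    schema: readFileSync(new URL('./user.avsc', import.meta.url), 'utf-8'),
  },
} satisfies SchemaConfig;

export const handler = kafkaConsumer<unknown, { name: string; age: number }>(
  async (event, _context) => {
    for (const record of event.records) {
      console.log(record.value.name, record.value.age);
    }
  },
  schemaConfig
);
```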

examples/snippets/kafka/samples/user.es6.generated.d.ts

Lines changed: 0 additions & 120 deletions
This file was deleted.
