diff --git a/docs/features/kafka.md b/docs/features/kafka.md index e6cb0cb9ff..25e2428dea 100644 --- a/docs/features/kafka.md +++ b/docs/features/kafka.md @@ -69,7 +69,7 @@ Depending on the schema types you want to use, install the library and the corre Additionally, if you want to use output parsing with [Standard Schema](https://github.com/standard-schema/standard-schema), you can install [any of the supported libraries](https://standardschema.dev/#what-schema-libraries-implement-the-spec), for example: Zod, Valibot, or ArkType. - + ``` ### Using ESM with Schema Registry @@ -85,6 +85,44 @@ The Event Source Mapping configuration determines which mode is used. With `JSON Powertools for AWS supports both Schema Registry integration modes in your Event Source Mapping configuration. +For simplicity, we will use a simple schema containing `name` and `age` in most of our examples. You can also copy the payload example with the expected Kafka event to test your code. + +=== "JSON" + + ```json + --8<-- "examples/snippets/kafka/samples/user.json" + ``` + +=== "Payload JSON" + + ```json + --8<-- "examples/snippets/kafka/samples/kafkaEventJson.json" + ``` + +=== "Avro Schema" + + ```json + --8<-- "examples/snippets/kafka/samples/user.avsc" + ``` + +=== "Payload Avro" + + ```json + --8<-- "examples/snippets/kafka/samples/kafkaEventAvro.json" + ``` + +=== "Protobuf Schema" + + ```typescript + --8<-- "examples/snippets/kafka/samples/user.proto" + ``` + +=== "Payload Protobuf" + + ```json + --8<-- "examples/snippets/kafka/samples/kafkaEventProtobuf.json" + ``` + ### Processing Kafka events The Kafka consumer utility transforms raw Kafka events into an intuitive format for processing. To handle messages effectively, you'll need to configure a schema that matches your data format. @@ -110,9 +148,9 @@ The Kafka consumer utility transforms raw Kafka events into an intuitive format --8<-- "examples/snippets/kafka/gettingStartedJson.ts" ``` -### Deserializing keys and values +### Deserializing key and value -The `kafkaConsumer` function can deserialize both keys and values independently based on your schema configuration. This flexibility allows you to work with different data formats in the same message. +The `kafkaConsumer` function can deserialize both key and value independently based on your schema configuration. This flexibility allows you to work with different data formats in the same message. === "index.ts" diff --git a/examples/snippets/kafka/advancedWorkingWithIdempotency.ts b/examples/snippets/kafka/advancedWorkingWithIdempotency.ts index 1c4a58c132..d6a5292e9e 100644 --- a/examples/snippets/kafka/advancedWorkingWithIdempotency.ts +++ b/examples/snippets/kafka/advancedWorkingWithIdempotency.ts @@ -6,7 +6,7 @@ import { DynamoDBPersistenceLayer } from '@aws-lambda-powertools/idempotency/dyn import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; -import { User } from './samples/user.es6.generated.js'; // protobuf generated class +import { com } from './samples/user.generated.js'; // protobuf generated class const logger = new Logger({ serviceName: 'kafka-consumer' }); const persistenceStore = new DynamoDBPersistenceLayer({ @@ -16,14 +16,14 @@ const persistenceStore = new DynamoDBPersistenceLayer({ const schemaConfig = { value: { type: SchemaType.PROTOBUF, - schema: User, + schema: com.example.User, }, } satisfies SchemaConfig; const processRecord = makeIdempotent( async (user, topic, partition, offset) => { logger.info('processing user', { - userId: user.id, + user, meta: { topic, partition, @@ -35,7 +35,7 @@ const processRecord = makeIdempotent( return { success: true, - userId: user.id, + user, }; }, { diff --git a/examples/snippets/kafka/advancedWorkingWithRecordMetadata.ts b/examples/snippets/kafka/advancedWorkingWithRecordMetadata.ts index 7bda7f5f2d..35974aa359 100644 --- a/examples/snippets/kafka/advancedWorkingWithRecordMetadata.ts +++ b/examples/snippets/kafka/advancedWorkingWithRecordMetadata.ts @@ -1,10 +1,10 @@ import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; import { Logger } from '@aws-lambda-powertools/logger'; -import { type IUser, User } from './samples/user.es6.generated.js'; // protobuf generated class +import { com } from './samples/user.generated.js'; // protobuf generated class const logger = new Logger({ serviceName: 'kafka-consumer' }); -export const handler = kafkaConsumer( +export const handler = kafkaConsumer( async (event, _context) => { for (const record of event.records) { const { value, topic, partition, offset, timestamp, headers } = record; @@ -24,15 +24,15 @@ export const handler = kafkaConsumer( // Process the deserialized value logger.info('User data', { - userId: value.id, userName: value.name, + userAge: value.age, }); } }, { value: { type: SchemaType.PROTOBUF, - schema: User, + schema: com.example.User, }, } ); diff --git a/examples/snippets/kafka/gettingStartedPrimitiveValues.ts b/examples/snippets/kafka/gettingStartedPrimitiveValues.ts index 96b03a08bf..63696a9a74 100644 --- a/examples/snippets/kafka/gettingStartedPrimitiveValues.ts +++ b/examples/snippets/kafka/gettingStartedPrimitiveValues.ts @@ -3,7 +3,7 @@ import { Logger } from '@aws-lambda-powertools/logger'; const logger = new Logger({ serviceName: 'kafka-consumer' }); -export const handler = kafkaConsumer( +export const handler = kafkaConsumer( async (event, _context) => { for (const record of event.records) { // Key is automatically decoded as UTF-8 string @@ -14,8 +14,8 @@ export const handler = kafkaConsumer( logger.info('received value', { key, product: { - id: value.id, name: value.name, + age: value.age, }, }); } diff --git a/examples/snippets/kafka/gettingStartedProtobuf.ts b/examples/snippets/kafka/gettingStartedProtobuf.ts index 34c9368f28..5e1731f18f 100644 --- a/examples/snippets/kafka/gettingStartedProtobuf.ts +++ b/examples/snippets/kafka/gettingStartedProtobuf.ts @@ -1,14 +1,14 @@ import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; -import { User } from './samples/user.es6.generated.js'; // protobuf generated class +import { com } from './samples/user.generated.js'; // protobuf generated class const logger = new Logger({ serviceName: 'kafka-consumer' }); const schemaConfig = { value: { type: SchemaType.PROTOBUF, - schema: User, + schema: com.example.User, }, } satisfies SchemaConfig; diff --git a/examples/snippets/kafka/gettingStartedValueOnly.ts b/examples/snippets/kafka/gettingStartedValueOnly.ts deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/examples/snippets/kafka/samples/kafkaEventAvro.json b/examples/snippets/kafka/samples/kafkaEventAvro.json new file mode 100644 index 0000000000..73c3f00bdf --- /dev/null +++ b/examples/snippets/kafka/samples/kafkaEventAvro.json @@ -0,0 +1,19 @@ +{ + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:eu-west-3:123456789012:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3", + "bootstrapServers": "boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098", + "records": { + "python-with-avro-doc-3": [ + { + "topic": "python-with-avro-doc", + "partition": 3, + "offset": 0, + "timestamp": 1750547105187, + "timestampType": "CREATE_TIME", + "key": "MTIz", + "value": "AwBXT2qalUhN6oaj2CwEeaEWFFBvd2VydG9vbHMK", + "headers": [] + } + ] + } +} \ No newline at end of file diff --git a/examples/snippets/kafka/samples/kafkaEventJson.json b/examples/snippets/kafka/samples/kafkaEventJson.json new file mode 100644 index 0000000000..8900f6e15e --- /dev/null +++ b/examples/snippets/kafka/samples/kafkaEventJson.json @@ -0,0 +1,19 @@ +{ + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:eu-west-3:123456789012:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3", + "bootstrapServers": "boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098", + "records": { + "python-with-avro-doc-5": [ + { + "topic": "python-with-avro-doc", + "partition": 5, + "offset": 0, + "timestamp": 1750547462087, + "timestampType": "CREATE_TIME", + "key": "MTIz", + "value": "eyJuYW1lIjogIlBvd2VydG9vbHMiLCAiYWdlIjogNX0=", + "headers": [] + } + ] + } +} \ No newline at end of file diff --git a/examples/snippets/kafka/samples/kafkaEventProtobuf.json b/examples/snippets/kafka/samples/kafkaEventProtobuf.json new file mode 100644 index 0000000000..ba5dc97fd7 --- /dev/null +++ b/examples/snippets/kafka/samples/kafkaEventProtobuf.json @@ -0,0 +1,19 @@ +{ + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:eu-west-3:992382490249:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3", + "bootstrapServers": "boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098", + "records": { + "python-with-avro-doc-5": [ + { + "topic": "python-with-avro-doc", + "partition": 5, + "offset": 1, + "timestamp": 1750624373324, + "timestampType": "CREATE_TIME", + "key": "MTIz", + "value": "Cgpwb3dlcnRvb2xzEAU=", + "headers": [] + } + ] + } +} \ No newline at end of file diff --git a/examples/snippets/kafka/samples/user.avsc b/examples/snippets/kafka/samples/user.avsc new file mode 100644 index 0000000000..a8a7248815 --- /dev/null +++ b/examples/snippets/kafka/samples/user.avsc @@ -0,0 +1,9 @@ +{ + "type": "record", + "name": "User", + "namespace": "com.example", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"} + ] +} \ No newline at end of file diff --git a/examples/snippets/kafka/samples/user.es6.generated.d.ts b/examples/snippets/kafka/samples/user.es6.generated.d.ts deleted file mode 100644 index 277a592edc..0000000000 --- a/examples/snippets/kafka/samples/user.es6.generated.d.ts +++ /dev/null @@ -1,120 +0,0 @@ -import * as $protobuf from "protobufjs"; -import Long = require("long"); -/** Properties of a User. */ -export interface IUser { - /** User id */ - id?: number | null; - - /** User name */ - name?: string | null; - - /** User price */ - price?: number | null; -} - -/** Represents a User. */ -export class User implements IUser { - /** - * Constructs a new User. - * @param [properties] Properties to set - */ - constructor(properties?: IUser); - - /** User id. */ - public id: number; - - /** User name. */ - public name: string; - - /** User price. */ - public price: number; - - /** - * Creates a new User instance using the specified properties. - * @param [properties] Properties to set - * @returns User instance - */ - public static create(properties?: IUser): User; - - /** - * Encodes the specified User message. Does not implicitly {@link User.verify|verify} messages. - * @param message User message or plain object to encode - * @param [writer] Writer to encode to - * @returns Writer - */ - public static encode( - message: IUser, - writer?: $protobuf.Writer, - ): $protobuf.Writer; - - /** - * Encodes the specified User message, length delimited. Does not implicitly {@link User.verify|verify} messages. - * @param message User message or plain object to encode - * @param [writer] Writer to encode to - * @returns Writer - */ - public static encodeDelimited( - message: IUser, - writer?: $protobuf.Writer, - ): $protobuf.Writer; - - /** - * Decodes a User message from the specified reader or buffer. - * @param reader Reader or buffer to decode from - * @param [length] Message length if known beforehand - * @returns User - * @throws {Error} If the payload is not a reader or valid buffer - * @throws {$protobuf.util.ProtocolError} If required fields are missing - */ - public static decode( - reader: $protobuf.Reader | Uint8Array, - length?: number, - ): User; - - /** - * Decodes a User message from the specified reader or buffer, length delimited. - * @param reader Reader or buffer to decode from - * @returns User - * @throws {Error} If the payload is not a reader or valid buffer - * @throws {$protobuf.util.ProtocolError} If required fields are missing - */ - public static decodeDelimited(reader: $protobuf.Reader | Uint8Array): User; - - /** - * Verifies a User message. - * @param message Plain object to verify - * @returns `null` if valid, otherwise the reason why it is not - */ - public static verify(message: { [k: string]: any }): string | null; - - /** - * Creates a User message from a plain object. Also converts values to their respective internal types. - * @param object Plain object - * @returns User - */ - public static fromObject(object: { [k: string]: any }): User; - - /** - * Creates a plain object from a User message. Also converts values to other types if specified. - * @param message User - * @param [options] Conversion options - * @returns Plain object - */ - public static toObject( - message: User, - options?: $protobuf.IConversionOptions, - ): { [k: string]: any }; - - /** - * Converts this User to JSON. - * @returns JSON object - */ - public toJSON(): { [k: string]: any }; - - /** - * Gets the default type url for User - * @param [typeUrlPrefix] your custom typeUrlPrefix(default "type.googleapis.com") - * @returns The default type url - */ - public static getTypeUrl(typeUrlPrefix?: string): string; -} diff --git a/examples/snippets/kafka/samples/user.es6.generated.js b/examples/snippets/kafka/samples/user.es6.generated.js deleted file mode 100644 index 2b21fddd88..0000000000 --- a/examples/snippets/kafka/samples/user.es6.generated.js +++ /dev/null @@ -1,262 +0,0 @@ -/*eslint-disable block-scoped-var, id-length, no-control-regex, no-magic-numbers, no-prototype-builtins, no-redeclare, no-shadow, no-var, sort-vars*/ -import * as $protobuf from "protobufjs/minimal"; - -// Common aliases -const $Reader = $protobuf.Reader, $Writer = $protobuf.Writer, $util = $protobuf.util; - -// Exported root namespace -const $root = $protobuf.roots["default"] || ($protobuf.roots["default"] = {}); - -export const User = $root.User = (() => { - - /** - * Properties of a User. - * @exports IUser - * @interface IUser - * @property {number|null} [id] User id - * @property {string|null} [name] User name - * @property {number|null} [price] User price - */ - - /** - * Constructs a new User. - * @exports User - * @classdesc Represents a User. - * @implements IUser - * @constructor - * @param {IUser=} [properties] Properties to set - */ - function User(properties) { - if (properties) - for (let keys = Object.keys(properties), i = 0; i < keys.length; ++i) - if (properties[keys[i]] != null) - this[keys[i]] = properties[keys[i]]; - } - - /** - * User id. - * @member {number} id - * @memberof User - * @instance - */ - User.prototype.id = 0; - - /** - * User name. - * @member {string} name - * @memberof User - * @instance - */ - User.prototype.name = ""; - - /** - * User price. - * @member {number} price - * @memberof User - * @instance - */ - User.prototype.price = 0; - - /** - * Creates a new User instance using the specified properties. - * @function create - * @memberof User - * @static - * @param {IUser=} [properties] Properties to set - * @returns {User} User instance - */ - User.create = function create(properties) { - return new User(properties); - }; - - /** - * Encodes the specified User message. Does not implicitly {@link User.verify|verify} messages. - * @function encode - * @memberof User - * @static - * @param {IUser} message User message or plain object to encode - * @param {$protobuf.Writer} [writer] Writer to encode to - * @returns {$protobuf.Writer} Writer - */ - User.encode = function encode(message, writer) { - if (!writer) - writer = $Writer.create(); - if (message.id != null && Object.hasOwnProperty.call(message, "id")) - writer.uint32(/* id 1, wireType 0 =*/8).int32(message.id); - if (message.name != null && Object.hasOwnProperty.call(message, "name")) - writer.uint32(/* id 2, wireType 2 =*/18).string(message.name); - if (message.price != null && Object.hasOwnProperty.call(message, "price")) - writer.uint32(/* id 3, wireType 1 =*/25).double(message.price); - return writer; - }; - - /** - * Encodes the specified User message, length delimited. Does not implicitly {@link User.verify|verify} messages. - * @function encodeDelimited - * @memberof User - * @static - * @param {IUser} message User message or plain object to encode - * @param {$protobuf.Writer} [writer] Writer to encode to - * @returns {$protobuf.Writer} Writer - */ - User.encodeDelimited = function encodeDelimited(message, writer) { - return this.encode(message, writer).ldelim(); - }; - - /** - * Decodes a User message from the specified reader or buffer. - * @function decode - * @memberof User - * @static - * @param {$protobuf.Reader|Uint8Array} reader Reader or buffer to decode from - * @param {number} [length] Message length if known beforehand - * @returns {User} User - * @throws {Error} If the payload is not a reader or valid buffer - * @throws {$protobuf.util.ProtocolError} If required fields are missing - */ - User.decode = function decode(reader, length, error) { - if (!(reader instanceof $Reader)) - reader = $Reader.create(reader); - let end = length === undefined ? reader.len : reader.pos + length, message = new $root.User(); - while (reader.pos < end) { - let tag = reader.uint32(); - if (tag === error) - break; - switch (tag >>> 3) { - case 1: { - message.id = reader.int32(); - break; - } - case 2: { - message.name = reader.string(); - break; - } - case 3: { - message.price = reader.double(); - break; - } - default: - reader.skipType(tag & 7); - break; - } - } - return message; - }; - - /** - * Decodes a User message from the specified reader or buffer, length delimited. - * @function decodeDelimited - * @memberof User - * @static - * @param {$protobuf.Reader|Uint8Array} reader Reader or buffer to decode from - * @returns {User} User - * @throws {Error} If the payload is not a reader or valid buffer - * @throws {$protobuf.util.ProtocolError} If required fields are missing - */ - User.decodeDelimited = function decodeDelimited(reader) { - if (!(reader instanceof $Reader)) - reader = new $Reader(reader); - return this.decode(reader, reader.uint32()); - }; - - /** - * Verifies a User message. - * @function verify - * @memberof User - * @static - * @param {Object.} message Plain object to verify - * @returns {string|null} `null` if valid, otherwise the reason why it is not - */ - User.verify = function verify(message) { - if (typeof message !== "object" || message === null) - return "object expected"; - if (message.id != null && message.hasOwnProperty("id")) - if (!$util.isInteger(message.id)) - return "id: integer expected"; - if (message.name != null && message.hasOwnProperty("name")) - if (!$util.isString(message.name)) - return "name: string expected"; - if (message.price != null && message.hasOwnProperty("price")) - if (typeof message.price !== "number") - return "price: number expected"; - return null; - }; - - /** - * Creates a User message from a plain object. Also converts values to their respective internal types. - * @function fromObject - * @memberof User - * @static - * @param {Object.} object Plain object - * @returns {User} User - */ - User.fromObject = function fromObject(object) { - if (object instanceof $root.User) - return object; - let message = new $root.User(); - if (object.id != null) - message.id = object.id | 0; - if (object.name != null) - message.name = String(object.name); - if (object.price != null) - message.price = Number(object.price); - return message; - }; - - /** - * Creates a plain object from a User message. Also converts values to other types if specified. - * @function toObject - * @memberof User - * @static - * @param {User} message User - * @param {$protobuf.IConversionOptions} [options] Conversion options - * @returns {Object.} Plain object - */ - User.toObject = function toObject(message, options) { - if (!options) - options = {}; - let object = {}; - if (options.defaults) { - object.id = 0; - object.name = ""; - object.price = 0; - } - if (message.id != null && message.hasOwnProperty("id")) - object.id = message.id; - if (message.name != null && message.hasOwnProperty("name")) - object.name = message.name; - if (message.price != null && message.hasOwnProperty("price")) - object.price = options.json && !isFinite(message.price) ? String(message.price) : message.price; - return object; - }; - - /** - * Converts this User to JSON. - * @function toJSON - * @memberof User - * @instance - * @returns {Object.} JSON object - */ - User.prototype.toJSON = function toJSON() { - return this.constructor.toObject(this, $protobuf.util.toJSONOptions); - }; - - /** - * Gets the default type url for User - * @function getTypeUrl - * @memberof User - * @static - * @param {string} [typeUrlPrefix] your custom typeUrlPrefix(default "type.googleapis.com") - * @returns {string} The default type url - */ - User.getTypeUrl = function getTypeUrl(typeUrlPrefix) { - if (typeUrlPrefix === undefined) { - typeUrlPrefix = "type.googleapis.com"; - } - return typeUrlPrefix + "/User"; - }; - - return User; -})(); - -export { $root as default }; diff --git a/examples/snippets/kafka/samples/user.generated.d.ts b/examples/snippets/kafka/samples/user.generated.d.ts new file mode 100644 index 0000000000..6b28327092 --- /dev/null +++ b/examples/snippets/kafka/samples/user.generated.d.ts @@ -0,0 +1,112 @@ +import * as $protobuf from "protobufjs"; +import Long = require("long"); +/** Namespace com. */ +export namespace com { + + /** Namespace example. */ + namespace example { + + /** Properties of a User. */ + interface IUser { + + /** User name */ + name?: (string|null); + + /** User age */ + age?: (number|null); + } + + /** Represents a User. */ + class User implements IUser { + + /** + * Constructs a new User. + * @param [properties] Properties to set + */ + constructor(properties?: com.example.IUser); + + /** User name. */ + public name: string; + + /** User age. */ + public age: number; + + /** + * Creates a new User instance using the specified properties. + * @param [properties] Properties to set + * @returns User instance + */ + public static create(properties?: com.example.IUser): com.example.User; + + /** + * Encodes the specified User message. Does not implicitly {@link com.example.User.verify|verify} messages. + * @param message User message or plain object to encode + * @param [writer] Writer to encode to + * @returns Writer + */ + public static encode(message: com.example.IUser, writer?: $protobuf.Writer): $protobuf.Writer; + + /** + * Encodes the specified User message, length delimited. Does not implicitly {@link com.example.User.verify|verify} messages. + * @param message User message or plain object to encode + * @param [writer] Writer to encode to + * @returns Writer + */ + public static encodeDelimited(message: com.example.IUser, writer?: $protobuf.Writer): $protobuf.Writer; + + /** + * Decodes a User message from the specified reader or buffer. + * @param reader Reader or buffer to decode from + * @param [length] Message length if known beforehand + * @returns User + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + public static decode(reader: ($protobuf.Reader|Uint8Array), length?: number): com.example.User; + + /** + * Decodes a User message from the specified reader or buffer, length delimited. + * @param reader Reader or buffer to decode from + * @returns User + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + public static decodeDelimited(reader: ($protobuf.Reader|Uint8Array)): com.example.User; + + /** + * Verifies a User message. + * @param message Plain object to verify + * @returns `null` if valid, otherwise the reason why it is not + */ + public static verify(message: { [k: string]: any }): (string|null); + + /** + * Creates a User message from a plain object. Also converts values to their respective internal types. + * @param object Plain object + * @returns User + */ + public static fromObject(object: { [k: string]: any }): com.example.User; + + /** + * Creates a plain object from a User message. Also converts values to other types if specified. + * @param message User + * @param [options] Conversion options + * @returns Plain object + */ + public static toObject(message: com.example.User, options?: $protobuf.IConversionOptions): { [k: string]: any }; + + /** + * Converts this User to JSON. + * @returns JSON object + */ + public toJSON(): { [k: string]: any }; + + /** + * Gets the default type url for User + * @param [typeUrlPrefix] your custom typeUrlPrefix(default "type.googleapis.com") + * @returns The default type url + */ + public static getTypeUrl(typeUrlPrefix?: string): string; + } + } +} diff --git a/examples/snippets/kafka/samples/user.generated.js b/examples/snippets/kafka/samples/user.generated.js new file mode 100644 index 0000000000..ab9c529229 --- /dev/null +++ b/examples/snippets/kafka/samples/user.generated.js @@ -0,0 +1,263 @@ +/*eslint-disable block-scoped-var, id-length, no-control-regex, no-magic-numbers, no-prototype-builtins, no-redeclare, no-shadow, no-var, sort-vars*/ +import * as $protobuf from "protobufjs/minimal"; + +// Common aliases +const $Reader = $protobuf.Reader, $Writer = $protobuf.Writer, $util = $protobuf.util; + +// Exported root namespace +const $root = $protobuf.roots["default"] || ($protobuf.roots["default"] = {}); + +export const com = $root.com = (() => { + + /** + * Namespace com. + * @exports com + * @namespace + */ + const com = {}; + + com.example = (function() { + + /** + * Namespace example. + * @memberof com + * @namespace + */ + const example = {}; + + example.User = (function() { + + /** + * Properties of a User. + * @memberof com.example + * @interface IUser + * @property {string|null} [name] User name + * @property {number|null} [age] User age + */ + + /** + * Constructs a new User. + * @memberof com.example + * @classdesc Represents a User. + * @implements IUser + * @constructor + * @param {com.example.IUser=} [properties] Properties to set + */ + function User(properties) { + if (properties) + for (let keys = Object.keys(properties), i = 0; i < keys.length; ++i) + if (properties[keys[i]] != null) + this[keys[i]] = properties[keys[i]]; + } + + /** + * User name. + * @member {string} name + * @memberof com.example.User + * @instance + */ + User.prototype.name = ""; + + /** + * User age. + * @member {number} age + * @memberof com.example.User + * @instance + */ + User.prototype.age = 0; + + /** + * Creates a new User instance using the specified properties. + * @function create + * @memberof com.example.User + * @static + * @param {com.example.IUser=} [properties] Properties to set + * @returns {com.example.User} User instance + */ + User.create = function create(properties) { + return new User(properties); + }; + + /** + * Encodes the specified User message. Does not implicitly {@link com.example.User.verify|verify} messages. + * @function encode + * @memberof com.example.User + * @static + * @param {com.example.IUser} message User message or plain object to encode + * @param {$protobuf.Writer} [writer] Writer to encode to + * @returns {$protobuf.Writer} Writer + */ + User.encode = function encode(message, writer) { + if (!writer) + writer = $Writer.create(); + if (message.name != null && Object.hasOwnProperty.call(message, "name")) + writer.uint32(/* id 1, wireType 2 =*/10).string(message.name); + if (message.age != null && Object.hasOwnProperty.call(message, "age")) + writer.uint32(/* id 2, wireType 0 =*/16).int32(message.age); + return writer; + }; + + /** + * Encodes the specified User message, length delimited. Does not implicitly {@link com.example.User.verify|verify} messages. + * @function encodeDelimited + * @memberof com.example.User + * @static + * @param {com.example.IUser} message User message or plain object to encode + * @param {$protobuf.Writer} [writer] Writer to encode to + * @returns {$protobuf.Writer} Writer + */ + User.encodeDelimited = function encodeDelimited(message, writer) { + return this.encode(message, writer).ldelim(); + }; + + /** + * Decodes a User message from the specified reader or buffer. + * @function decode + * @memberof com.example.User + * @static + * @param {$protobuf.Reader|Uint8Array} reader Reader or buffer to decode from + * @param {number} [length] Message length if known beforehand + * @returns {com.example.User} User + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + User.decode = function decode(reader, length, error) { + if (!(reader instanceof $Reader)) + reader = $Reader.create(reader); + let end = length === undefined ? reader.len : reader.pos + length, message = new $root.com.example.User(); + while (reader.pos < end) { + let tag = reader.uint32(); + if (tag === error) + break; + switch (tag >>> 3) { + case 1: { + message.name = reader.string(); + break; + } + case 2: { + message.age = reader.int32(); + break; + } + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }; + + /** + * Decodes a User message from the specified reader or buffer, length delimited. + * @function decodeDelimited + * @memberof com.example.User + * @static + * @param {$protobuf.Reader|Uint8Array} reader Reader or buffer to decode from + * @returns {com.example.User} User + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + User.decodeDelimited = function decodeDelimited(reader) { + if (!(reader instanceof $Reader)) + reader = new $Reader(reader); + return this.decode(reader, reader.uint32()); + }; + + /** + * Verifies a User message. + * @function verify + * @memberof com.example.User + * @static + * @param {Object.} message Plain object to verify + * @returns {string|null} `null` if valid, otherwise the reason why it is not + */ + User.verify = function verify(message) { + if (typeof message !== "object" || message === null) + return "object expected"; + if (message.name != null && message.hasOwnProperty("name")) + if (!$util.isString(message.name)) + return "name: string expected"; + if (message.age != null && message.hasOwnProperty("age")) + if (!$util.isInteger(message.age)) + return "age: integer expected"; + return null; + }; + + /** + * Creates a User message from a plain object. Also converts values to their respective internal types. + * @function fromObject + * @memberof com.example.User + * @static + * @param {Object.} object Plain object + * @returns {com.example.User} User + */ + User.fromObject = function fromObject(object) { + if (object instanceof $root.com.example.User) + return object; + let message = new $root.com.example.User(); + if (object.name != null) + message.name = String(object.name); + if (object.age != null) + message.age = object.age | 0; + return message; + }; + + /** + * Creates a plain object from a User message. Also converts values to other types if specified. + * @function toObject + * @memberof com.example.User + * @static + * @param {com.example.User} message User + * @param {$protobuf.IConversionOptions} [options] Conversion options + * @returns {Object.} Plain object + */ + User.toObject = function toObject(message, options) { + if (!options) + options = {}; + let object = {}; + if (options.defaults) { + object.name = ""; + object.age = 0; + } + if (message.name != null && message.hasOwnProperty("name")) + object.name = message.name; + if (message.age != null && message.hasOwnProperty("age")) + object.age = message.age; + return object; + }; + + /** + * Converts this User to JSON. + * @function toJSON + * @memberof com.example.User + * @instance + * @returns {Object.} JSON object + */ + User.prototype.toJSON = function toJSON() { + return this.constructor.toObject(this, $protobuf.util.toJSONOptions); + }; + + /** + * Gets the default type url for User + * @function getTypeUrl + * @memberof com.example.User + * @static + * @param {string} [typeUrlPrefix] your custom typeUrlPrefix(default "type.googleapis.com") + * @returns {string} The default type url + */ + User.getTypeUrl = function getTypeUrl(typeUrlPrefix) { + if (typeUrlPrefix === undefined) { + typeUrlPrefix = "type.googleapis.com"; + } + return typeUrlPrefix + "/com.example.User"; + }; + + return User; + })(); + + return example; + })(); + + return com; +})(); + +export { $root as default }; diff --git a/examples/snippets/kafka/samples/user.json b/examples/snippets/kafka/samples/user.json new file mode 100644 index 0000000000..bb2ebdafd8 --- /dev/null +++ b/examples/snippets/kafka/samples/user.json @@ -0,0 +1,4 @@ +{ + "name": "...", + "age": "..." +} \ No newline at end of file diff --git a/examples/snippets/kafka/samples/user.proto b/examples/snippets/kafka/samples/user.proto new file mode 100644 index 0000000000..d7f088fe45 --- /dev/null +++ b/examples/snippets/kafka/samples/user.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +package com.example; + +message User { + string name = 1; + int32 age = 2; +} \ No newline at end of file diff --git a/examples/snippets/kafka/templates/advancedBatchSizeConfiguration.yaml b/examples/snippets/kafka/templates/advancedBatchSizeConfiguration.yaml index 8c9cf98b32..02a399b7dd 100644 --- a/examples/snippets/kafka/templates/advancedBatchSizeConfiguration.yaml +++ b/examples/snippets/kafka/templates/advancedBatchSizeConfiguration.yaml @@ -2,8 +2,8 @@ Resources: OrderProcessingFunction: Type: AWS::Serverless::Function Properties: - Handler: app.lambda_handler - Runtime: python3.9 + Runtime: nodejs22.x + Handler: index.js Events: KafkaEvent: Type: MSK diff --git a/examples/snippets/kafka/templates/gettingStartedWithMsk.yaml b/examples/snippets/kafka/templates/gettingStartedWithMsk.yaml index ab14d728ff..3a22c9328a 100644 --- a/examples/snippets/kafka/templates/gettingStartedWithMsk.yaml +++ b/examples/snippets/kafka/templates/gettingStartedWithMsk.yaml @@ -4,8 +4,8 @@ Resources: KafkaConsumerFunction: Type: AWS::Serverless::Function Properties: - Handler: app.lambda_handler - Runtime: python3.13 + Runtime: nodejs22.x + Handler: index.js Timeout: 30 Events: MSKEvent: diff --git a/packages/kafka/package.json b/packages/kafka/package.json index e26e87b8c3..71dc5686ac 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -114,7 +114,6 @@ ] } }, - "private": true, "devDependencies": { "avro-js": "^1.12.0", "protobufjs": "^7.5.3",