Skip to content

eventbridge pipes to FIFO SQS dynamic message id pattern created #2764

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions eventbridge-pipes-dynamic-message-group-id/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Assigning a dynamic message group ID from the message body using Amazon EventBridge Pipes

Amazon EventBridge, a serverless event bus service, and Amazon SQS, a managed message queuing service, work together in event-driven architectures to route and process messages between AWS services and applications. While Amazon EventBridge routes messages to Amazon SQS queues for processing by microservices, FIFO queues can be used for strict message ordering. Although Amazon EventBridge cannot directly set message group IDs for organizing related messages, Amazon EventBridge Pipes provides a solution by allowing dynamic message group ID assignment based on event properties, enabling ordered processing for specific users, applications, or locations without additional coding.

Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/eventbridge-pipes-dynamic-message-group-id

> [!Important]
> This application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements
* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed


## Deployment

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
```
git clone https://github.com/aws-samples/serverless-patterns
```
2. Change directory to the pattern directory:
```
cd eventbridge-pipes-dynamic-message-group-id
```

3. Build and deploy the SAM application
```bash
sam deploy --guided
```

During the prompts:
* Enter a stack name
* Enter the desired AWS Region
* Allow SAM CLI to create IAM roles with the required permissions.

You can accept all other defaults. Copy down the Amazon SQS Output Queue URL. You'll use this later in testing.

Once you have run `sam deploy --guided` mode once and saved arguments to a configuration file (samconfig.toml), you can use `sam deploy` in future to use these defaults.

## How it works
![AWS Architecture Amazon EventBridge rule to Amazon SQS FIFO Queue to Amazon EventBridge Pipe to Amazon SQS FIFO Queue with message group ID set](assets/architecture.png)

This project implements an event processing pipeline using Amazon EventBridge and Amazon SQS FIFO queues. The pipeline consists of:

- An Amazon EventBridge rule that captures events from "my-custom-app" source
- An input Amazon SQS FIFO queue that receives events from the Amazon EventBridge rule
- An Amazon EventBridge pipe that processes messages from the input queue and dynamically sets the message group ID for the target Amazon SQS FIFO queue
- An output Amazon SQS FIFO queue that receives processed messages

A key component of this pattern is the syntax in the EventBrige Pipe Target to obtain the correct property value to set the message group id. The following shows an example in the AWS Console. You can also view this configuration in the SAM template under the CustomEventPipe resource.
![Amazon EventBridge Pipe Target Configuration](assets/pipeTargetConfiguration.png)

## Testing

To test, you'll send an event from a custom source to Amazon EventBridge, which will trigger the Amazon EventBridge rule to send the event to an Amazon SQS FIFO Queue. You'll then wait for the Amazon EventBridge pipe to process and finally verify the message within the destination Amazon SQS FIFO queue with the message group ID set.

1. Send a test event to Amazon EventBridge. Take a look at the event structure and attributes. The account attribute will be used as the message group ID.
```bash
aws events put-events --entries file://events/test-event.json
```

2. Wait for a couple seconds for the Amazon EventBridge pipe to process the message, then check the output queue. Make sure you replace the queue-url value with the correct output queue URL from deployment.
```bash
aws sqs receive-message \
--queue-url <OUTPUT_QUEUE_URL> \
--attribute-names All \
--message-attribute-names All \
--max-number-of-messages 10
```

You should see a list of messages. View the event and take note of the MessageGroupID under the Attributes section. This originally came from your event input "accountId" property. The Amazon EventBridge Pipe is what allows us to perform this modification. Feel free to modify the event, send additional events to Amazon EventBridge and review the output queue messages.

## Cleanup

To remove all resources:

```bash
sam delete
```

----
Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[
{
"Source": "my-custom-app",
"DetailType": "CustomEvent",
"Detail": "{\"accountId\": \"12345678\", \"message\": \"Test message\", \"timestamp\": \"2023-01-01T00:00:00Z\"}"
}
]
61 changes: 61 additions & 0 deletions eventbridge-pipes-dynamic-message-group-id/example-pattern.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
{
"title": "Set an Amazon SQS FIFO queue's message group ID with an EventBridge Pipe",
"description": "Uses Amazon EventBridge Pipes to dynamically retrieve and set the message group ID for a SQS FIFO destination queue",
"language": "YAML",
"level": "200",
"framework": "AWS SAM",
"introBox": {
"headline": "How it works",
"text": [
"This pattern demonstrates how to use an Amazon EventBridge rule, Amazon EventBridge Pipes and Amazon SQS FIFO queues to dynamically retrieve and set a message group ID from the message body for an Amazon SQS FIFO queue."
]
},
"gitHub": {
"template": {
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/eventbridge-pipes-dynamic-message-group-id",
"templateURL": "serverless-patterns/eventbridge-pipes-dynamic-message-group-id",
"projectFolder": "eventbridge-pipes-dynamic-message-group-id",
"templateFile": "template.yaml"
}
},
"resources": {
"bullets": [
{
"text": "Amazon EventBridge Pipes",
"link": "https://aws.amazon.com/eventbridge/pipes/"
},
{
"text": "Amazon EventBridge User Guide",
"link": "https://docs.aws.amazon.com/eventbridge/latest/userguide/"
},
{
"text": "Amazon SQS (Simple Queue Service) FIFO (First-In-First-Out) Queues",
"link": "https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-fifo-queues.html"
}
]
},
"deploy": {
"text": [
"sam deploy"
]
},
"testing": {
"text": [
"See the GitHub repo for detailed testing instructions."
]
},
"cleanup": {
"text": [
"Delete the stack: sam delete"
]
},
"authors": [
{
"name": "Kurt Tometich",
"image": "https://raw.githubusercontent.com/boomtown15/static-references/refs/heads/main/kurt-tometich.jpeg",
"bio": "Kurt is a Sr. Solutions Architect based in Colorado who enjoys building lean, mean serverless solutions.",
"linkedin": "kurt-tometich",
"twitter": ""
}
]
}
108 changes: 108 additions & 0 deletions eventbridge-pipes-dynamic-message-group-id/template.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: SAM template for EventBridge rule, SQS FIFO queues and EventBridge pipe setup

Resources:

# EventBridge Rule IAM Role
EventBridgeRuleRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: events.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: AllowSQSAccess
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- sqs:SendMessage
Resource: !GetAtt InputFifoQueue.Arn

# Input FIFO Queue
InputFifoQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: input-queue.fifo
FifoQueue: true
ContentBasedDeduplication: true

# Output FIFO Queue
OutputFifoQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: output-queue.fifo
FifoQueue: true
ContentBasedDeduplication: true

# EventBridge Rule
CustomEventRule:
Type: AWS::Events::Rule
Properties:
Description: "Rule to match events from my-custom-app"
EventPattern:
source:
- "my-custom-app"
State: "ENABLED"
Targets:
- Arn: !GetAtt InputFifoQueue.Arn
Id: "SendToInputQueue"
SqsParameters:
MessageGroupId: "DEFAULT"
RoleArn: !GetAtt EventBridgeRuleRole.Arn

# EventBridge Pipe
CustomEventPipe:
Type: AWS::Pipes::Pipe
Properties:
Name: "custom-event-pipe"
RoleArn: !GetAtt PipeRole.Arn
Source: !GetAtt InputFifoQueue.Arn
Target: !GetAtt OutputFifoQueue.Arn
TargetParameters:
SqsQueueParameters:
MessageGroupId: "$.body.detail.accountId"

# EventBridge Pipe IAM Role
PipeRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: pipes.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: PipeSourcePolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- sqs:ReceiveMessage
- sqs:DeleteMessage
- sqs:GetQueueAttributes
Resource: !GetAtt InputFifoQueue.Arn
- PolicyName: PipeTargetPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- sqs:SendMessage
Resource: !GetAtt OutputFifoQueue.Arn

Outputs:

OutputQueueURL:
Description: "URL of the output FIFO queue"
Value: !Ref OutputFifoQueue