Skip to content

Commit e72cd58

Browse files
committed
Add README content
1 parent 8588356 commit e72cd58

File tree

3 files changed

+37
-72
lines changed

3 files changed

+37
-72
lines changed

packages/@aws-cdk/aws-pipes-sources-alpha/README.md

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,46 @@
1717
<!--END STABILITY BANNER-->
1818

1919

20-
EventBridge Pipes let you create source to target connections between several
21-
aws services. While transporting messages from a source to a target the messages
22-
can be filtered, transformed and enriched.
20+
EventBridge Pipes Sources let you create a source for a EventBridge Pipe.
2321

24-
![diagram of pipes](https://d1.awsstatic.com/product-marketing/EventBridge/Product-Page-Diagram_Amazon-EventBridge-Pipes.cd7961854be4432d63f6158ffd18271d6c9fa3ec.png)
2522

2623
For more details see the service documentation:
2724

28-
[Documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html)
25+
[Documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-source.html)
2926

3027
## Pipe sources
3128

32-
// TODO
29+
Pipe sources are the starting point of a EventBridge Pipe. They are the source of the events that are sent to the pipe.
3330

31+
### Amazon SQS
32+
33+
A SQS message queue can be used as a source for a pipe. The queue will be polled for new messages and the messages will be sent to the pipe.
34+
35+
```ts
36+
declare const sourceQueue: sqs.Queue;
37+
declare const targetQueue: sqs.Queue;
38+
39+
const pipeSource = new sources.SqsSource(sourceQueue);
40+
41+
const pipe = new pipes.Pipe(this, 'Pipe', {
42+
source: pipeSource,
43+
target: new SomeTarget(targetQueue)
44+
});
45+
```
46+
47+
The polling configuration can be customized:
48+
49+
```ts
50+
declare const sourceQueue: sqs.Queue;
51+
declare const targetQueue: sqs.Queue;
52+
53+
const pipeSource = new sources.SqsSource(sourceQueue, {
54+
batchSize: 10,
55+
maximumBatchingWindow: cdk.Duration.seconds(10)
56+
});
57+
58+
const pipe = new pipes.Pipe(this, 'Pipe', {
59+
source: pipeSource,
60+
target: new SomeTarget(targetQueue)
61+
});
62+
```

packages/@aws-cdk/aws-pipes-sources-alpha/rosetta/default.ts-fixture

Lines changed: 2 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,9 @@ import * as lambda from 'aws-cdk-lib/aws-lambda';
55
import * as logs from 'aws-cdk-lib/aws-logs';
66
import { Construct } from 'constructs';
77
import * as pipes from '@aws-cdk/aws-pipes-alpha';
8+
import * as sources from '@aws-cdk/aws-pipes-sources-alpha';
89

9-
class SqsSource implements pipes.ISource {
10-
sourceArn: string;
11-
sourceParameters = undefined;
12-
constructor(private readonly queue: sqs.Queue) {
13-
this.queue = queue;
14-
this.sourceArn = queue.queueArn;
15-
}
16-
bind(_pipe: pipes.IPipe): pipes.SourceConfig {
17-
return {
18-
sourceParameters: this.sourceParameters,
19-
};
20-
}
21-
grantRead(pipeRole: cdk.aws_iam.IRole): void {
22-
this.queue.grantConsumeMessages(pipeRole);
23-
}
24-
}
25-
26-
class SqsTarget implements pipes.ITarget {
10+
class SomeTarget implements pipes.ITarget {
2711
targetArn: string;
2812
inputTransformation: pipes.InputTransformation | undefined;
2913

@@ -46,47 +30,6 @@ class SqsTarget implements pipes.ITarget {
4630
}
4731
}
4832

49-
class LambdaEnrichment implements pipes.IEnrichment {
50-
enrichmentArn: string;
51-
52-
private inputTransformation: pipes.InputTransformation | undefined;
53-
constructor(private readonly lambda: lambda.Function, props: {inputTransformation?: pipes.InputTransformation} = {}) {
54-
this.enrichmentArn = lambda.functionArn;
55-
this.inputTransformation = props?.inputTransformation
56-
}
57-
bind(pipe: pipes.IPipe): pipes.EnrichmentParametersConfig {
58-
return {
59-
enrichmentParameters: {
60-
inputTemplate: this.inputTransformation?.bind(pipe).inputTemplate,
61-
},
62-
};
63-
}
64-
grantInvoke(pipeRole: cdk.aws_iam.IRole): void {
65-
this.lambda.grantInvoke(pipeRole);
66-
}
67-
}
68-
69-
class CloudwatchDestination implements pipes.ILogDestination {
70-
parameters: pipes.LogDestinationParameters;
71-
constructor(private readonly logGroup: cdk.aws_logs.LogGroup) {
72-
this.logGroup = logGroup;
73-
this.parameters = {
74-
cloudwatchLogsLogDestination: {
75-
logGroupArn: logGroup.logGroupArn,
76-
},
77-
};
78-
}
79-
bind(_pipe: pipes.IPipe): pipes.LogDestinationConfig {
80-
return {
81-
parameters: this.parameters,
82-
};
83-
}
84-
85-
grantPush(pipeRole: cdk.aws_iam.IRole): void {
86-
this.logGroup.grantWrite(pipeRole);
87-
}
88-
}
89-
9033
class Fixture extends cdk.Stack {
9134
constructor(scope: Construct, id: string) {
9235
super(scope, id);

packages/@aws-cdk/aws-pipes-sources-alpha/rosetta/pipes-imports.ts-fixture

Lines changed: 0 additions & 7 deletions
This file was deleted.

0 commit comments

Comments
 (0)