@@ -6,65 +6,89 @@ import (
66 "log"
77 "os"
88 "os/signal"
9+ "runtime"
910 "sync"
1011
1112 "github.com/aws/aws-sdk-go-v2/aws"
1213 "github.com/aws/aws-sdk-go-v2/config"
1314 "github.com/aws/aws-sdk-go-v2/service/sqs"
15+ flags "github.com/jessevdk/go-flags"
1416)
1517
1618const (
17- queueURL = "https://sqs.us-east-1.amazonaws.com/xxxxxxxx/yyyyyyyyy"
18- concurrency = 10
19+ maxNumberOfMessages = 10
20+ waitTimeSeconds = 20
1921)
2022
21- var (
22- outputDelimiter = []byte ("\n " )
23- )
23+ type opts struct {
24+ Concurrency int `short:"c" long:"concurrency" description:"Number of concurrent SQS pollers; Defaults to 10 x Num. of CPUs"`
25+ Positional struct {
26+ QueueName string `positional-arg-name:"queue-name"`
27+ } `positional-args:"true" required:"true"`
28+ }
2429
2530func main () {
31+ opts := opts {}
32+ if _ , err := flags .Parse (& opts ); err != nil {
33+ if e , ok := err .(* flags.Error ); ok {
34+ if e .Type == flags .ErrHelp {
35+ os .Exit (0 )
36+ } else {
37+ os .Exit (1 )
38+ }
39+ }
40+ }
41+
2642 awsCfg , err := config .LoadDefaultConfig (context .Background ())
2743 if err != nil {
2844 log .Fatalf ("config.LoadDefaultConfig() failed: %v" , err )
2945 }
46+
3047 sqsClient := sqs .NewFromConfig (awsCfg )
3148
49+ resp , err := sqsClient .GetQueueUrl (context .Background (), & sqs.GetQueueUrlInput {
50+ QueueName : aws .String (opts .Positional .QueueName ),
51+ })
52+ if err != nil {
53+ log .Fatalf ("sqsClient.GetQueueUrl(%s) failed: %v" , opts .Positional .QueueName , err )
54+ }
55+
3256 ctx , stop := signal .NotifyContext (context .Background (), os .Interrupt )
3357 defer stop ()
3458
59+ if opts .Concurrency <= 0 {
60+ opts .Concurrency = 10 * runtime .NumCPU ()
61+ }
62+
3563 wg := & sync.WaitGroup {}
36- for i := 0 ; i < concurrency ; i ++ {
64+ for i := 0 ; i < opts . Concurrency ; i ++ {
3765 wg .Add (1 )
38- go poll (ctx , sqsClient , os .Stdout , wg )
66+ go poll (ctx , sqsClient , resp . QueueUrl , os .Stdout , wg )
3967 }
4068
4169 wg .Wait ()
4270}
4371
44- func poll (ctx context.Context , sqsClient * sqs.Client , f * os.File , wg * sync.WaitGroup ) {
72+ func poll (ctx context.Context , sqsClient * sqs.Client , queueURL * string , f * os.File , wg * sync.WaitGroup ) {
4573 defer wg .Done ()
4674
4775 for {
4876 select {
4977 case <- ctx .Done ():
50- log .Printf ("poller exited" )
5178 return
5279 default :
5380 resp , err := sqsClient .ReceiveMessage (ctx , & sqs.ReceiveMessageInput {
54- QueueUrl : aws . String ( queueURL ) ,
55- MaxNumberOfMessages : int32 ( 10 ) ,
56- WaitTimeSeconds : int32 ( 20 ) ,
81+ QueueUrl : queueURL ,
82+ MaxNumberOfMessages : maxNumberOfMessages ,
83+ WaitTimeSeconds : waitTimeSeconds ,
5784 })
5885 if err != nil {
5986 if errors .Is (err , context .Canceled ) {
60- log .Printf ("poller exited" )
6187 return
6288 }
6389 log .Fatalf ("sqsClient.ReceiveMessage() failed: %v" , err )
6490 }
6591
66- log .Printf ("received messages; count = %d\n " , len (resp .Messages ))
67-
6892 for _ , msg := range resp .Messages {
6993 // sequential is ok, poller is concurrent
7094 handleMessage (f , * msg .Body )
@@ -73,6 +97,8 @@ func poll(ctx context.Context, sqsClient *sqs.Client, f *os.File, wg *sync.WaitG
7397 }
7498}
7599
100+ var outputDelimiter = []byte ("\n " )
101+
76102func handleMessage (f * os.File , body string ) {
77103 if _ , err := f .WriteString (body ); err != nil {
78104 log .Fatalf ("WriteString() failed: %v" , err )
0 commit comments