11package sqs
22
33import (
4+ "context"
45 "encoding/json"
56 "errors"
67 "fmt"
7- sqsiface "github.com/RichardKnop/machinery/v2/brokers/iface/sqs"
8- "github.com/aws/aws-sdk-go-v2/service/sqs"
98 "os"
109 "sync"
1110
11+ sqsiface "github.com/RichardKnop/machinery/v2/brokers/iface/sqs"
12+
1213 "github.com/RichardKnop/machinery/v2/brokers/iface"
1314 "github.com/RichardKnop/machinery/v2/common"
1415 "github.com/RichardKnop/machinery/v2/config"
15- "github.com/aws/aws-sdk-go/aws"
16- "github.com/aws/aws-sdk-go/aws/session"
17- awssqs "github.com/aws/aws-sdk-go/service/sqs"
18- "github.com/aws/aws-sdk-go/service/sqs/sqsiface"
16+ "github.com/aws/aws-sdk-go-v2/aws"
17+ awssqs "github.com/aws/aws-sdk-go-v2/service/sqs"
18+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
1919)
2020
2121var (
@@ -26,7 +26,7 @@ type FakeSQS struct {
2626 sqsiface.API
2727}
2828
29- func (f * FakeSQS ) SendMessage (* awssqs.SendMessageInput ) (* awssqs.SendMessageOutput , error ) {
29+ func (f * FakeSQS ) SendMessage (context. Context , * awssqs.SendMessageInput , ... func ( * awssqs. Options ) ) (* awssqs.SendMessageOutput , error ) {
3030 output := awssqs.SendMessageOutput {
3131 MD5OfMessageAttributes : aws .String ("d25a6aea97eb8f585bfa92d314504a92" ),
3232 MD5OfMessageBody : aws .String ("bbdc5fdb8be7251f5c910905db994bab" ),
@@ -35,29 +35,29 @@ func (f *FakeSQS) SendMessage(*awssqs.SendMessageInput) (*awssqs.SendMessageOutp
3535 return & output , nil
3636}
3737
38- func (f * FakeSQS ) ReceiveMessage (* awssqs.ReceiveMessageInput ) (* awssqs.ReceiveMessageOutput , error ) {
38+ func (f * FakeSQS ) ReceiveMessage (context. Context , * awssqs.ReceiveMessageInput , ... func ( * awssqs. Options ) ) (* awssqs.ReceiveMessageOutput , error ) {
3939 return ReceiveMessageOutput , nil
4040}
4141
42- func (f * FakeSQS ) DeleteMessage (* awssqs.DeleteMessageInput ) (* awssqs.DeleteMessageOutput , error ) {
42+ func (f * FakeSQS ) DeleteMessage (context. Context , * awssqs.DeleteMessageInput , ... func ( * awssqs. Options ) ) (* awssqs.DeleteMessageOutput , error ) {
4343 return & awssqs.DeleteMessageOutput {}, nil
4444}
4545
4646type ErrorSQS struct {
4747 sqsiface.API
4848}
4949
50- func (e * ErrorSQS ) SendMessage (* sqs .SendMessageInput ) (* awssqs.SendMessageOutput , error ) {
50+ func (e * ErrorSQS ) SendMessage (context. Context , * awssqs .SendMessageInput , ... func ( * awssqs. Options ) ) (* awssqs.SendMessageOutput , error ) {
5151 err := errors .New ("this is an error" )
5252 return nil , err
5353}
5454
55- func (e * ErrorSQS ) ReceiveMessage (* sqs .ReceiveMessageInput ) (* awssqs.ReceiveMessageOutput , error ) {
55+ func (e * ErrorSQS ) ReceiveMessage (context. Context , * awssqs .ReceiveMessageInput , ... func ( * awssqs. Options ) ) (* awssqs.ReceiveMessageOutput , error ) {
5656 err := errors .New ("this is an error" )
5757 return nil , err
5858}
5959
60- func (e * ErrorSQS ) DeleteMessage (* sqs .DeleteMessageInput ) (* awssqs.DeleteMessageOutput , error ) {
60+ func (e * ErrorSQS ) DeleteMessage (context. Context , * awssqs .DeleteMessageInput , ... func ( * awssqs. Options ) ) (* awssqs.DeleteMessageOutput , error ) {
6161 err := errors .New ("this is an error" )
6262 return nil , err
6363}
@@ -66,15 +66,15 @@ func init() {
6666 // TODO: chang message body to signature example
6767 messageBody , _ := json .Marshal (map [string ]int {"apple" : 5 , "lettuce" : 7 })
6868 ReceiveMessageOutput = & awssqs.ReceiveMessageOutput {
69- Messages : []* awssqs .Message {
69+ Messages : []types .Message {
7070 {
71- Attributes : map [string ]* string {
72- "SentTimestamp" : aws . String ( "1512962021537" ) ,
71+ Attributes : map [string ]string {
72+ "SentTimestamp" : "1512962021537" ,
7373 },
7474 Body : aws .String (string (messageBody )),
7575 MD5OfBody : aws .String ("bbdc5fdb8be7251f5c910905db994bab" ),
7676 MD5OfMessageAttributes : aws .String ("d25a6aea97eb8f585bfa92d314504a92" ),
77- MessageAttributes : map [string ]* awssqs .MessageAttributeValue {
77+ MessageAttributes : map [string ]types .MessageAttributeValue {
7878 "Title" : {
7979 DataType : aws .String ("String" ),
8080 StringValue : aws .String ("The Whistler" ),
@@ -115,18 +115,13 @@ func NewTestConfig() *config.Config {
115115
116116func NewTestBroker (cnf * config.Config ) * Broker {
117117
118- sess := session .Must (session .NewSessionWithOptions (session.Options {
119- SharedConfigState : session .SharedConfigEnable ,
120- }))
121-
122118 var svc sqsiface.API = new (FakeSQS )
123119
124120 if cnf .SQS .Client != nil {
125121 svc = cnf .SQS .Client
126122 }
127123 return & Broker {
128124 Broker : common .NewBroker (cnf ),
129- sess : sess ,
130125 service : svc ,
131126 processingWG : sync.WaitGroup {},
132127 receivingWG : sync.WaitGroup {},
@@ -137,50 +132,46 @@ func NewTestBroker(cnf *config.Config) *Broker {
137132func NewTestErrorBroker () * Broker {
138133
139134 cnf := NewTestConfig ()
140- sess := session .Must (session .NewSessionWithOptions (session.Options {
141- SharedConfigState : session .SharedConfigEnable ,
142- }))
143135
144136 errSvc := new (ErrorSQS )
145137 return & Broker {
146138 Broker : common .NewBroker (cnf ),
147- sess : sess ,
148139 service : errSvc ,
149140 processingWG : sync.WaitGroup {},
150141 receivingWG : sync.WaitGroup {},
151142 stopReceivingChan : make (chan int ),
152143 }
153144}
154145
155- func (b * Broker ) ConsumeForTest (deliveries <- chan * sqs .ReceiveMessageOutput , concurrency int , taskProcessor iface.TaskProcessor , pool chan struct {}) error {
146+ func (b * Broker ) ConsumeForTest (deliveries <- chan * awssqs .ReceiveMessageOutput , concurrency int , taskProcessor iface.TaskProcessor , pool chan struct {}) error {
156147 return b .consume (deliveries , concurrency , taskProcessor , pool )
157148}
158149
159- func (b * Broker ) ConsumeOneForTest (delivery * sqs .ReceiveMessageOutput , taskProcessor iface.TaskProcessor ) error {
150+ func (b * Broker ) ConsumeOneForTest (delivery * awssqs .ReceiveMessageOutput , taskProcessor iface.TaskProcessor ) error {
160151 return b .consumeOne (delivery , taskProcessor )
161152}
162153
163- func (b * Broker ) DeleteOneForTest (delivery * sqs .ReceiveMessageOutput ) error {
154+ func (b * Broker ) DeleteOneForTest (delivery * awssqs .ReceiveMessageOutput ) error {
164155 return b .deleteOne (delivery )
165156}
166157
167158func (b * Broker ) DefaultQueueURLForTest () * string {
168159 return b .defaultQueueURL ()
169160}
170161
171- func (b * Broker ) ReceiveMessageForTest (qURL * string ) (* sqs .ReceiveMessageOutput , error ) {
162+ func (b * Broker ) ReceiveMessageForTest (qURL * string ) (* awssqs .ReceiveMessageOutput , error ) {
172163 return b .receiveMessage (qURL )
173164}
174165
175166func (b * Broker ) InitializePoolForTest (pool chan struct {}, concurrency int ) {
176167 b .initializePool (pool , concurrency )
177168}
178169
179- func (b * Broker ) ConsumeDeliveriesForTest (deliveries <- chan * sqs .ReceiveMessageOutput , concurrency int , taskProcessor iface.TaskProcessor , pool chan struct {}, errorsChan chan error ) (bool , error ) {
170+ func (b * Broker ) ConsumeDeliveriesForTest (deliveries <- chan * awssqs .ReceiveMessageOutput , concurrency int , taskProcessor iface.TaskProcessor , pool chan struct {}, errorsChan chan error ) (bool , error ) {
180171 return b .consumeDeliveries (deliveries , concurrency , taskProcessor , pool , errorsChan )
181172}
182173
183- func (b * Broker ) ContinueReceivingMessagesForTest (qURL * string , deliveries chan * sqs .ReceiveMessageOutput ) (bool , error ) {
174+ func (b * Broker ) ContinueReceivingMessagesForTest (qURL * string , deliveries chan * awssqs .ReceiveMessageOutput ) (bool , error ) {
184175 return b .continueReceivingMessages (qURL , deliveries )
185176}
186177
0 commit comments