@@ -41,6 +41,7 @@ type AMQPConsumer struct {
4141 // Queue Name
4242 Queue string `toml:"queue"`
4343 QueueDurability string `toml:"queue_durability"`
44+ QueuePassive bool `toml:"queue_passive"`
4445
4546 // Binding Key
4647 BindingKey string `toml:"binding_key"`
@@ -101,7 +102,7 @@ func (a *AMQPConsumer) SampleConfig() string {
101102 # username = ""
102103 # password = ""
103104
104- ## Exchange to declare and consume from .
105+ ## Name of the exchange to declare. If unset, no exchange will be declared .
105106 exchange = "telegraf"
106107
107108 ## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
@@ -123,7 +124,11 @@ func (a *AMQPConsumer) SampleConfig() string {
123124 ## AMQP queue durability can be "transient" or "durable".
124125 queue_durability = "durable"
125126
126- ## Binding Key.
127+ ## If true, queue will be passively declared.
128+ # queue_passive = false
129+
130+ ## A binding between the exchange and queue using this binding key is
131+ ## created. If unset, no binding is created.
127132 binding_key = "#"
128133
129134 ## Maximum number of messages server should give to the worker.
@@ -286,59 +291,52 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
286291 return nil , fmt .Errorf ("Failed to open a channel: %s" , err )
287292 }
288293
289- var exchangeDurable = true
290- switch a .ExchangeDurability {
291- case "transient" :
292- exchangeDurable = false
293- default :
294- exchangeDurable = true
295- }
294+ if a .Exchange != "" {
295+ var exchangeDurable = true
296+ switch a .ExchangeDurability {
297+ case "transient" :
298+ exchangeDurable = false
299+ default :
300+ exchangeDurable = true
301+ }
296302
297- exchangeArgs := make (amqp.Table , len (a .ExchangeArguments ))
298- for k , v := range a .ExchangeArguments {
299- exchangeArgs [k ] = v
303+ exchangeArgs := make (amqp.Table , len (a .ExchangeArguments ))
304+ for k , v := range a .ExchangeArguments {
305+ exchangeArgs [k ] = v
306+ }
307+
308+ err = declareExchange (
309+ ch ,
310+ a .Exchange ,
311+ a .ExchangeType ,
312+ a .ExchangePassive ,
313+ exchangeDurable ,
314+ exchangeArgs )
315+ if err != nil {
316+ return nil , err
317+ }
300318 }
301319
302- err = declareExchange (
320+ q , err := declareQueue (
303321 ch ,
304- a .Exchange ,
305- a .ExchangeType ,
306- a .ExchangePassive ,
307- exchangeDurable ,
308- exchangeArgs )
322+ a .Queue ,
323+ a .QueueDurability ,
324+ a .QueuePassive )
309325 if err != nil {
310326 return nil , err
311327 }
312328
313- var queueDurable = true
314- switch a .QueueDurability {
315- case "transient" :
316- queueDurable = false
317- default :
318- queueDurable = true
319- }
320-
321- q , err := ch .QueueDeclare (
322- a .Queue , // queue
323- queueDurable , // durable
324- false , // delete when unused
325- false , // exclusive
326- false , // no-wait
327- nil , // arguments
328- )
329- if err != nil {
330- return nil , fmt .Errorf ("Failed to declare a queue: %s" , err )
331- }
332-
333- err = ch .QueueBind (
334- q .Name , // queue
335- a .BindingKey , // binding-key
336- a .Exchange , // exchange
337- false ,
338- nil ,
339- )
340- if err != nil {
341- return nil , fmt .Errorf ("Failed to bind a queue: %s" , err )
329+ if a .BindingKey != "" {
330+ err = ch .QueueBind (
331+ q .Name , // queue
332+ a .BindingKey , // binding-key
333+ a .Exchange , // exchange
334+ false ,
335+ nil ,
336+ )
337+ if err != nil {
338+ return nil , fmt .Errorf ("Failed to bind a queue: %s" , err )
339+ }
342340 }
343341
344342 err = ch .Qos (
@@ -402,6 +400,48 @@ func declareExchange(
402400 return nil
403401}
404402
403+ func declareQueue (
404+ channel * amqp.Channel ,
405+ queueName string ,
406+ queueDurability string ,
407+ queuePassive bool ,
408+ ) (* amqp.Queue , error ) {
409+ var queue amqp.Queue
410+ var err error
411+
412+ var queueDurable = true
413+ switch queueDurability {
414+ case "transient" :
415+ queueDurable = false
416+ default :
417+ queueDurable = true
418+ }
419+
420+ if queuePassive {
421+ queue , err = channel .QueueDeclarePassive (
422+ queueName , // queue
423+ queueDurable , // durable
424+ false , // delete when unused
425+ false , // exclusive
426+ false , // no-wait
427+ nil , // arguments
428+ )
429+ } else {
430+ queue , err = channel .QueueDeclare (
431+ queueName , // queue
432+ queueDurable , // durable
433+ false , // delete when unused
434+ false , // exclusive
435+ false , // no-wait
436+ nil , // arguments
437+ )
438+ }
439+ if err != nil {
440+ return nil , fmt .Errorf ("error declaring queue: %v" , err )
441+ }
442+ return & queue , nil
443+ }
444+
405445// Read messages from queue and add them to the Accumulator
406446func (a * AMQPConsumer ) process (ctx context.Context , msgs <- chan amqp.Delivery , ac telegraf.Accumulator ) {
407447 a .deliveries = make (map [telegraf.TrackingID ]amqp.Delivery )
0 commit comments