@@ -72,23 +72,27 @@ type Listeners struct {
7272
7373// Details ...
7474type Details struct {
75- Rate float64
75+ Rate float64 `json:"rate"`
7676}
7777
7878// MessageStats ...
7979type MessageStats struct {
80- Ack int64
81- AckDetails Details `json:"ack_details"`
82- Deliver int64
83- DeliverDetails Details `json:"deliver_details"`
84- DeliverGet int64 `json:"deliver_get"`
85- DeliverGetDetails Details `json:"deliver_get_details"`
86- Publish int64
87- PublishDetails Details `json:"publish_details"`
88- Redeliver int64
89- RedeliverDetails Details `json:"redeliver_details"`
90- PublishIn int64 `json:"publish_in"`
91- PublishOut int64 `json:"publish_out"`
80+ Ack int64
81+ AckDetails Details `json:"ack_details"`
82+ Deliver int64
83+ DeliverDetails Details `json:"deliver_details"`
84+ DeliverGet int64 `json:"deliver_get"`
85+ DeliverGetDetails Details `json:"deliver_get_details"`
86+ Publish int64
87+ PublishDetails Details `json:"publish_details"`
88+ Redeliver int64
89+ RedeliverDetails Details `json:"redeliver_details"`
90+ PublishIn int64 `json:"publish_in"`
91+ PublishInDetails Details `json:"publish_in_details"`
92+ PublishOut int64 `json:"publish_out"`
93+ PublishOutDetails Details `json:"publish_out_details"`
94+ ReturnUnroutable int64 `json:"return_unroutable"`
95+ ReturnUnroutableDetails Details `json:"return_unroutable_details"`
9296}
9397
9498// ObjectTotals ...
@@ -131,18 +135,37 @@ type Queue struct {
131135type Node struct {
132136 Name string
133137
134- DiskFree int64 `json:"disk_free"`
135- DiskFreeLimit int64 `json:"disk_free_limit"`
136- FdTotal int64 `json:"fd_total"`
137- FdUsed int64 `json:"fd_used"`
138- MemLimit int64 `json:"mem_limit"`
139- MemUsed int64 `json:"mem_used"`
140- ProcTotal int64 `json:"proc_total"`
141- ProcUsed int64 `json:"proc_used"`
142- RunQueue int64 `json:"run_queue"`
143- SocketsTotal int64 `json:"sockets_total"`
144- SocketsUsed int64 `json:"sockets_used"`
145- Running bool `json:"running"`
138+ DiskFree int64 `json:"disk_free"`
139+ DiskFreeLimit int64 `json:"disk_free_limit"`
140+ DiskFreeAlarm bool `json:"disk_free_alarm"`
141+ FdTotal int64 `json:"fd_total"`
142+ FdUsed int64 `json:"fd_used"`
143+ MemLimit int64 `json:"mem_limit"`
144+ MemUsed int64 `json:"mem_used"`
145+ MemAlarm bool `json:"mem_alarm"`
146+ ProcTotal int64 `json:"proc_total"`
147+ ProcUsed int64 `json:"proc_used"`
148+ RunQueue int64 `json:"run_queue"`
149+ SocketsTotal int64 `json:"sockets_total"`
150+ SocketsUsed int64 `json:"sockets_used"`
151+ Running bool `json:"running"`
152+ Uptime int64 `json:"uptime"`
153+ MnesiaDiskTxCount int64 `json:"mnesia_disk_tx_count"`
154+ MnesiaDiskTxCountDetails Details `json:"mnesia_disk_tx_count_details"`
155+ MnesiaRamTxCount int64 `json:"mnesia_ram_tx_count"`
156+ MnesiaRamTxCountDetails Details `json:"mnesia_ram_tx_count_details"`
157+ GcNum int64 `json:"gc_num"`
158+ GcNumDetails Details `json:"gc_num_details"`
159+ GcBytesReclaimed int64 `json:"gc_bytes_reclaimed"`
160+ GcBytesReclaimedDetails Details `json:"gc_bytes_reclaimed_details"`
161+ IoReadAvgTime int64 `json:"io_read_avg_time"`
162+ IoReadAvgTimeDetails Details `json:"io_read_avg_time_details"`
163+ IoReadBytes int64 `json:"io_read_bytes"`
164+ IoReadBytesDetails Details `json:"io_read_bytes_details"`
165+ IoWriteAvgTime int64 `json:"io_write_avg_time"`
166+ IoWriteAvgTimeDetails Details `json:"io_write_avg_time_details"`
167+ IoWriteBytes int64 `json:"io_write_bytes"`
168+ IoWriteBytesDetails Details `json:"io_write_bytes_details"`
146169}
147170
148171type Exchange struct {
@@ -155,6 +178,10 @@ type Exchange struct {
155178 AutoDelete bool `json:"auto_delete"`
156179}
157180
181+ type HealthCheck struct {
182+ Status string `json:"status"`
183+ }
184+
158185// gatherFunc ...
159186type gatherFunc func (r * RabbitMQ , acc telegraf.Accumulator )
160187
@@ -204,6 +231,13 @@ var sampleConfig = `
204231 queue_name_exclude = []
205232`
206233
234+ func boolToInt (b bool ) int64 {
235+ if b {
236+ return 1
237+ }
238+ return 0
239+ }
240+
207241// SampleConfig ...
208242func (r * RabbitMQ ) SampleConfig () string {
209243 return sampleConfig
@@ -302,12 +336,12 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
302336 return
303337 }
304338
305- var clustering_listeners , amqp_listeners int64 = 0 , 0
339+ var clusteringListeners , amqpListeners int64 = 0 , 0
306340 for _ , listener := range overview .Listeners {
307341 if listener .Protocol == "clustering" {
308- clustering_listeners ++
342+ clusteringListeners ++
309343 } else if listener .Protocol == "amqp" {
310- amqp_listeners ++
344+ amqpListeners ++
311345 }
312346 }
313347
@@ -328,48 +362,109 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
328362 "messages_delivered" : overview .MessageStats .Deliver ,
329363 "messages_delivered_get" : overview .MessageStats .DeliverGet ,
330364 "messages_published" : overview .MessageStats .Publish ,
331- "clustering_listeners" : clustering_listeners ,
332- "amqp_listeners" : amqp_listeners ,
365+ "clustering_listeners" : clusteringListeners ,
366+ "amqp_listeners" : amqpListeners ,
367+ "return_unroutable" : overview .MessageStats .ReturnUnroutable ,
368+ "return_unroutable_rate" : overview .MessageStats .ReturnUnroutableDetails .Rate ,
333369 }
334370 acc .AddFields ("rabbitmq_overview" , fields , tags )
335371}
336372
337373func gatherNodes (r * RabbitMQ , acc telegraf.Accumulator ) {
338- nodes := make ([]Node , 0 )
374+ allNodes := make ([]Node , 0 )
339375 // Gather information about nodes
340- err := r .requestJSON ("/api/nodes" , & nodes )
376+ err := r .requestJSON ("/api/nodes" , & allNodes )
341377 if err != nil {
342378 acc .AddError (err )
343379 return
344380 }
345- now := time .Now ()
381+
382+ nodes := make (map [string ]Node )
383+ for _ , node := range allNodes {
384+ if r .shouldGatherNode (node ) {
385+ nodes [node .Name ] = node
386+ }
387+ }
388+
389+ numberNodes := len (nodes )
390+ if numberNodes == 0 {
391+ return
392+ }
393+
394+ type NodeHealthCheck struct {
395+ NodeName string
396+ HealthCheck HealthCheck
397+ Error error
398+ }
399+
400+ healthChecksChannel := make (chan NodeHealthCheck , numberNodes )
346401
347402 for _ , node := range nodes {
348- if ! r .shouldGatherNode (node ) {
349- continue
403+ go func (nodeName string , healthChecksChannel chan NodeHealthCheck ) {
404+ var healthCheck HealthCheck
405+
406+ err := r .requestJSON ("/api/healthchecks/node/" + nodeName , & healthCheck )
407+ nodeHealthCheck := NodeHealthCheck {
408+ NodeName : nodeName ,
409+ Error : err ,
410+ HealthCheck : healthCheck ,
411+ }
412+
413+ healthChecksChannel <- nodeHealthCheck
414+ }(node .Name , healthChecksChannel )
415+ }
416+
417+ now := time .Now ()
418+
419+ for i := 0 ; i < len (nodes ); i ++ {
420+ nodeHealthCheck := <- healthChecksChannel
421+
422+ var healthCheckStatus int64 = 0
423+
424+ if nodeHealthCheck .Error != nil {
425+ acc .AddError (nodeHealthCheck .Error )
426+ } else if nodeHealthCheck .HealthCheck .Status == "ok" {
427+ healthCheckStatus = 1
350428 }
351429
430+ node := nodes [nodeHealthCheck .NodeName ]
431+
352432 tags := map [string ]string {"url" : r .URL }
353433 tags ["node" ] = node .Name
354434
355- var running int64 = 0
356- if node .Running {
357- running = 1
358- }
359-
360435 fields := map [string ]interface {}{
361- "disk_free" : node .DiskFree ,
362- "disk_free_limit" : node .DiskFreeLimit ,
363- "fd_total" : node .FdTotal ,
364- "fd_used" : node .FdUsed ,
365- "mem_limit" : node .MemLimit ,
366- "mem_used" : node .MemUsed ,
367- "proc_total" : node .ProcTotal ,
368- "proc_used" : node .ProcUsed ,
369- "run_queue" : node .RunQueue ,
370- "sockets_total" : node .SocketsTotal ,
371- "sockets_used" : node .SocketsUsed ,
372- "running" : running ,
436+ "disk_free" : node .DiskFree ,
437+ "disk_free_limit" : node .DiskFreeLimit ,
438+ "disk_free_alarm" : boolToInt (node .DiskFreeAlarm ),
439+ "fd_total" : node .FdTotal ,
440+ "fd_used" : node .FdUsed ,
441+ "mem_limit" : node .MemLimit ,
442+ "mem_used" : node .MemUsed ,
443+ "mem_alarm" : boolToInt (node .MemAlarm ),
444+ "proc_total" : node .ProcTotal ,
445+ "proc_used" : node .ProcUsed ,
446+ "run_queue" : node .RunQueue ,
447+ "sockets_total" : node .SocketsTotal ,
448+ "sockets_used" : node .SocketsUsed ,
449+ "uptime" : node .Uptime ,
450+ "mnesia_disk_tx_count" : node .MnesiaDiskTxCount ,
451+ "mnesia_disk_tx_count_rate" : node .MnesiaDiskTxCountDetails .Rate ,
452+ "mnesia_ram_tx_count" : node .MnesiaRamTxCount ,
453+ "mnesia_ram_tx_count_rate" : node .MnesiaRamTxCountDetails .Rate ,
454+ "gc_num" : node .GcNum ,
455+ "gc_num_rate" : node .GcNumDetails .Rate ,
456+ "gc_bytes_reclaimed" : node .GcBytesReclaimed ,
457+ "gc_bytes_reclaimed_rate" : node .GcBytesReclaimedDetails .Rate ,
458+ "io_read_avg_time" : node .IoReadAvgTime ,
459+ "io_read_avg_time_rate" : node .IoReadAvgTimeDetails .Rate ,
460+ "io_read_bytes" : node .IoReadBytes ,
461+ "io_read_bytes_rate" : node .IoReadBytesDetails .Rate ,
462+ "io_write_avg_time" : node .IoWriteAvgTime ,
463+ "io_write_avg_time_rate" : node .IoWriteAvgTimeDetails .Rate ,
464+ "io_write_bytes" : node .IoWriteBytes ,
465+ "io_write_bytes_rate" : node .IoWriteBytesDetails .Rate ,
466+ "running" : boolToInt (node .Running ),
467+ "health_check_status" : healthCheckStatus ,
373468 }
374469 acc .AddFields ("rabbitmq_node" , fields , tags , now )
375470 }
@@ -459,8 +554,10 @@ func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
459554 acc .AddFields (
460555 "rabbitmq_exchange" ,
461556 map [string ]interface {}{
462- "messages_publish_in" : exchange .MessageStats .PublishIn ,
463- "messages_publish_out" : exchange .MessageStats .PublishOut ,
557+ "messages_publish_in" : exchange .MessageStats .PublishIn ,
558+ "messages_publish_in_rate" : exchange .MessageStats .PublishInDetails .Rate ,
559+ "messages_publish_out" : exchange .MessageStats .PublishOut ,
560+ "messages_publish_out_rate" : exchange .MessageStats .PublishOutDetails .Rate ,
464561 },
465562 tags ,
466563 )
@@ -487,11 +584,11 @@ func (r *RabbitMQ) createQueueFilter() error {
487584 r .QueueInclude = append (r .QueueInclude , r .Queues ... )
488585 }
489586
490- filter , err := filter .NewIncludeExcludeFilter (r .QueueInclude , r .QueueExclude )
587+ queueFilter , err := filter .NewIncludeExcludeFilter (r .QueueInclude , r .QueueExclude )
491588 if err != nil {
492589 return err
493590 }
494- r .queueFilter = filter
591+ r .queueFilter = queueFilter
495592
496593 for _ , q := range r .QueueExclude {
497594 if q == "*" {
0 commit comments