Skip to content

Commit 6eea89f

Browse files
committed
Make NSQ plugin compatible with version 0.10.0
1 parent dbbb2d9 commit 6eea89f

File tree

4 files changed

+172
-88
lines changed

4 files changed

+172
-88
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
- [#475](https://github.com/influxdata/telegraf/pull/475): Add response time to httpjson plugin. Thanks @titilambert!
77
- [#519](https://github.com/influxdata/telegraf/pull/519): Added a sensors input based on lm-sensors. Thanks @md14454!
88
- [#467](https://github.com/influxdata/telegraf/issues/467): Add option to disable statsd measurement name conversion.
9+
- [#534](https://github.com/influxdata/telegraf/pull/534): NSQ input plugin. Thanks @allingeek!
910

1011
### Bugfixes
1112
- [#506](https://github.com/influxdb/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert!

plugins/inputs/all/all.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
_ "github.com/influxdb/telegraf/plugins/inputs/mongodb"
2020
_ "github.com/influxdb/telegraf/plugins/inputs/mysql"
2121
_ "github.com/influxdb/telegraf/plugins/inputs/nginx"
22+
_ "github.com/influxdb/telegraf/plugins/inputs/nsq"
2223
_ "github.com/influxdb/telegraf/plugins/inputs/phpfpm"
2324
_ "github.com/influxdb/telegraf/plugins/inputs/ping"
2425
_ "github.com/influxdb/telegraf/plugins/inputs/postgresql"

plugins/inputs/nsq/nsq.go

Lines changed: 60 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131
"sync"
3232
"time"
3333

34-
"github.com/influxdb/telegraf/plugins"
34+
"github.com/influxdb/telegraf/plugins/inputs"
3535
)
3636

3737
// Might add Lookupd endpoints for cluster discovery
@@ -41,15 +41,15 @@ type NSQ struct {
4141

4242
var sampleConfig = `
4343
# An array of NSQD HTTP API endpoints
44-
endpoints = ["http://localhost:4151","http://otherhost:4151"]
44+
endpoints = ["http://localhost:4151"]
4545
`
4646

4747
const (
4848
requestPattern = `%s/stats?format=json`
4949
)
5050

5151
func init() {
52-
plugins.Add("nsq", func() plugins.Plugin {
52+
inputs.Add("nsq", func() inputs.Input {
5353
return &NSQ{}
5454
})
5555
}
@@ -62,7 +62,7 @@ func (n *NSQ) Description() string {
6262
return "Read NSQ topic and channel statistics."
6363
}
6464

65-
func (n *NSQ) Gather(acc plugins.Accumulator) error {
65+
func (n *NSQ) Gather(acc inputs.Accumulator) error {
6666
var wg sync.WaitGroup
6767
var outerr error
6868

@@ -85,7 +85,7 @@ var tr = &http.Transport{
8585

8686
var client = &http.Client{Transport: tr}
8787

88-
func (n *NSQ) gatherEndpoint(e string, acc plugins.Accumulator) error {
88+
func (n *NSQ) gatherEndpoint(e string, acc inputs.Accumulator) error {
8989
u, err := buildURL(e)
9090
if err != nil {
9191
return err
@@ -111,13 +111,15 @@ func (n *NSQ) gatherEndpoint(e string, acc plugins.Accumulator) error {
111111
`server_version`: s.Data.Version,
112112
}
113113

114+
fields := make(map[string]interface{})
114115
if s.Data.Health == `OK` {
115-
acc.Add(`nsq_server_count`, int64(1), tags)
116+
fields["server_count"] = int64(1)
116117
} else {
117-
acc.Add(`nsq_server_count`, int64(0), tags)
118+
fields["server_count"] = int64(0)
118119
}
120+
fields["topic_count"] = int64(len(s.Data.Topics))
119121

120-
acc.Add(`nsq_server_topic_count`, int64(len(s.Data.Topics)), tags)
122+
acc.AddFields("nsq_server", fields, tags)
121123
for _, t := range s.Data.Topics {
122124
topicStats(t, acc, u.Host, s.Data.Version)
123125
}
@@ -134,68 +136,77 @@ func buildURL(e string) (*url.URL, error) {
134136
return addr, nil
135137
}
136138

137-
func topicStats(t TopicStats, acc plugins.Accumulator, host, version string) {
138-
139+
func topicStats(t TopicStats, acc inputs.Accumulator, host, version string) {
139140
// per topic overall (tag: name, paused, channel count)
140141
tags := map[string]string{
141-
`server_host`: host,
142-
`server_version`: version,
143-
`topic`: t.Name,
142+
"server_host": host,
143+
"server_version": version,
144+
"topic": t.Name,
144145
}
145146

146-
acc.Add(`nsq_topic_depth`, t.Depth, tags)
147-
acc.Add(`nsq_topic_backend_depth`, t.BackendDepth, tags)
148-
acc.Add(`nsq_topic_message_count`, t.MessageCount, tags)
147+
fields := map[string]interface{}{
148+
"depth": t.Depth,
149+
"backend_depth": t.BackendDepth,
150+
"message_count": t.MessageCount,
151+
"channel_count": int64(len(t.Channels)),
152+
}
153+
acc.AddFields("nsq_topic", fields, tags)
149154

150-
acc.Add(`nsq_topic_channel_count`, int64(len(t.Channels)), tags)
151155
for _, c := range t.Channels {
152156
channelStats(c, acc, host, version, t.Name)
153157
}
154158
}
155159

156-
func channelStats(c ChannelStats, acc plugins.Accumulator, host, version, topic string) {
160+
func channelStats(c ChannelStats, acc inputs.Accumulator, host, version, topic string) {
157161
tags := map[string]string{
158-
`server_host`: host,
159-
`server_version`: version,
160-
`topic`: topic,
161-
`channel`: c.Name,
162+
"server_host": host,
163+
"server_version": version,
164+
"topic": topic,
165+
"channel": c.Name,
162166
}
163167

164-
acc.Add("nsq_channel_depth", c.Depth, tags)
165-
acc.Add("nsq_channel_backend_depth", c.BackendDepth, tags)
166-
acc.Add("nsq_channel_inflight_count", c.InFlightCount, tags)
167-
acc.Add("nsq_channel_deferred_count", c.DeferredCount, tags)
168-
acc.Add("nsq_channel_message_count", c.MessageCount, tags)
169-
acc.Add("nsq_channel_requeue_count", c.RequeueCount, tags)
170-
acc.Add("nsq_channel_timeout_count", c.TimeoutCount, tags)
168+
fields := map[string]interface{}{
169+
"depth": c.Depth,
170+
"backend_depth": c.BackendDepth,
171+
"inflight_count": c.InFlightCount,
172+
"deferred_count": c.DeferredCount,
173+
"message_count": c.MessageCount,
174+
"requeue_count": c.RequeueCount,
175+
"timeout_count": c.TimeoutCount,
176+
"client_count": int64(len(c.Clients)),
177+
}
171178

172-
acc.Add("nsq_channel_client_count", int64(len(c.Clients)), tags)
179+
acc.AddFields("nsq_channel", fields, tags)
173180
for _, cl := range c.Clients {
174181
clientStats(cl, acc, host, version, topic, c.Name)
175182
}
176183
}
177184

178-
func clientStats(c ClientStats, acc plugins.Accumulator, host, version, topic, channel string) {
185+
func clientStats(c ClientStats, acc inputs.Accumulator, host, version, topic, channel string) {
179186
tags := map[string]string{
180-
`server_host`: host,
181-
`server_version`: version,
182-
`topic`: topic,
183-
`channel`: channel,
184-
`client_name`: c.Name,
185-
`client_id`: c.ID,
186-
`client_hostname`: c.Hostname,
187-
`client_version`: c.Version,
188-
`client_address`: c.RemoteAddress,
189-
`client_user_agent`: c.UserAgent,
190-
`client_tls`: strconv.FormatBool(c.TLS),
191-
`client_snappy`: strconv.FormatBool(c.Snappy),
192-
`client_deflate`: strconv.FormatBool(c.Deflate),
187+
"server_host": host,
188+
"server_version": version,
189+
"topic": topic,
190+
"channel": channel,
191+
"client_name": c.Name,
192+
"client_id": c.ID,
193+
"client_hostname": c.Hostname,
194+
"client_version": c.Version,
195+
"client_address": c.RemoteAddress,
196+
"client_user_agent": c.UserAgent,
197+
"client_tls": strconv.FormatBool(c.TLS),
198+
"client_snappy": strconv.FormatBool(c.Snappy),
199+
"client_deflate": strconv.FormatBool(c.Deflate),
200+
}
201+
202+
fields := map[string]interface{}{
203+
"ready_count": c.ReadyCount,
204+
"inflight_count": c.InFlightCount,
205+
"message_count": c.MessageCount,
206+
"finish_count": c.FinishCount,
207+
"requeue_count": c.RequeueCount,
193208
}
194-
acc.Add("nsq_client_ready_count", c.ReadyCount, tags)
195-
acc.Add("nsq_client_inflight_count", c.InFlightCount, tags)
196-
acc.Add("nsq_client_message_count", c.MessageCount, tags)
197-
acc.Add("nsq_client_finish_count", c.FinishCount, tags)
198-
acc.Add("nsq_client_requeue_count", c.RequeueCount, tags)
209+
acc.AddFields("nsq_client", fields, tags)
199210
}
200211

201212
type NSQStats struct {

plugins/inputs/nsq/nsq_test.go

Lines changed: 110 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99

1010
"github.com/influxdb/telegraf/testutil"
1111

12-
"github.com/stretchr/testify/assert"
1312
"github.com/stretchr/testify/require"
1413
)
1514

@@ -35,49 +34,121 @@ func TestNSQStats(t *testing.T) {
3534
// actually validate the tests
3635
tests := []struct {
3736
m string
38-
v int64
37+
f map[string]interface{}
3938
g map[string]string
4039
}{
41-
{`nsq_server_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`}},
42-
{`nsq_server_topic_count`, int64(2), map[string]string{`server_host`: host, `server_version`: `0.3.6`}},
43-
{`nsq_topic_depth`, int64(12), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}},
44-
{`nsq_topic_backend_depth`, int64(13), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}},
45-
{`nsq_topic_message_count`, int64(14), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}},
46-
{`nsq_topic_channel_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}},
47-
{`nsq_channel_depth`, int64(0), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}},
48-
{`nsq_channel_backend_depth`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}},
49-
{`nsq_channel_inflight_count`, int64(2), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}},
50-
{`nsq_channel_deferred_count`, int64(3), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}},
51-
{`nsq_channel_message_count`, int64(4), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}},
52-
{`nsq_channel_requeue_count`, int64(5), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}},
53-
{`nsq_channel_timeout_count`, int64(6), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}},
54-
{`nsq_channel_client_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}},
55-
{`nsq_client_ready_count`, int64(200), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}},
56-
{`nsq_client_inflight_count`, int64(7), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}},
57-
{`nsq_client_message_count`, int64(8), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}},
58-
{`nsq_client_finish_count`, int64(9), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}},
59-
{`nsq_client_requeue_count`, int64(10), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}},
60-
{`nsq_topic_depth`, int64(28), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}},
61-
{`nsq_topic_backend_depth`, int64(29), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}},
62-
{`nsq_topic_message_count`, int64(30), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}},
63-
{`nsq_topic_channel_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}},
64-
{`nsq_channel_depth`, int64(15), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}},
65-
{`nsq_channel_backend_depth`, int64(16), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}},
66-
{`nsq_channel_inflight_count`, int64(17), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}},
67-
{`nsq_channel_deferred_count`, int64(18), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}},
68-
{`nsq_channel_message_count`, int64(19), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}},
69-
{`nsq_channel_requeue_count`, int64(20), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}},
70-
{`nsq_channel_timeout_count`, int64(21), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}},
71-
{`nsq_channel_client_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}},
72-
{`nsq_client_ready_count`, int64(22), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}},
73-
{`nsq_client_inflight_count`, int64(23), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}},
74-
{`nsq_client_message_count`, int64(24), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}},
75-
{`nsq_client_finish_count`, int64(25), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}},
76-
{`nsq_client_requeue_count`, int64(26), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}},
40+
{
41+
"nsq_server",
42+
map[string]interface{}{
43+
"server_count": int64(1),
44+
"topic_count": int64(2),
45+
},
46+
map[string]string{
47+
"server_host": host,
48+
"server_version": "0.3.6",
49+
},
50+
},
51+
{
52+
"nsq_topic",
53+
map[string]interface{}{
54+
"depth": int64(12),
55+
"backend_depth": int64(13),
56+
"message_count": int64(14),
57+
"channel_count": int64(1),
58+
},
59+
map[string]string{
60+
"server_host": host,
61+
"server_version": "0.3.6",
62+
"topic": "t1"},
63+
},
64+
{
65+
"nsq_channel",
66+
map[string]interface{}{
67+
"depth": int64(0),
68+
"backend_depth": int64(1),
69+
"inflight_count": int64(2),
70+
"deferred_count": int64(3),
71+
"message_count": int64(4),
72+
"requeue_count": int64(5),
73+
"timeout_count": int64(6),
74+
"client_count": int64(1),
75+
},
76+
map[string]string{
77+
"server_host": host,
78+
"server_version": "0.3.6",
79+
"topic": "t1",
80+
"channel": "c1",
81+
},
82+
},
83+
{
84+
"nsq_client",
85+
map[string]interface{}{
86+
"ready_count": int64(200),
87+
"inflight_count": int64(7),
88+
"message_count": int64(8),
89+
"finish_count": int64(9),
90+
"requeue_count": int64(10),
91+
},
92+
map[string]string{"server_host": host, "server_version": "0.3.6",
93+
"topic": "t1", "channel": "c1", "client_name": "373a715cd990",
94+
"client_id": "373a715cd990", "client_hostname": "373a715cd990",
95+
"client_version": "V2", "client_address": "172.17.0.11:35560",
96+
"client_tls": "false", "client_snappy": "false",
97+
"client_deflate": "false",
98+
"client_user_agent": "nsq_to_nsq/0.3.6 go-nsq/1.0.5"},
99+
},
100+
{
101+
"nsq_topic",
102+
map[string]interface{}{
103+
"depth": int64(28),
104+
"backend_depth": int64(29),
105+
"message_count": int64(30),
106+
"channel_count": int64(1),
107+
},
108+
map[string]string{
109+
"server_host": host,
110+
"server_version": "0.3.6",
111+
"topic": "t2"},
112+
},
113+
{
114+
"nsq_channel",
115+
map[string]interface{}{
116+
"depth": int64(15),
117+
"backend_depth": int64(16),
118+
"inflight_count": int64(17),
119+
"deferred_count": int64(18),
120+
"message_count": int64(19),
121+
"requeue_count": int64(20),
122+
"timeout_count": int64(21),
123+
"client_count": int64(1),
124+
},
125+
map[string]string{
126+
"server_host": host,
127+
"server_version": "0.3.6",
128+
"topic": "t2",
129+
"channel": "c2",
130+
},
131+
},
132+
{
133+
"nsq_client",
134+
map[string]interface{}{
135+
"ready_count": int64(22),
136+
"inflight_count": int64(23),
137+
"message_count": int64(24),
138+
"finish_count": int64(25),
139+
"requeue_count": int64(26),
140+
},
141+
map[string]string{"server_host": host, "server_version": "0.3.6",
142+
"topic": "t2", "channel": "c2", "client_name": "377569bd462b",
143+
"client_id": "377569bd462b", "client_hostname": "377569bd462b",
144+
"client_version": "V2", "client_address": "172.17.0.8:48145",
145+
"client_user_agent": "go-nsq/1.0.5", "client_tls": "true",
146+
"client_snappy": "true", "client_deflate": "true"},
147+
},
77148
}
78149

79150
for _, test := range tests {
80-
assert.True(t, acc.CheckTaggedValue(test.m, test.v, test.g), "Failed expectation: (\"%v\", \"%v\", \"%v\")", test.m, test.v, fmt.Sprint(test.g))
151+
acc.AssertContainsTaggedFields(t, test.m, test.f, test.g)
81152
}
82153
}
83154

0 commit comments

Comments
 (0)