Skip to content

Commit 21e08c8

Browse files
mmelnykMathieu Lecarme
authored andcommitted
Add TLS & credentials configuration for nats_consumer input plugin (influxdata#6195)
1 parent 448aaa3 commit 21e08c8

File tree

2 files changed

+48
-5
lines changed

2 files changed

+48
-5
lines changed

plugins/inputs/nats_consumer/README.md

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,22 @@ instances of telegraf can read from a NATS cluster in parallel.
1212
[[inputs.nats_consumer]]
1313
## urls of NATS servers
1414
servers = ["nats://localhost:4222"]
15-
## Use Transport Layer Security
16-
secure = false
1715
## subject(s) to consume
1816
subjects = ["telegraf"]
1917
## name a queue group
2018
queue_group = "telegraf_consumers"
2119

20+
## Optional credentials
21+
# username = ""
22+
# password = ""
23+
24+
## Optional TLS Config
25+
# tls_ca = "/etc/telegraf/ca.pem"
26+
# tls_cert = "/etc/telegraf/cert.pem"
27+
# tls_key = "/etc/telegraf/key.pem"
28+
## Use TLS but skip chain & host verification
29+
# insecure_skip_verify = false
30+
2231
## Sets the limits for pending msgs and bytes for each subscription
2332
## These shouldn't need to be adjusted except in very high throughput scenarios
2433
# pending_message_limit = 65536

plugins/inputs/nats_consumer/nats_consumer.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"sync"
88

99
"github.com/influxdata/telegraf"
10+
"github.com/influxdata/telegraf/internal/tls"
1011
"github.com/influxdata/telegraf/plugins/inputs"
1112
"github.com/influxdata/telegraf/plugins/parsers"
1213
nats "github.com/nats-io/go-nats"
@@ -34,7 +35,11 @@ type natsConsumer struct {
3435
QueueGroup string `toml:"queue_group"`
3536
Subjects []string `toml:"subjects"`
3637
Servers []string `toml:"servers"`
37-
Secure bool `toml:"secure"`
38+
Username string `toml:"username"`
39+
Password string `toml:"password"`
40+
tls.ClientConfig
41+
// Legacy; Should be deprecated
42+
Secure bool `toml:"secure"`
3843

3944
// Client pending limits:
4045
PendingMessageLimit int `toml:"pending_message_limit"`
@@ -61,13 +66,24 @@ type natsConsumer struct {
6166
var sampleConfig = `
6267
## urls of NATS servers
6368
servers = ["nats://localhost:4222"]
64-
## Use Transport Layer Security
69+
## Deprecated: Use Transport Layer Security
6570
secure = false
6671
## subject(s) to consume
6772
subjects = ["telegraf"]
6873
## name a queue group
6974
queue_group = "telegraf_consumers"
7075
76+
## Optional credentials
77+
# username = ""
78+
# password = ""
79+
80+
## Optional TLS Config
81+
# tls_ca = "/etc/telegraf/ca.pem"
82+
# tls_cert = "/etc/telegraf/cert.pem"
83+
# tls_key = "/etc/telegraf/key.pem"
84+
## Use TLS but skip chain & host verification
85+
# insecure_skip_verify = false
86+
7187
## Sets the limits for pending msgs and bytes for each subscription
7288
## These shouldn't need to be adjusted except in very high throughput scenarios
7389
# pending_message_limit = 65536
@@ -125,7 +141,25 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
125141
// override servers if any were specified
126142
opts.Servers = n.Servers
127143

128-
opts.Secure = n.Secure
144+
// override authentication, if any was specified
145+
if n.Username != "" {
146+
opts.User = n.Username
147+
opts.Password = n.Password
148+
}
149+
150+
// override TLS, if it was specified
151+
tlsConfig, err := n.ClientConfig.TLSConfig()
152+
if err != nil {
153+
return err
154+
}
155+
if tlsConfig != nil {
156+
// set NATS connection TLS options
157+
opts.Secure = true
158+
opts.TLSConfig = tlsConfig
159+
} else {
160+
// should be deprecated; use TLS
161+
opts.Secure = n.Secure
162+
}
129163

130164
if n.conn == nil || n.conn.IsClosed() {
131165
n.conn, connectErr = opts.Connect()

0 commit comments

Comments
 (0)