Skip to content

Commit 538efe4

Browse files
morfien101Mathieu Lecarme
authored andcommitted
Add database_tag option to influxdb_listener to add database from query string (influxdata#6257)
1 parent 22bf0b3 commit 538efe4

File tree

3 files changed

+54
-27
lines changed

3 files changed

+54
-27
lines changed

plugins/inputs/influxdb_listener/README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,14 @@ submits data to InfluxDB determines the destination database.
4646
tls_cert = "/etc/telegraf/cert.pem"
4747
tls_key = "/etc/telegraf/key.pem"
4848

49+
## Optional tag name used to store the database name.
50+
## If the write has a database in the query string then it will be kept in this tag name.
51+
## This tag can be used in downstream outputs.
52+
## The default value of nothing means it will be off and the database will not be recorded.
53+
## If you have a tag that is the same as the one specified below, and supply a database,
54+
## the tag will be overwritten with the database supplied.
55+
# database_tag = ""
56+
4957
## Optional username and password to accept for HTTP basic authentication.
5058
## You probably want to make sure you have TLS configured above for this.
5159
# basic_username = "foobar"

plugins/inputs/influxdb_listener/http_listener.go

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,18 @@ const (
3737
type TimeFunc func() time.Time
3838

3939
type HTTPListener struct {
40-
ServiceAddress string
41-
ReadTimeout internal.Duration
42-
WriteTimeout internal.Duration
43-
MaxBodySize internal.Size
44-
MaxLineSize internal.Size
45-
Port int
46-
40+
ServiceAddress string `toml:"service_address"`
41+
// Port gets pulled out of ServiceAddress
42+
Port int
4743
tlsint.ServerConfig
4844

49-
BasicUsername string
50-
BasicPassword string
45+
ReadTimeout internal.Duration `toml:"read_timeout"`
46+
WriteTimeout internal.Duration `toml:"write_timeout"`
47+
MaxBodySize internal.Size `toml:"max_body_size"`
48+
MaxLineSize internal.Size `toml:"max_line_size"`
49+
BasicUsername string `toml:"basic_username"`
50+
BasicPassword string `toml:"basic_password"`
51+
DatabaseTag string `toml:"database_tag"`
5152

5253
TimeFunc
5354

@@ -93,6 +94,13 @@ const sampleConfig = `
9394
## Maximum line size allowed to be sent in bytes.
9495
## 0 means to use the default of 65536 bytes (64 kibibytes)
9596
max_line_size = "64KiB"
97+
98+
99+
## Optional tag name used to store the database.
100+
## If the write has a database in the query string then it will be kept in this tag name.
101+
## This tag can be used in downstream outputs.
102+
## The default value of nothing means it will be off and the database will not be recorded.
103+
# database_tag = ""
96104
97105
## Set one or more allowed client CA certificate file names to
98106
## enable mutually authenticated TLS connections
@@ -258,6 +266,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
258266
now := h.TimeFunc()
259267

260268
precision := req.URL.Query().Get("precision")
269+
db := req.URL.Query().Get("db")
261270

262271
// Handle gzip request bodies
263272
body := req.Body
@@ -315,7 +324,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
315324

316325
if err == io.ErrUnexpectedEOF {
317326
// finished reading the request body
318-
err = h.parse(buf[:n+bufStart], now, precision)
327+
err = h.parse(buf[:n+bufStart], now, precision, db)
319328
if err != nil {
320329
log.Println("D! "+err.Error(), bufStart+n)
321330
return400 = true
@@ -346,7 +355,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
346355
bufStart = 0
347356
continue
348357
}
349-
if err := h.parse(buf[:i+1], now, precision); err != nil {
358+
if err := h.parse(buf[:i+1], now, precision, db); err != nil {
350359
log.Println("D! " + err.Error())
351360
return400 = true
352361
}
@@ -359,7 +368,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
359368
}
360369
}
361370

362-
func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
371+
func (h *HTTPListener) parse(b []byte, t time.Time, precision, db string) error {
363372
h.mu.Lock()
364373
defer h.mu.Unlock()
365374

@@ -371,6 +380,13 @@ func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
371380
}
372381

373382
for _, m := range metrics {
383+
// Do we need to keep the database name in the query string.
384+
// If a tag has been supplied to put the db in and we actually got a db query,
385+
// then we write it in. This overwrites the database tag if one was sent.
386+
// This makes it behave like the influx endpoint.
387+
if h.DatabaseTag != "" && db != "" {
388+
m.AddTag(h.DatabaseTag, db)
389+
}
374390
h.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
375391
}
376392

plugins/inputs/influxdb_listener/http_listener_test.go

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,11 @@ func TestWriteHTTPBasicAuth(t *testing.T) {
146146
require.EqualValues(t, http.StatusNoContent, resp.StatusCode)
147147
}
148148

149-
func TestWriteHTTP(t *testing.T) {
149+
func TestWriteHTTPKeepDatabase(t *testing.T) {
150+
testMsgWithDB := "cpu_load_short,host=server01,database=wrongdb value=12.0 1422568543702900257\n"
151+
150152
listener := newTestHTTPListener()
153+
listener.DatabaseTag = "database"
151154

152155
acc := &testutil.Accumulator{}
153156
require.NoError(t, listener.Start(acc))
@@ -162,7 +165,19 @@ func TestWriteHTTP(t *testing.T) {
162165
acc.Wait(1)
163166
acc.AssertContainsTaggedFields(t, "cpu_load_short",
164167
map[string]interface{}{"value": float64(12)},
165-
map[string]string{"host": "server01"},
168+
map[string]string{"host": "server01", "database": "mydb"},
169+
)
170+
171+
// post single message to listener with a database tag in it already. It should be clobbered.
172+
resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgWithDB)))
173+
require.NoError(t, err)
174+
resp.Body.Close()
175+
require.EqualValues(t, 204, resp.StatusCode)
176+
177+
acc.Wait(1)
178+
acc.AssertContainsTaggedFields(t, "cpu_load_short",
179+
map[string]interface{}{"value": float64(12)},
180+
map[string]string{"host": "server01", "database": "mydb"},
166181
)
167182

168183
// post multiple message to listener
@@ -177,21 +192,9 @@ func TestWriteHTTP(t *testing.T) {
177192
for _, hostTag := range hostTags {
178193
acc.AssertContainsTaggedFields(t, "cpu_load_short",
179194
map[string]interface{}{"value": float64(12)},
180-
map[string]string{"host": hostTag},
195+
map[string]string{"host": hostTag, "database": "mydb"},
181196
)
182197
}
183-
184-
// Post a gigantic metric to the listener and verify that an error is returned:
185-
resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(hugeMetric)))
186-
require.NoError(t, err)
187-
resp.Body.Close()
188-
require.EqualValues(t, 400, resp.StatusCode)
189-
190-
acc.Wait(3)
191-
acc.AssertContainsTaggedFields(t, "cpu_load_short",
192-
map[string]interface{}{"value": float64(12)},
193-
map[string]string{"host": "server01"},
194-
)
195198
}
196199

197200
// http listener should add a newline at the end of the buffer if it's not there

0 commit comments

Comments
 (0)