Skip to content

Commit f1861a0

Browse files
grange74danielnelson
authored andcommitted
Add http input plugin which supports any input data format (#3546)
1 parent e14cf49 commit f1861a0

File tree

5 files changed

+361
-0
lines changed

5 files changed

+361
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ configuration options.
152152
* [graylog](./plugins/inputs/graylog)
153153
* [haproxy](./plugins/inputs/haproxy)
154154
* [hddtemp](./plugins/inputs/hddtemp)
155+
* [http](./plugins/inputs/http) (generic HTTP plugin, supports using input data formats)
155156
* [http_response](./plugins/inputs/http_response)
156157
* [httpjson](./plugins/inputs/httpjson) (generic JSON-emitting http service plugin)
157158
* [internal](./plugins/inputs/internal)

plugins/inputs/all/all.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
_ "github.com/influxdata/telegraf/plugins/inputs/graylog"
3030
_ "github.com/influxdata/telegraf/plugins/inputs/haproxy"
3131
_ "github.com/influxdata/telegraf/plugins/inputs/hddtemp"
32+
_ "github.com/influxdata/telegraf/plugins/inputs/http"
3233
_ "github.com/influxdata/telegraf/plugins/inputs/http_listener"
3334
_ "github.com/influxdata/telegraf/plugins/inputs/http_response"
3435
_ "github.com/influxdata/telegraf/plugins/inputs/httpjson"

plugins/inputs/http/README.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# HTTP Input Plugin
2+
3+
The HTTP input plugin collects metrics from one or more HTTP(S) endpoints. The metrics need to be formatted in one of the supported data formats. Each data format has its own unique set of configuration options, read more about them here:
4+
https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
5+
6+
7+
### Configuration:
8+
9+
This section contains the default TOML to configure the plugin. You can
10+
generate it using `telegraf --usage http`.
11+
12+
```toml
13+
# Read formatted metrics from one or more HTTP endpoints
14+
[[inputs.http]]
15+
## One or more URLs from which to read formatted metrics
16+
urls = [
17+
"http://localhost/metrics"
18+
]
19+
20+
## Optional HTTP Basic Auth Credentials
21+
# username = "username"
22+
# password = "pa$$word"
23+
24+
## Optional SSL Config
25+
# ssl_ca = "/etc/telegraf/ca.pem"
26+
# ssl_cert = "/etc/telegraf/cert.pem"
27+
# ssl_key = "/etc/telegraf/key.pem"
28+
## Use SSL but skip chain & host verification
29+
# insecure_skip_verify = false
30+
31+
# timeout = "5s"
32+
33+
## Data format to consume.
34+
## Each data format has its own unique set of configuration options, read
35+
## more about them here:
36+
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
37+
# data_format = "influx"
38+
```
39+
40+
### Metrics:
41+
42+
The metrics collected by this input plugin will depend on the configurated `data_format` and the payload returned by the HTTP endpoint(s).

plugins/inputs/http/http.go

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
package http
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"io/ioutil"
7+
"net/http"
8+
"strings"
9+
"sync"
10+
"time"
11+
12+
"github.com/influxdata/telegraf"
13+
"github.com/influxdata/telegraf/internal"
14+
"github.com/influxdata/telegraf/plugins/inputs"
15+
"github.com/influxdata/telegraf/plugins/parsers"
16+
)
17+
18+
type HTTP struct {
19+
URLs []string `toml:"urls"`
20+
21+
Headers map[string]string
22+
23+
// HTTP Basic Auth Credentials
24+
Username string
25+
Password string
26+
27+
// Option to add "url" tag to each metric
28+
TagURL bool `toml:"tag_url"`
29+
30+
// Path to CA file
31+
SSLCA string `toml:"ssl_ca"`
32+
// Path to host cert file
33+
SSLCert string `toml:"ssl_cert"`
34+
// Path to cert key file
35+
SSLKey string `toml:"ssl_key"`
36+
// Use SSL but skip chain & host verification
37+
InsecureSkipVerify bool
38+
39+
Timeout internal.Duration
40+
41+
client *http.Client
42+
43+
// The parser will automatically be set by Telegraf core code because
44+
// this plugin implements the ParserInput interface (i.e. the SetParser method)
45+
parser parsers.Parser
46+
}
47+
48+
var sampleConfig = `
49+
## One or more URLs from which to read formatted metrics
50+
urls = [
51+
"http://localhost/metrics"
52+
]
53+
54+
## Optional HTTP headers
55+
# headers = {"X-Special-Header" = "Special-Value"}
56+
57+
## Optional HTTP Basic Auth Credentials
58+
# username = "username"
59+
# password = "pa$$word"
60+
61+
## Tag all metrics with the url
62+
# tag_url = true
63+
64+
## Optional SSL Config
65+
# ssl_ca = "/etc/telegraf/ca.pem"
66+
# ssl_cert = "/etc/telegraf/cert.pem"
67+
# ssl_key = "/etc/telegraf/key.pem"
68+
## Use SSL but skip chain & host verification
69+
# insecure_skip_verify = false
70+
71+
# timeout = "5s"
72+
73+
## Data format to consume.
74+
## Each data format has its own unique set of configuration options, read
75+
## more about them here:
76+
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
77+
# data_format = "influx"
78+
`
79+
80+
// SampleConfig returns the default configuration of the Input
81+
func (*HTTP) SampleConfig() string {
82+
return sampleConfig
83+
}
84+
85+
// Description returns a one-sentence description on the Input
86+
func (*HTTP) Description() string {
87+
return "Read formatted metrics from one or more HTTP endpoints"
88+
}
89+
90+
// Gather takes in an accumulator and adds the metrics that the Input
91+
// gathers. This is called every "interval"
92+
func (h *HTTP) Gather(acc telegraf.Accumulator) error {
93+
if h.client == nil {
94+
tlsCfg, err := internal.GetTLSConfig(
95+
h.SSLCert, h.SSLKey, h.SSLCA, h.InsecureSkipVerify)
96+
if err != nil {
97+
return err
98+
}
99+
h.client = &http.Client{
100+
Transport: &http.Transport{
101+
TLSClientConfig: tlsCfg,
102+
},
103+
Timeout: h.Timeout.Duration,
104+
}
105+
}
106+
107+
var wg sync.WaitGroup
108+
for _, u := range h.URLs {
109+
wg.Add(1)
110+
go func(url string) {
111+
defer wg.Done()
112+
if err := h.gatherURL(acc, url); err != nil {
113+
acc.AddError(fmt.Errorf("[url=%s]: %s", url, err))
114+
}
115+
}(u)
116+
}
117+
118+
wg.Wait()
119+
120+
return nil
121+
}
122+
123+
// SetParser takes the data_format from the config and finds the right parser for that format
124+
func (h *HTTP) SetParser(parser parsers.Parser) {
125+
h.parser = parser
126+
}
127+
128+
// Gathers data from a particular URL
129+
// Parameters:
130+
// acc : The telegraf Accumulator to use
131+
// url : endpoint to send request to
132+
//
133+
// Returns:
134+
// error: Any error that may have occurred
135+
func (h *HTTP) gatherURL(
136+
acc telegraf.Accumulator,
137+
url string,
138+
) error {
139+
request, err := http.NewRequest("GET", url, nil)
140+
if err != nil {
141+
return err
142+
}
143+
144+
for k, v := range h.Headers {
145+
if strings.ToLower(k) == "host" {
146+
request.Host = v
147+
} else {
148+
request.Header.Add(k, v)
149+
}
150+
}
151+
152+
if h.Username != "" {
153+
request.SetBasicAuth(h.Username, h.Password)
154+
}
155+
156+
resp, err := h.client.Do(request)
157+
if err != nil {
158+
return err
159+
}
160+
defer resp.Body.Close()
161+
162+
if resp.StatusCode != http.StatusOK {
163+
return fmt.Errorf("Received status code %d (%s), expected %d (%s)",
164+
resp.StatusCode,
165+
http.StatusText(resp.StatusCode),
166+
http.StatusOK,
167+
http.StatusText(http.StatusOK))
168+
}
169+
170+
b, err := ioutil.ReadAll(resp.Body)
171+
if err != nil {
172+
return err
173+
}
174+
175+
if h.parser == nil {
176+
return errors.New("Parser is not set")
177+
}
178+
179+
metrics, err := h.parser.Parse(b)
180+
if err != nil {
181+
return err
182+
}
183+
184+
for _, metric := range metrics {
185+
if h.TagURL {
186+
metric.AddTag("url", url)
187+
}
188+
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
189+
}
190+
191+
return nil
192+
}
193+
194+
func init() {
195+
inputs.Add("http", func() telegraf.Input {
196+
return &HTTP{
197+
Timeout: internal.Duration{Duration: time.Second * 5},
198+
}
199+
})
200+
}

plugins/inputs/http/http_test.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package http_test
2+
3+
import (
4+
"net/http"
5+
"net/http/httptest"
6+
"testing"
7+
8+
plugin "github.com/influxdata/telegraf/plugins/inputs/http"
9+
"github.com/influxdata/telegraf/plugins/parsers"
10+
"github.com/influxdata/telegraf/testutil"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
func TestHTTPwithJSONFormat(t *testing.T) {
15+
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
16+
if r.URL.Path == "/endpoint" {
17+
_, _ = w.Write([]byte(simpleJSON))
18+
} else {
19+
w.WriteHeader(http.StatusNotFound)
20+
}
21+
}))
22+
defer fakeServer.Close()
23+
24+
url := fakeServer.URL + "/endpoint"
25+
plugin := &plugin.HTTP{
26+
URLs: []string{url},
27+
TagURL: true,
28+
}
29+
metricName := "metricName"
30+
p, _ := parsers.NewJSONParser(metricName, nil, nil)
31+
plugin.SetParser(p)
32+
33+
var acc testutil.Accumulator
34+
require.NoError(t, acc.GatherError(plugin.Gather))
35+
36+
require.Len(t, acc.Metrics, 1)
37+
38+
// basic check to see if we got the right field, value and tag
39+
var metric = acc.Metrics[0]
40+
require.Equal(t, metric.Measurement, metricName)
41+
require.Len(t, acc.Metrics[0].Fields, 1)
42+
require.Equal(t, acc.Metrics[0].Fields["a"], 1.2)
43+
require.Equal(t, acc.Metrics[0].Tags["url"], url)
44+
}
45+
46+
func TestHTTPHeaders(t *testing.T) {
47+
header := "X-Special-Header"
48+
headerValue := "Special-Value"
49+
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
50+
if r.URL.Path == "/endpoint" {
51+
if r.Header.Get(header) == headerValue {
52+
_, _ = w.Write([]byte(simpleJSON))
53+
} else {
54+
w.WriteHeader(http.StatusForbidden)
55+
}
56+
} else {
57+
w.WriteHeader(http.StatusNotFound)
58+
}
59+
}))
60+
defer fakeServer.Close()
61+
62+
url := fakeServer.URL + "/endpoint"
63+
plugin := &plugin.HTTP{
64+
URLs: []string{url},
65+
Headers: map[string]string{header: headerValue},
66+
}
67+
metricName := "metricName"
68+
p, _ := parsers.NewJSONParser(metricName, nil, nil)
69+
plugin.SetParser(p)
70+
71+
var acc testutil.Accumulator
72+
require.NoError(t, acc.GatherError(plugin.Gather))
73+
}
74+
75+
func TestInvalidStatusCode(t *testing.T) {
76+
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
77+
w.WriteHeader(http.StatusNotFound)
78+
}))
79+
defer fakeServer.Close()
80+
81+
url := fakeServer.URL + "/endpoint"
82+
plugin := &plugin.HTTP{
83+
URLs: []string{url},
84+
}
85+
86+
metricName := "metricName"
87+
p, _ := parsers.NewJSONParser(metricName, nil, nil)
88+
plugin.SetParser(p)
89+
90+
var acc testutil.Accumulator
91+
require.Error(t, acc.GatherError(plugin.Gather))
92+
}
93+
94+
func TestParserNotSet(t *testing.T) {
95+
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
96+
if r.URL.Path == "/endpoint" {
97+
_, _ = w.Write([]byte(simpleJSON))
98+
} else {
99+
w.WriteHeader(http.StatusNotFound)
100+
}
101+
}))
102+
defer fakeServer.Close()
103+
104+
url := fakeServer.URL + "/endpoint"
105+
plugin := &plugin.HTTP{
106+
URLs: []string{url},
107+
}
108+
109+
var acc testutil.Accumulator
110+
require.Error(t, acc.GatherError(plugin.Gather))
111+
}
112+
113+
const simpleJSON = `
114+
{
115+
"a": 1.2
116+
}
117+
`

0 commit comments

Comments
 (0)