Skip to content

Commit 22d78f6

Browse files
puckpuckotherpirate
authored andcommitted
Add Wavefront parser (influxdata#4402)
1 parent 182ea09 commit 22d78f6

File tree

8 files changed

+809
-14
lines changed

8 files changed

+809
-14
lines changed

docs/DATA_FORMATS_INPUT.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Telegraf is able to parse the following input data formats into metrics:
1010
1. [Collectd](#collectd)
1111
1. [Dropwizard](#dropwizard)
1212
1. [Grok](#grok)
13+
1. [Wavefront](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#wavefront)
1314

1415
Telegraf metrics, like InfluxDB
1516
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
@@ -881,3 +882,29 @@ the file output will only print once per `flush_interval`.
881882
- Continue one token at a time until the entire line is successfully parsed.
882883

883884

885+
```
886+
887+
# Wavefront:
888+
889+
Wavefront Data Format is metrics are parsed directly into Telegraf metrics.
890+
For more information about the Wavefront Data Format see
891+
[here](https://docs.wavefront.com/wavefront_data_format.html).
892+
893+
There are no additional configuration options for Wavefront Data Format line-protocol.
894+
895+
#### Wavefront Configuration:
896+
897+
```toml
898+
[[inputs.exec]]
899+
## Commands array
900+
commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]
901+
902+
## measurement name suffix (for separating different commands)
903+
name_suffix = "_mycollector"
904+
905+
## Data format to consume.
906+
## Each data format has its own unique set of configuration options, read
907+
## more about them here:
908+
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
909+
data_format = "wavefront"
910+
```

plugins/outputs/wavefront/wavefront.go

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -189,26 +189,32 @@ func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string
189189
}
190190

191191
var source string
192-
sourceTagFound := false
193-
194-
for _, s := range w.SourceOverride {
195-
for k, v := range mTags {
196-
if k == s {
197-
source = v
198-
mTags["telegraf_host"] = mTags["host"]
199-
sourceTagFound = true
200-
delete(mTags, k)
192+
193+
if s, ok := mTags["source"]; ok {
194+
source = s
195+
delete(mTags, "source")
196+
} else {
197+
sourceTagFound := false
198+
for _, s := range w.SourceOverride {
199+
for k, v := range mTags {
200+
if k == s {
201+
source = v
202+
mTags["telegraf_host"] = mTags["host"]
203+
sourceTagFound = true
204+
delete(mTags, k)
205+
break
206+
}
207+
}
208+
if sourceTagFound {
201209
break
202210
}
203211
}
204-
if sourceTagFound {
205-
break
212+
213+
if !sourceTagFound {
214+
source = mTags["host"]
206215
}
207216
}
208217

209-
if !sourceTagFound {
210-
source = mTags["host"]
211-
}
212218
delete(mTags, "host")
213219

214220
return tagValueReplacer.Replace(source), mTags

plugins/parsers/registry.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/influxdata/telegraf/plugins/parsers/json"
1414
"github.com/influxdata/telegraf/plugins/parsers/nagios"
1515
"github.com/influxdata/telegraf/plugins/parsers/value"
16+
"github.com/influxdata/telegraf/plugins/parsers/wavefront"
1617
)
1718

1819
// ParserInput is an interface for input plugins that are able to parse
@@ -131,6 +132,8 @@ func NewParser(config *Config) (Parser, error) {
131132
config.DefaultTags,
132133
config.Separator,
133134
config.Templates)
135+
case "wavefront":
136+
parser, err = NewWavefrontParser(config.DefaultTags)
134137
case "grok":
135138
parser, err = newGrokParser(
136139
config.MetricName,
@@ -238,3 +241,7 @@ func NewDropwizardParser(
238241
}
239242
return parser, err
240243
}
244+
245+
func NewWavefrontParser(defaultTags map[string]string) (Parser, error) {
246+
return wavefront.NewWavefrontParser(defaultTags), nil
247+
}
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
package wavefront
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"strconv"
7+
"time"
8+
)
9+
10+
var (
11+
ErrEOF = errors.New("EOF")
12+
ErrInvalidTimestamp = errors.New("Invalid timestamp")
13+
)
14+
15+
// Interface for parsing line elements.
16+
type ElementParser interface {
17+
parse(p *PointParser, pt *Point) error
18+
}
19+
20+
type NameParser struct{}
21+
type ValueParser struct{}
22+
type TimestampParser struct {
23+
optional bool
24+
}
25+
type WhiteSpaceParser struct {
26+
nextOptional bool
27+
}
28+
type TagParser struct{}
29+
type LoopedParser struct {
30+
wrappedParser ElementParser
31+
wsPaser *WhiteSpaceParser
32+
}
33+
type LiteralParser struct {
34+
literal string
35+
}
36+
37+
func (ep *NameParser) parse(p *PointParser, pt *Point) error {
38+
//Valid characters are: a-z, A-Z, 0-9, hyphen ("-"), underscore ("_"), dot (".").
39+
// Forward slash ("/") and comma (",") are allowed if metricName is enclosed in double quotes.
40+
name, err := parseLiteral(p)
41+
if err != nil {
42+
return err
43+
}
44+
pt.Name = name
45+
return nil
46+
}
47+
48+
func (ep *ValueParser) parse(p *PointParser, pt *Point) error {
49+
tok, lit := p.scan()
50+
if tok == EOF {
51+
return fmt.Errorf("found %q, expected number", lit)
52+
}
53+
54+
p.writeBuf.Reset()
55+
if tok == MINUS_SIGN {
56+
p.writeBuf.WriteString(lit)
57+
tok, lit = p.scan()
58+
}
59+
60+
for tok != EOF && (tok == LETTER || tok == NUMBER || tok == DOT) {
61+
p.writeBuf.WriteString(lit)
62+
tok, lit = p.scan()
63+
}
64+
p.unscan()
65+
66+
pt.Value = p.writeBuf.String()
67+
_, err := strconv.ParseFloat(pt.Value, 64)
68+
if err != nil {
69+
return fmt.Errorf("invalid metric value %s", pt.Value)
70+
}
71+
return nil
72+
}
73+
74+
func (ep *TimestampParser) parse(p *PointParser, pt *Point) error {
75+
tok, lit := p.scan()
76+
if tok == EOF {
77+
if ep.optional {
78+
p.unscanTokens(2)
79+
return setTimestamp(pt, 0, 1)
80+
}
81+
return fmt.Errorf("found %q, expected number", lit)
82+
}
83+
84+
if tok != NUMBER {
85+
if ep.optional {
86+
p.unscanTokens(2)
87+
return setTimestamp(pt, 0, 1)
88+
}
89+
return ErrInvalidTimestamp
90+
}
91+
92+
p.writeBuf.Reset()
93+
for tok != EOF && tok == NUMBER {
94+
p.writeBuf.WriteString(lit)
95+
tok, lit = p.scan()
96+
}
97+
p.unscan()
98+
99+
tsStr := p.writeBuf.String()
100+
ts, err := strconv.ParseInt(tsStr, 10, 64)
101+
if err != nil {
102+
return err
103+
}
104+
return setTimestamp(pt, ts, len(tsStr))
105+
}
106+
107+
func setTimestamp(pt *Point, ts int64, numDigits int) error {
108+
109+
if numDigits == 19 {
110+
// nanoseconds
111+
ts = ts / 1e9
112+
} else if numDigits == 16 {
113+
// microseconds
114+
ts = ts / 1e6
115+
} else if numDigits == 13 {
116+
// milliseconds
117+
ts = ts / 1e3
118+
} else if numDigits != 10 {
119+
// must be in seconds, return error if not 0
120+
if ts == 0 {
121+
ts = getCurrentTime()
122+
} else {
123+
return ErrInvalidTimestamp
124+
}
125+
}
126+
pt.Timestamp = ts
127+
return nil
128+
}
129+
130+
func (ep *LoopedParser) parse(p *PointParser, pt *Point) error {
131+
for {
132+
err := ep.wrappedParser.parse(p, pt)
133+
if err != nil {
134+
return err
135+
}
136+
err = ep.wsPaser.parse(p, pt)
137+
if err == ErrEOF {
138+
break
139+
}
140+
}
141+
return nil
142+
}
143+
144+
func (ep *TagParser) parse(p *PointParser, pt *Point) error {
145+
k, err := parseLiteral(p)
146+
if err != nil {
147+
if k == "" {
148+
return nil
149+
}
150+
return err
151+
}
152+
153+
next, lit := p.scan()
154+
if next != EQUALS {
155+
return fmt.Errorf("found %q, expected equals", lit)
156+
}
157+
158+
v, err := parseLiteral(p)
159+
if err != nil {
160+
return err
161+
}
162+
if len(pt.Tags) == 0 {
163+
pt.Tags = make(map[string]string)
164+
}
165+
pt.Tags[k] = v
166+
return nil
167+
}
168+
169+
func (ep *WhiteSpaceParser) parse(p *PointParser, pt *Point) error {
170+
tok := WS
171+
for tok != EOF && tok == WS {
172+
tok, _ = p.scan()
173+
}
174+
175+
if tok == EOF {
176+
if !ep.nextOptional {
177+
return ErrEOF
178+
}
179+
return nil
180+
}
181+
p.unscan()
182+
return nil
183+
}
184+
185+
func (ep *LiteralParser) parse(p *PointParser, pt *Point) error {
186+
l, err := parseLiteral(p)
187+
if err != nil {
188+
return err
189+
}
190+
191+
if l != ep.literal {
192+
return fmt.Errorf("found %s, expected %s", l, ep.literal)
193+
}
194+
return nil
195+
}
196+
197+
func parseQuotedLiteral(p *PointParser) (string, error) {
198+
p.writeBuf.Reset()
199+
200+
escaped := false
201+
tok, lit := p.scan()
202+
for tok != EOF && (tok != QUOTES || (tok == QUOTES && escaped)) {
203+
// let everything through
204+
escaped = tok == BACKSLASH
205+
p.writeBuf.WriteString(lit)
206+
tok, lit = p.scan()
207+
}
208+
if tok == EOF {
209+
return "", fmt.Errorf("found %q, expected quotes", lit)
210+
}
211+
return p.writeBuf.String(), nil
212+
}
213+
214+
func parseLiteral(p *PointParser) (string, error) {
215+
tok, lit := p.scan()
216+
if tok == EOF {
217+
return "", fmt.Errorf("found %q, expected literal", lit)
218+
}
219+
220+
if tok == QUOTES {
221+
return parseQuotedLiteral(p)
222+
}
223+
224+
p.writeBuf.Reset()
225+
for tok != EOF && tok > literal_beg && tok < literal_end {
226+
p.writeBuf.WriteString(lit)
227+
tok, lit = p.scan()
228+
}
229+
if tok == QUOTES {
230+
return "", errors.New("found quote inside unquoted literal")
231+
}
232+
p.unscan()
233+
return p.writeBuf.String(), nil
234+
}
235+
236+
func getCurrentTime() int64 {
237+
return time.Now().UnixNano() / 1e9
238+
}

0 commit comments

Comments
 (0)