Skip to content

Commit 0af40a8

Browse files
authored
Fix dropwizard parsing error for metrics that need escaped (#4142)
If the dropwizard parser cannot convert the metric name into a valid line protocol series then we will accept the name as is.
1 parent 558caf5 commit 0af40a8

File tree

10 files changed

+6010
-5258
lines changed

10 files changed

+6010
-5258
lines changed

metric.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ type Metric interface {
5454
AddField(key string, value interface{})
5555
RemoveField(key string)
5656

57+
SetTime(t time.Time)
58+
5759
// HashID returns an unique identifier for the series.
5860
HashID() uint64
5961

metric/metric.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,10 @@ func (m *metric) RemoveField(key string) {
202202
}
203203
}
204204

205+
func (m *metric) SetTime(t time.Time) {
206+
m.tm = t
207+
}
208+
205209
func (m *metric) Copy() telegraf.Metric {
206210
m2 := &metric{
207211
name: m.name,

plugins/parsers/dropwizard/parser.go

Lines changed: 75 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package dropwizard
22

33
import (
4-
"bytes"
54
"encoding/json"
65
"fmt"
76
"log"
@@ -10,6 +9,7 @@ import (
109

1110
"github.com/influxdata/telegraf"
1211
"github.com/influxdata/telegraf/internal/templating"
12+
"github.com/influxdata/telegraf/metric"
1313
"github.com/influxdata/telegraf/plugins/parsers/influx"
1414
"github.com/tidwall/gjson"
1515
)
@@ -19,8 +19,8 @@ var keyEscaper = strings.NewReplacer(" ", "\\ ", ",", "\\,", "=", "\\=")
1919

2020
// Parser parses json inputs containing dropwizard metrics,
2121
// either top-level or embedded inside a json field.
22-
// This parser is using gjon for retrieving paths within the json file.
23-
type Parser struct {
22+
// This parser is using gjson for retrieving paths within the json file.
23+
type parser struct {
2424

2525
// an optional json path containing the metric registry object
2626
// if left empty, the whole json object is parsed as a metric registry
@@ -45,15 +45,28 @@ type Parser struct {
4545
// an optional map of default tags to use for metrics
4646
DefaultTags map[string]string
4747

48-
// templating configuration
49-
Separator string
50-
Templates []string
51-
48+
separator string
5249
templateEngine *templating.Engine
50+
51+
timeFunc metric.TimeFunc
52+
53+
// seriesParser parses line protocol measurement + tags
54+
seriesParser *influx.Parser
55+
}
56+
57+
func NewParser() *parser {
58+
handler := influx.NewMetricHandler()
59+
seriesParser := influx.NewSeriesParser(handler)
60+
61+
parser := &parser{
62+
timeFunc: time.Now,
63+
seriesParser: seriesParser,
64+
}
65+
return parser
5366
}
5467

5568
// Parse parses the input bytes to an array of metrics
56-
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
69+
func (p *parser) Parse(buf []byte) ([]telegraf.Metric, error) {
5770

5871
metrics := make([]telegraf.Metric, 0)
5972

@@ -100,28 +113,38 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
100113
return metrics, nil
101114
}
102115

103-
// InitTemplating initializes the templating support
104-
func (p *Parser) InitTemplating() error {
105-
if len(p.Templates) > 0 {
106-
defaultTemplate, _ := templating.NewDefaultTemplateWithPattern("measurement*")
107-
templateEngine, err := templating.NewEngine(p.Separator, defaultTemplate, p.Templates)
108-
p.templateEngine = templateEngine
116+
func (p *parser) SetTemplates(separator string, templates []string) error {
117+
if len(templates) == 0 {
118+
p.templateEngine = nil
119+
return nil
120+
}
121+
122+
defaultTemplate, err := templating.NewDefaultTemplateWithPattern("measurement*")
123+
if err != nil {
124+
return err
125+
}
126+
127+
templateEngine, err := templating.NewEngine(separator, defaultTemplate, templates)
128+
if err != nil {
109129
return err
110130
}
131+
132+
p.separator = separator
133+
p.templateEngine = templateEngine
111134
return nil
112135
}
113136

114137
// ParseLine is not supported by the dropwizard format
115-
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
138+
func (p *parser) ParseLine(line string) (telegraf.Metric, error) {
116139
return nil, fmt.Errorf("ParseLine not supported: %s, for data format: dropwizard", line)
117140
}
118141

119142
// SetDefaultTags sets the default tags
120-
func (p *Parser) SetDefaultTags(tags map[string]string) {
143+
func (p *parser) SetDefaultTags(tags map[string]string) {
121144
p.DefaultTags = tags
122145
}
123146

124-
func (p *Parser) readTags(buf []byte) map[string]string {
147+
func (p *parser) readTags(buf []byte) map[string]string {
125148

126149
if p.TagsPath != "" {
127150
var tagsBytes []byte
@@ -147,7 +170,7 @@ func (p *Parser) readTags(buf []byte) map[string]string {
147170
return tags
148171
}
149172

150-
func (p *Parser) parseTime(buf []byte) (time.Time, error) {
173+
func (p *parser) parseTime(buf []byte) (time.Time, error) {
151174

152175
if p.TimePath != "" {
153176
timeFormat := p.TimeFormat
@@ -157,19 +180,19 @@ func (p *Parser) parseTime(buf []byte) (time.Time, error) {
157180
timeString := gjson.GetBytes(buf, p.TimePath).String()
158181
if timeString == "" {
159182
err := fmt.Errorf("time not found in JSON path %s", p.TimePath)
160-
return time.Now().UTC(), err
183+
return p.timeFunc(), err
161184
}
162185
t, err := time.Parse(timeFormat, timeString)
163186
if err != nil {
164187
err = fmt.Errorf("time %s cannot be parsed with format %s, %s", timeString, timeFormat, err)
165-
return time.Now().UTC(), err
188+
return p.timeFunc(), err
166189
}
167190
return t.UTC(), nil
168191
}
169-
return time.Now().UTC(), nil
192+
return p.timeFunc(), nil
170193
}
171194

172-
func (p *Parser) unmarshalMetrics(buf []byte) (map[string]interface{}, error) {
195+
func (p *parser) unmarshalMetrics(buf []byte) (map[string]interface{}, error) {
173196

174197
var registryBytes []byte
175198
if p.MetricRegistryPath != "" {
@@ -195,71 +218,55 @@ func (p *Parser) unmarshalMetrics(buf []byte) (map[string]interface{}, error) {
195218
return jsonOut, nil
196219
}
197220

198-
func (p *Parser) readDWMetrics(metricType string, dwms interface{}, metrics []telegraf.Metric, tm time.Time) []telegraf.Metric {
199-
200-
switch dwmsTyped := dwms.(type) {
201-
case map[string]interface{}:
202-
var metricsBuffer bytes.Buffer
221+
func (p *parser) readDWMetrics(metricType string, dwms interface{}, metrics []telegraf.Metric, tm time.Time) []telegraf.Metric {
222+
if dwmsTyped, ok := dwms.(map[string]interface{}); ok {
203223
for dwmName, dwmFields := range dwmsTyped {
204224
measurementName := dwmName
205225
tags := make(map[string]string)
206226
fieldPrefix := ""
207227
if p.templateEngine != nil {
208228
measurementName, tags, fieldPrefix, _ = p.templateEngine.Apply(dwmName)
209229
if len(fieldPrefix) > 0 {
210-
fieldPrefix = fmt.Sprintf("%s%s", fieldPrefix, p.Separator)
230+
fieldPrefix = fmt.Sprintf("%s%s", fieldPrefix, p.separator)
231+
}
232+
}
233+
234+
parsed, err := p.seriesParser.Parse([]byte(measurementName))
235+
var m telegraf.Metric
236+
if err != nil || len(parsed) != 1 {
237+
m, err = metric.New(measurementName, map[string]string{}, map[string]interface{}{}, tm)
238+
if err != nil {
239+
log.Printf("W! failed to create metric of type '%s': %s\n", metricType, err)
240+
continue
211241
}
242+
} else {
243+
m = parsed[0]
244+
m.SetTime(tm)
212245
}
213-
tags["metric_type"] = metricType
214246

215-
measurementWithTags := measurementName
216-
for tagName, tagValue := range tags {
217-
tagKeyValue := fmt.Sprintf("%s=%s", keyEscaper.Replace(tagName), keyEscaper.Replace(tagValue))
218-
measurementWithTags = fmt.Sprintf("%s,%s", measurementWithTags, tagKeyValue)
247+
m.AddTag("metric_type", metricType)
248+
for k, v := range tags {
249+
m.AddTag(k, v)
219250
}
220251

221-
fields := make([]string, 0)
222-
switch t := dwmFields.(type) {
223-
case map[string]interface{}: // json object
224-
for fieldName, fieldValue := range t {
225-
key := keyEscaper.Replace(fieldPrefix + fieldName)
226-
switch v := fieldValue.(type) {
227-
case float64:
228-
fields = append(fields, fmt.Sprintf("%s=%f", key, v))
229-
case string:
230-
fields = append(fields, fmt.Sprintf("%s=\"%s\"", key, fieldEscaper.Replace(v)))
231-
case bool:
232-
fields = append(fields, fmt.Sprintf("%s=%t", key, v))
233-
default: // ignore
252+
if fields, ok := dwmFields.(map[string]interface{}); ok {
253+
for k, v := range fields {
254+
switch v := v.(type) {
255+
case float64, string, bool:
256+
m.AddField(fieldPrefix+k, v)
257+
default:
258+
// ignore
234259
}
235260
}
236-
default: // ignore
237261
}
238262

239-
metricsBuffer.WriteString(fmt.Sprintf("%s,metric_type=%s ", measurementWithTags, metricType))
240-
metricsBuffer.WriteString(strings.Join(fields, ","))
241-
metricsBuffer.WriteString("\n")
242-
}
243-
244-
handler := influx.NewMetricHandler()
245-
handler.SetTimeFunc(func() time.Time { return tm })
246-
parser := influx.NewParser(handler)
247-
newMetrics, err := parser.Parse(metricsBuffer.Bytes())
248-
if err != nil {
249-
log.Printf("W! failed to create metric of type '%s': %s\n", metricType, err)
263+
metrics = append(metrics, m)
250264
}
251-
252-
return append(metrics, newMetrics...)
253-
default:
254-
return metrics
255265
}
256266

267+
return metrics
257268
}
258269

259-
func arraymap(vs []string, f func(string) string) []string {
260-
vsm := make([]string, len(vs))
261-
for i, v := range vs {
262-
vsm[i] = f(v)
263-
}
264-
return vsm
270+
func (p *parser) SetTimeFunc(f metric.TimeFunc) {
271+
p.timeFunc = f
265272
}

0 commit comments

Comments
 (0)