Skip to content

Commit fdfa863

Browse files
danielnelsonJean-Louis Dupond
authored andcommitted
Skip unserializable metric in influxDB UDP output (influxdata#4534)
1 parent eabc9d6 commit fdfa863

File tree

5 files changed

+120
-83
lines changed

5 files changed

+120
-83
lines changed

plugins/outputs/influxdb/udp.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ package influxdb
33
import (
44
"context"
55
"fmt"
6+
"log"
67
"net"
78
"net/url"
89

910
"github.com/influxdata/telegraf"
10-
"github.com/influxdata/telegraf/plugins/serializers"
1111
"github.com/influxdata/telegraf/plugins/serializers/influx"
1212
)
1313

@@ -28,7 +28,7 @@ type Conn interface {
2828
type UDPConfig struct {
2929
MaxPayloadSize int
3030
URL *url.URL
31-
Serializer serializers.Serializer
31+
Serializer *influx.Serializer
3232
Dialer Dialer
3333
}
3434

@@ -65,7 +65,7 @@ func NewUDPClient(config *UDPConfig) (*udpClient, error) {
6565
type udpClient struct {
6666
conn Conn
6767
dialer Dialer
68-
serializer serializers.Serializer
68+
serializer *influx.Serializer
6969
url *url.URL
7070
}
7171

@@ -89,7 +89,11 @@ func (c *udpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
8989
for _, metric := range metrics {
9090
octets, err := c.serializer.Serialize(metric)
9191
if err != nil {
92-
return fmt.Errorf("could not serialize metric: %v", err)
92+
// Since we are serializing multiple metrics, don't fail the
93+
// entire batch just because of one unserializable metric.
94+
log.Printf("E! [outputs.influxdb] when writing to [%s] could not serialize metric: %v",
95+
c.URL(), err)
96+
continue
9397
}
9498

9599
_, err = c.conn.Write(octets)

plugins/outputs/influxdb/udp_test.go

Lines changed: 59 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7+
"log"
78
"net"
89
"net/url"
910
"sync"
@@ -13,7 +14,6 @@ import (
1314
"github.com/influxdata/telegraf"
1415
"github.com/influxdata/telegraf/metric"
1516
"github.com/influxdata/telegraf/plugins/outputs/influxdb"
16-
"github.com/influxdata/telegraf/plugins/serializers/influx"
1717
"github.com/stretchr/testify/require"
1818
)
1919

@@ -65,19 +65,6 @@ func (d *MockDialer) DialContext(ctx context.Context, network string, address st
6565
return d.DialContextF(network, address)
6666
}
6767

68-
type MockSerializer struct {
69-
SerializeF func(metric telegraf.Metric) ([]byte, error)
70-
SerializeBatchF func(metrics []telegraf.Metric) ([]byte, error)
71-
}
72-
73-
func (s *MockSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
74-
return s.SerializeF(metric)
75-
}
76-
77-
func (s *MockSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
78-
return s.SerializeBatchF(metrics)
79-
}
80-
8168
func TestUDP_NewUDPClientNoURL(t *testing.T) {
8269
config := &influxdb.UDPConfig{}
8370
_, err := influxdb.NewUDPClient(config)
@@ -177,28 +164,69 @@ func TestUDP_WriteError(t *testing.T) {
177164
require.True(t, closed)
178165
}
179166

180-
func TestUDP_SerializeError(t *testing.T) {
181-
config := &influxdb.UDPConfig{
182-
URL: getURL(),
183-
Dialer: &MockDialer{
184-
DialContextF: func(network, address string) (influxdb.Conn, error) {
185-
conn := &MockConn{}
186-
return conn, nil
167+
func TestUDP_ErrorLogging(t *testing.T) {
168+
tests := []struct {
169+
name string
170+
config *influxdb.UDPConfig
171+
metrics []telegraf.Metric
172+
logContains string
173+
}{
174+
{
175+
name: "logs need more space",
176+
config: &influxdb.UDPConfig{
177+
MaxPayloadSize: 1,
178+
URL: getURL(),
179+
Dialer: &MockDialer{
180+
DialContextF: func(network, address string) (influxdb.Conn, error) {
181+
conn := &MockConn{}
182+
return conn, nil
183+
},
184+
},
187185
},
186+
metrics: []telegraf.Metric{getMetric()},
187+
logContains: `could not serialize metric: "cpu": need more space`,
188188
},
189-
Serializer: &MockSerializer{
190-
SerializeF: func(metric telegraf.Metric) ([]byte, error) {
191-
return nil, influx.ErrNeedMoreSpace
189+
{
190+
name: "logs series name",
191+
config: &influxdb.UDPConfig{
192+
URL: getURL(),
193+
Dialer: &MockDialer{
194+
DialContextF: func(network, address string) (influxdb.Conn, error) {
195+
conn := &MockConn{}
196+
return conn, nil
197+
},
198+
},
199+
},
200+
metrics: []telegraf.Metric{
201+
func() telegraf.Metric {
202+
metric, _ := metric.New(
203+
"cpu",
204+
map[string]string{
205+
"host": "example.org",
206+
},
207+
map[string]interface{}{},
208+
time.Unix(0, 0),
209+
)
210+
return metric
211+
}(),
192212
},
213+
logContains: `could not serialize metric: "cpu,host=example.org": no serializable fields`,
193214
},
194215
}
195-
client, err := influxdb.NewUDPClient(config)
196-
require.NoError(t, err)
197-
198-
ctx := context.Background()
199-
err = client.Write(ctx, []telegraf.Metric{getMetric()})
200-
require.Error(t, err)
201-
require.Contains(t, err.Error(), influx.ErrNeedMoreSpace.Error())
216+
for _, tt := range tests {
217+
t.Run(tt.name, func(t *testing.T) {
218+
var b bytes.Buffer
219+
log.SetOutput(&b)
220+
221+
client, err := influxdb.NewUDPClient(tt.config)
222+
require.NoError(t, err)
223+
224+
ctx := context.Background()
225+
err = client.Write(ctx, tt.metrics)
226+
require.NoError(t, err)
227+
require.Contains(t, b.String(), tt.logContains)
228+
})
229+
}
202230
}
203231

204232
func TestUDP_WriteWithRealConn(t *testing.T) {

plugins/serializers/influx/influx.go

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,30 +27,34 @@ const (
2727
UintSupport FieldTypeSupport = 1 << iota
2828
)
2929

30-
// MetricError is an error causing a metric to be unserializable.
30+
var (
31+
NeedMoreSpace = "need more space"
32+
InvalidName = "invalid name"
33+
NoFields = "no serializable fields"
34+
)
35+
36+
// MetricError is an error causing an entire metric to be unserializable.
3137
type MetricError struct {
32-
s string
38+
series string
39+
reason string
3340
}
3441

3542
func (e MetricError) Error() string {
36-
return e.s
43+
if e.series != "" {
44+
return fmt.Sprintf("%q: %s", e.series, e.reason)
45+
}
46+
return e.reason
3747
}
3848

3949
// FieldError is an error causing a field to be unserializable.
4050
type FieldError struct {
41-
s string
51+
reason string
4252
}
4353

4454
func (e FieldError) Error() string {
45-
return e.s
55+
return e.reason
4656
}
4757

48-
var (
49-
ErrNeedMoreSpace = &MetricError{"need more space"}
50-
ErrInvalidName = &MetricError{"invalid name"}
51-
ErrNoFields = &MetricError{"no serializable fields"}
52-
)
53-
5458
// Serializer is a serializer for line protocol.
5559
type Serializer struct {
5660
maxLineBytes int
@@ -102,17 +106,20 @@ func (s *Serializer) Serialize(m telegraf.Metric) ([]byte, error) {
102106
return out, nil
103107
}
104108

109+
// SerializeBatch writes the slice of metrics and returns a byte slice of the
110+
// results. The returned byte slice may contain multiple lines of data.
105111
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
106-
var batch bytes.Buffer
112+
s.buf.Reset()
107113
for _, m := range metrics {
108-
_, err := s.Write(&batch, m)
114+
_, err := s.Write(&s.buf, m)
109115
if err != nil {
110116
return nil, err
111117
}
112118
}
113-
return batch.Bytes(), nil
119+
out := make([]byte, s.buf.Len())
120+
copy(out, s.buf.Bytes())
121+
return out, nil
114122
}
115-
116123
func (s *Serializer) Write(w io.Writer, m telegraf.Metric) (int, error) {
117124
err := s.writeMetric(w, m)
118125
return s.bytesWritten, err
@@ -135,7 +142,7 @@ func (s *Serializer) buildHeader(m telegraf.Metric) error {
135142

136143
name := nameEscape(m.Name())
137144
if name == "" {
138-
return ErrInvalidName
145+
return s.newMetricError(InvalidName)
139146
}
140147

141148
s.header = append(s.header, name...)
@@ -222,31 +229,23 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error {
222229
}
223230

224231
if s.maxLineBytes > 0 && bytesNeeded > s.maxLineBytes {
225-
// Need at least one field per line
232+
// Need at least one field per line, this metric cannot be fit
233+
// into the max line bytes.
226234
if firstField {
227-
return ErrNeedMoreSpace
235+
return s.newMetricError(NeedMoreSpace)
228236
}
229237

230238
err = s.write(w, s.footer)
231239
if err != nil {
232240
return err
233241
}
234242

243+
firstField = true
235244
bytesNeeded = len(s.header) + len(s.pair) + len(s.footer)
236245

237-
if s.maxLineBytes > 0 && bytesNeeded > s.maxLineBytes {
238-
return ErrNeedMoreSpace
246+
if bytesNeeded > s.maxLineBytes {
247+
return s.newMetricError(NeedMoreSpace)
239248
}
240-
241-
err = s.write(w, s.header)
242-
if err != nil {
243-
return err
244-
}
245-
246-
s.write(w, s.pair)
247-
pairsLen += len(s.pair)
248-
firstField = false
249-
continue
250249
}
251250

252251
if firstField {
@@ -261,18 +260,28 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error {
261260
}
262261
}
263262

264-
s.write(w, s.pair)
263+
err = s.write(w, s.pair)
264+
if err != nil {
265+
return err
266+
}
265267

266268
pairsLen += len(s.pair)
267269
firstField = false
268270
}
269271

270272
if firstField {
271-
return ErrNoFields
273+
return s.newMetricError(NoFields)
272274
}
273275

274276
return s.write(w, s.footer)
277+
}
275278

279+
func (s *Serializer) newMetricError(reason string) *MetricError {
280+
if len(s.header) != 0 {
281+
series := bytes.TrimRight(s.header, " ")
282+
return &MetricError{series: string(series), reason: reason}
283+
}
284+
return &MetricError{reason: reason}
276285
}
277286

278287
func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, error) {

plugins/serializers/influx/influx_test.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ var tests = []struct {
2323
typeSupport FieldTypeSupport
2424
input telegraf.Metric
2525
output []byte
26-
err error
26+
errReason string
2727
}{
2828
{
2929
name: "minimal",
@@ -98,7 +98,7 @@ var tests = []struct {
9898
time.Unix(0, 0),
9999
),
100100
),
101-
err: ErrNoFields,
101+
errReason: NoFields,
102102
},
103103
{
104104
name: "float Inf",
@@ -333,8 +333,8 @@ var tests = []struct {
333333
time.Unix(1519194109, 42),
334334
),
335335
),
336-
output: nil,
337-
err: ErrNeedMoreSpace,
336+
output: nil,
337+
errReason: NeedMoreSpace,
338338
},
339339
{
340340
name: "no fields",
@@ -346,7 +346,7 @@ var tests = []struct {
346346
time.Unix(0, 0),
347347
),
348348
),
349-
err: ErrNoFields,
349+
errReason: NoFields,
350350
},
351351
{
352352
name: "procstat",
@@ -427,7 +427,10 @@ func TestSerializer(t *testing.T) {
427427
serializer.SetFieldSortOrder(SortFields)
428428
serializer.SetFieldTypeSupport(tt.typeSupport)
429429
output, err := serializer.Serialize(tt.input)
430-
require.Equal(t, tt.err, err)
430+
if tt.errReason != "" {
431+
require.Error(t, err)
432+
require.Contains(t, err.Error(), tt.errReason)
433+
}
431434
require.Equal(t, string(tt.output), string(output))
432435
})
433436
}

plugins/serializers/influx/reader.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package influx
22

33
import (
44
"bytes"
5-
"fmt"
65
"io"
76
"log"
87

@@ -54,17 +53,11 @@ func (r *reader) Read(p []byte) (int, error) {
5453
r.offset += 1
5554
if err != nil {
5655
r.buf.Reset()
57-
switch err.(type) {
58-
case *MetricError:
59-
// Since we are serializing an array of metrics, don't fail
56+
if err != nil {
57+
// Since we are serializing multiple metrics, don't fail the
6058
// the entire batch just because of one unserializable metric.
61-
log.Printf(
62-
"D! [serializers.influx] could not serialize metric %q: %v; discarding metric",
63-
metric.Name(), err)
59+
log.Printf("E! [serializers.influx] could not serialize metric: %v; discarding metric", err)
6460
continue
65-
default:
66-
fmt.Println(err)
67-
return 0, err
6861
}
6962
}
7063
break

0 commit comments

Comments
 (0)