Skip to content

Commit c011f7f

Browse files
danielnelsonbitcharmer
authored andcommitted
Add support for gzip compression to amqp plugins (influxdata#5830)
1 parent 72334e6 commit c011f7f

File tree

7 files changed

+250
-12
lines changed

7 files changed

+250
-12
lines changed

internal/content_coding.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package internal
2+
3+
import (
4+
"bytes"
5+
"compress/gzip"
6+
"errors"
7+
"io"
8+
)
9+
10+
// NewContentEncoder returns a ContentEncoder for the encoding type.
11+
func NewContentEncoder(encoding string) (ContentEncoder, error) {
12+
switch encoding {
13+
case "gzip":
14+
return NewGzipEncoder()
15+
16+
case "identity", "":
17+
return NewIdentityEncoder(), nil
18+
default:
19+
return nil, errors.New("invalid value for content_encoding")
20+
}
21+
}
22+
23+
// NewContentDecoder returns a ContentDecoder for the encoding type.
24+
func NewContentDecoder(encoding string) (ContentDecoder, error) {
25+
switch encoding {
26+
case "gzip":
27+
return NewGzipDecoder()
28+
case "identity", "":
29+
return NewIdentityDecoder(), nil
30+
default:
31+
return nil, errors.New("invalid value for content_encoding")
32+
}
33+
}
34+
35+
// ContentEncoder applies a wrapper encoding to byte buffers.
36+
type ContentEncoder interface {
37+
Encode([]byte) ([]byte, error)
38+
}
39+
40+
// GzipEncoder compresses the buffer using gzip at the default level.
41+
type GzipEncoder struct {
42+
writer *gzip.Writer
43+
buf *bytes.Buffer
44+
}
45+
46+
func NewGzipEncoder() (*GzipEncoder, error) {
47+
var buf bytes.Buffer
48+
return &GzipEncoder{
49+
writer: gzip.NewWriter(&buf),
50+
buf: &buf,
51+
}, nil
52+
}
53+
54+
func (e *GzipEncoder) Encode(data []byte) ([]byte, error) {
55+
e.buf.Reset()
56+
e.writer.Reset(e.buf)
57+
58+
_, err := e.writer.Write(data)
59+
if err != nil {
60+
return nil, err
61+
}
62+
err = e.writer.Close()
63+
if err != nil {
64+
return nil, err
65+
}
66+
return e.buf.Bytes(), nil
67+
}
68+
69+
// IdentityEncoder is a null encoder that applies no transformation.
70+
type IdentityEncoder struct{}
71+
72+
func NewIdentityEncoder() *IdentityEncoder {
73+
return &IdentityEncoder{}
74+
}
75+
76+
func (*IdentityEncoder) Encode(data []byte) ([]byte, error) {
77+
return data, nil
78+
}
79+
80+
// ContentDecoder removes a wrapper encoding from byte buffers.
81+
type ContentDecoder interface {
82+
Decode([]byte) ([]byte, error)
83+
}
84+
85+
// GzipDecoder decompresses buffers with gzip compression.
86+
type GzipDecoder struct {
87+
reader *gzip.Reader
88+
buf *bytes.Buffer
89+
}
90+
91+
func NewGzipDecoder() (*GzipDecoder, error) {
92+
return &GzipDecoder{
93+
reader: new(gzip.Reader),
94+
buf: new(bytes.Buffer),
95+
}, nil
96+
}
97+
98+
func (d *GzipDecoder) Decode(data []byte) ([]byte, error) {
99+
d.reader.Reset(bytes.NewBuffer(data))
100+
d.buf.Reset()
101+
102+
_, err := d.buf.ReadFrom(d.reader)
103+
if err != nil && err != io.EOF {
104+
return nil, err
105+
}
106+
err = d.reader.Close()
107+
if err != nil {
108+
return nil, err
109+
}
110+
return d.buf.Bytes(), nil
111+
}
112+
113+
// IdentityDecoder is a null decoder that returns the input.
114+
type IdentityDecoder struct{}
115+
116+
func NewIdentityDecoder() *IdentityDecoder {
117+
return &IdentityDecoder{}
118+
}
119+
120+
func (*IdentityDecoder) Decode(data []byte) ([]byte, error) {
121+
return data, nil
122+
}

internal/content_coding_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package internal
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
func TestGzipEncodeDecode(t *testing.T) {
10+
enc, err := NewGzipEncoder()
11+
require.NoError(t, err)
12+
dec, err := NewGzipDecoder()
13+
require.NoError(t, err)
14+
15+
payload, err := enc.Encode([]byte("howdy"))
16+
require.NoError(t, err)
17+
18+
actual, err := dec.Decode(payload)
19+
require.NoError(t, err)
20+
21+
require.Equal(t, "howdy", string(actual))
22+
}
23+
24+
func TestGzipReuse(t *testing.T) {
25+
enc, err := NewGzipEncoder()
26+
require.NoError(t, err)
27+
dec, err := NewGzipDecoder()
28+
require.NoError(t, err)
29+
30+
payload, err := enc.Encode([]byte("howdy"))
31+
require.NoError(t, err)
32+
33+
actual, err := dec.Decode(payload)
34+
require.NoError(t, err)
35+
36+
require.Equal(t, "howdy", string(actual))
37+
38+
payload, err = enc.Encode([]byte("doody"))
39+
require.NoError(t, err)
40+
41+
actual, err = dec.Decode(payload)
42+
require.NoError(t, err)
43+
44+
require.Equal(t, "doody", string(actual))
45+
}
46+
47+
func TestIdentityEncodeDecode(t *testing.T) {
48+
enc := NewIdentityEncoder()
49+
dec := NewIdentityDecoder()
50+
51+
payload, err := enc.Encode([]byte("howdy"))
52+
require.NoError(t, err)
53+
54+
actual, err := dec.Decode(payload)
55+
require.NoError(t, err)
56+
57+
require.Equal(t, "howdy", string(actual))
58+
}

plugins/inputs/amqp_consumer/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ The following defaults are known to work with RabbitMQ:
7777
## Use TLS but skip chain & host verification
7878
# insecure_skip_verify = false
7979

80+
## Content encoding for message payloads, can be set to "gzip" to or
81+
## "identity" to apply no encoding.
82+
# content_encoding = "identity"
83+
8084
## Data format to consume.
8185
## Each data format has its own unique set of configuration options, read
8286
## more about them here:

plugins/inputs/amqp_consumer/amqp_consumer.go

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/influxdata/telegraf"
14+
"github.com/influxdata/telegraf/internal"
1415
"github.com/influxdata/telegraf/internal/tls"
1516
"github.com/influxdata/telegraf/plugins/inputs"
1617
"github.com/influxdata/telegraf/plugins/parsers"
@@ -52,12 +53,15 @@ type AMQPConsumer struct {
5253
AuthMethod string
5354
tls.ClientConfig
5455

56+
ContentEncoding string `toml:"content_encoding"`
57+
5558
deliveries map[telegraf.TrackingID]amqp.Delivery
5659

57-
parser parsers.Parser
58-
conn *amqp.Connection
59-
wg *sync.WaitGroup
60-
cancel context.CancelFunc
60+
parser parsers.Parser
61+
conn *amqp.Connection
62+
wg *sync.WaitGroup
63+
cancel context.CancelFunc
64+
decoder internal.ContentDecoder
6165
}
6266

6367
type externalAuth struct{}
@@ -147,6 +151,10 @@ func (a *AMQPConsumer) SampleConfig() string {
147151
## Use TLS but skip chain & host verification
148152
# insecure_skip_verify = false
149153
154+
## Content encoding for message payloads, can be set to "gzip" to or
155+
## "identity" to apply no encoding.
156+
# content_encoding = "identity"
157+
150158
## Data format to consume.
151159
## Each data format has its own unique set of configuration options, read
152160
## more about them here:
@@ -201,6 +209,11 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
201209
return err
202210
}
203211

212+
a.decoder, err = internal.NewContentDecoder(a.ContentEncoding)
213+
if err != nil {
214+
return err
215+
}
216+
204217
msgs, err := a.connect(amqpConf)
205218
if err != nil {
206219
return err
@@ -428,8 +441,7 @@ func (a *AMQPConsumer) process(ctx context.Context, msgs <-chan amqp.Delivery, a
428441
}
429442

430443
func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delivery) error {
431-
metrics, err := a.parser.Parse(d.Body)
432-
if err != nil {
444+
onError := func() {
433445
// Discard the message from the queue; will never be able to process
434446
// this message.
435447
rejErr := d.Ack(false)
@@ -438,6 +450,17 @@ func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delive
438450
d.DeliveryTag, rejErr)
439451
a.conn.Close()
440452
}
453+
}
454+
455+
body, err := a.decoder.Decode(d.Body)
456+
if err != nil {
457+
onError()
458+
return err
459+
}
460+
461+
metrics, err := a.parser.Parse(body)
462+
if err != nil {
463+
onError()
441464
return err
442465
}
443466

plugins/outputs/amqp/README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,14 @@ For an introduction to AMQP see:
9292
## Recommended to set to true.
9393
# use_batch_format = false
9494

95+
## Content encoding for message payloads, can be set to "gzip" to or
96+
## "identity" to apply no encoding.
97+
##
98+
## Please note that when use_batch_format = false each amqp message contains only
99+
## a single metric, it is recommended to use compression with batch format
100+
## for best results.
101+
# content_encoding = "identity"
102+
95103
## Data format to output.
96104
## Each data format has its own unique set of configuration options, read
97105
## more about them here:

plugins/outputs/amqp/amqp.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,15 @@ type AMQP struct {
5454
Headers map[string]string `toml:"headers"`
5555
Timeout internal.Duration `toml:"timeout"`
5656
UseBatchFormat bool `toml:"use_batch_format"`
57+
ContentEncoding string `toml:"content_encoding"`
5758
tls.ClientConfig
5859

5960
serializer serializers.Serializer
6061
connect func(*ClientConfig) (Client, error)
6162
client Client
6263
config *ClientConfig
6364
sentMessages int
65+
encoder internal.ContentEncoder
6466
}
6567

6668
type Client interface {
@@ -149,6 +151,14 @@ var sampleConfig = `
149151
## Recommended to set to true.
150152
# use_batch_format = false
151153
154+
## Content encoding for message payloads, can be set to "gzip" to or
155+
## "identity" to apply no encoding.
156+
##
157+
## Please note that when use_batch_format = false each amqp message contains only
158+
## a single metric, it is recommended to use compression with batch format
159+
## for best results.
160+
# content_encoding = "identity"
161+
152162
## Data format to output.
153163
## Each data format has its own unique set of configuration options, read
154164
## more about them here:
@@ -177,11 +187,16 @@ func (q *AMQP) Connect() error {
177187
q.config = config
178188
}
179189

180-
client, err := q.connect(q.config)
190+
var err error
191+
q.encoder, err = internal.NewContentEncoder(q.ContentEncoding)
192+
if err != nil {
193+
return err
194+
}
195+
196+
q.client, err = q.connect(q.config)
181197
if err != nil {
182198
return err
183199
}
184-
q.client = client
185200

186201
return nil
187202
}
@@ -227,6 +242,11 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
227242
return err
228243
}
229244

245+
body, err = q.encoder.Encode(body)
246+
if err != nil {
247+
return err
248+
}
249+
230250
err = q.publish(key, body)
231251
if err != nil {
232252
// If this is the first attempt to publish and the connection is
@@ -298,6 +318,7 @@ func (q *AMQP) makeClientConfig() (*ClientConfig, error) {
298318
exchange: q.Exchange,
299319
exchangeType: q.ExchangeType,
300320
exchangePassive: q.ExchangePassive,
321+
encoding: q.ContentEncoding,
301322
timeout: q.Timeout.Duration,
302323
}
303324

plugins/outputs/amqp/client.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type ClientConfig struct {
1919
exchangePassive bool
2020
exchangeDurable bool
2121
exchangeArguments amqp.Table
22+
encoding string
2223
headers amqp.Table
2324
deliveryMode uint8
2425
tlsConfig *tls.Config
@@ -114,10 +115,11 @@ func (c *client) Publish(key string, body []byte) error {
114115
false, // mandatory
115116
false, // immediate
116117
amqp.Publishing{
117-
Headers: c.config.headers,
118-
ContentType: "text/plain",
119-
Body: body,
120-
DeliveryMode: c.config.deliveryMode,
118+
Headers: c.config.headers,
119+
ContentType: "text/plain",
120+
ContentEncoding: c.config.encoding,
121+
Body: body,
122+
DeliveryMode: c.config.deliveryMode,
121123
})
122124
}
123125

0 commit comments

Comments
 (0)