Skip to content

Commit 93ac576

Browse files
committed
Support StatisticValues in cloudwatch output plugin (#4318)
1 parent be2ea90 commit 93ac576

File tree

3 files changed

+147
-3
lines changed

3 files changed

+147
-3
lines changed

plugins/outputs/cloudwatch/README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,12 @@ Examples include but are not limited to:
3636
### namespace
3737

3838
The namespace used for AWS CloudWatch metrics.
39+
40+
### enable_statistic_values
41+
42+
If you have a large amount of metrics, you should consider to send
43+
statistic values instead of raw metrics. This would not only improve
44+
performance but also save AWS API cost. Use `basicstats` aggregator to
45+
calculate those required statistic fields (count, min, max, and sum).
46+
[See Cloudwatch StatisticSet](https://docs.aws.amazon.com/sdk-for-go/api/service/cloudwatch/#StatisticSet).
47+
This plugin would try to parse those statistic fields and send to Cloudwatch.

plugins/outputs/cloudwatch/cloudwatch.go

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,16 @@ type CloudWatch struct {
2727

2828
Namespace string `toml:"namespace"` // CloudWatch Metrics Namespace
2929
svc *cloudwatch.CloudWatch
30+
31+
EnableStatisticValues bool `toml:"enable_statistic_values"`
32+
}
33+
34+
type statisticSet struct {
35+
field string
36+
max float64
37+
min float64
38+
sum float64
39+
count float64
3040
}
3141

3242
var sampleConfig = `
@@ -50,6 +60,14 @@ var sampleConfig = `
5060
5161
## Namespace for the CloudWatch MetricDatums
5262
namespace = "InfluxData/Telegraf"
63+
64+
## If you have a large amount of metrics, you should consider to send
65+
## statistic values instead of raw metrics. This would not only improve
66+
## performance but also save AWS API cost. Use basicstats aggregator to
67+
## calculate required statistic fields (count, min, max, and sum) and
68+
## enable this flag. This plugin would try to parse those fields and
69+
## send statistic values to Cloudwatch.
70+
# enable_statistic_values = false
5371
`
5472

5573
func (c *CloudWatch) SampleConfig() string {
@@ -151,9 +169,28 @@ func PartitionDatums(size int, datums []*cloudwatch.MetricDatum) [][]*cloudwatch
151169
return partitions
152170
}
153171

172+
// Make a MetricDatum from telegraf.Metric. It would check if all required fields of
173+
// cloudwatch.StatisticSet are available. If so, it would build MetricDatum from statistic values.
174+
// Otherwise, it would make MetricDatum from each field in a Point.
175+
func BuildMetricDatum(buildStatistic bool, point telegraf.Metric) []*cloudwatch.MetricDatum {
176+
177+
// If not enable, just take all metrics as value datums.
178+
if !buildStatistic {
179+
return BuildValueMetricDatum(point)
180+
}
181+
182+
// Try to parse statisticSet first, then build statistic/value datum accordingly.
183+
set, ok := getStatisticSet(point)
184+
if ok {
185+
return BuildStatisticMetricDatum(point, set)
186+
} else {
187+
return BuildValueMetricDatum(point)
188+
}
189+
}
190+
154191
// Make a MetricDatum for each field in a Point. Only fields with values that can be
155192
// converted to float64 are supported. Non-supported fields are skipped.
156-
func BuildMetricDatum(point telegraf.Metric) []*cloudwatch.MetricDatum {
193+
func BuildValueMetricDatum(point telegraf.Metric) []*cloudwatch.MetricDatum {
157194
datums := make([]*cloudwatch.MetricDatum, len(point.Fields()))
158195
i := 0
159196

@@ -217,6 +254,24 @@ func BuildMetricDatum(point telegraf.Metric) []*cloudwatch.MetricDatum {
217254
return datums
218255
}
219256

257+
// Make a MetricDatum with statistic values.
258+
func BuildStatisticMetricDatum(point telegraf.Metric, set *statisticSet) []*cloudwatch.MetricDatum {
259+
260+
data := &cloudwatch.MetricDatum{
261+
MetricName: aws.String(strings.Join([]string{point.Name(), set.field}, "_")),
262+
StatisticValues: &cloudwatch.StatisticSet{
263+
Minimum: aws.Float64(set.min),
264+
Maximum: aws.Float64(set.max),
265+
Sum: aws.Float64(set.sum),
266+
SampleCount: aws.Float64(set.count),
267+
},
268+
Dimensions: BuildDimensions(point.Tags()),
269+
Timestamp: aws.Time(point.Time()),
270+
}
271+
272+
return []*cloudwatch.MetricDatum{data}
273+
}
274+
220275
// Make a list of Dimensions by using a Point's tags. CloudWatch supports up to
221276
// 10 dimensions per metric so we only keep up to the first 10 alphabetically.
222277
// This always includes the "host" tag if it exists.
@@ -260,6 +315,66 @@ func BuildDimensions(mTags map[string]string) []*cloudwatch.Dimension {
260315
return dimensions
261316
}
262317

318+
func getStatisticSet(point telegraf.Metric) (*statisticSet, bool) {
319+
320+
// cloudwatch.StatisticSet requires Max, Min, Count and Sum values.
321+
// If this point has less than 4 fields, it's not possible to build
322+
// StatisticSet from it.
323+
if len(point.Fields()) < 4 {
324+
return nil, false
325+
}
326+
327+
// Try to find the max field. If we could find it, we will use its
328+
// field name to find other required fields.
329+
var set *statisticSet
330+
for k, v := range point.Fields() {
331+
if strings.HasSuffix(k, "_max") {
332+
if fv, ok := convert(v); ok {
333+
set = &statisticSet{
334+
field: k[:len(k)-4],
335+
max: fv,
336+
}
337+
break
338+
}
339+
}
340+
}
341+
if set == nil {
342+
return nil, false
343+
}
344+
345+
// Check if we could find all required fields with the same field name
346+
var ok bool
347+
if set.min, ok = findField(point, set.field+"_min"); !ok {
348+
return nil, false
349+
}
350+
if set.count, ok = findField(point, set.field+"_count"); !ok {
351+
return nil, false
352+
}
353+
if set.sum, ok = findField(point, set.field+"_sum"); !ok {
354+
return nil, false
355+
}
356+
357+
return set, true
358+
}
359+
360+
func convert(in interface{}) (float64, bool) {
361+
switch v := in.(type) {
362+
case float64:
363+
return v, true
364+
default:
365+
return 0, false
366+
}
367+
}
368+
369+
func findField(point telegraf.Metric, field string) (float64, bool) {
370+
if v, ok := point.GetField(field); ok {
371+
if fv, ok := convert(v); ok {
372+
return fv, true
373+
}
374+
}
375+
return 0, false
376+
}
377+
263378
func init() {
264379
outputs.Add("cloudwatch", func() telegraf.Output {
265380
return &CloudWatch{}

plugins/outputs/cloudwatch/cloudwatch_test.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ import (
55
"math"
66
"sort"
77
"testing"
8+
"time"
89

910
"github.com/aws/aws-sdk-go/aws"
1011
"github.com/aws/aws-sdk-go/service/cloudwatch"
1112

1213
"github.com/influxdata/telegraf"
14+
"github.com/influxdata/telegraf/metric"
1315
"github.com/influxdata/telegraf/testutil"
1416

1517
"github.com/stretchr/testify/assert"
@@ -72,13 +74,31 @@ func TestBuildMetricDatums(t *testing.T) {
7274
testutil.TestMetric(float64(1.174272e+108)), // largest should be 1.174271e+108
7375
}
7476
for _, point := range validMetrics {
75-
datums := BuildMetricDatum(point)
77+
datums := BuildMetricDatum(false, point)
7678
assert.Equal(1, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", point))
7779
}
7880
for _, point := range invalidMetrics {
79-
datums := BuildMetricDatum(point)
81+
datums := BuildMetricDatum(false, point)
8082
assert.Equal(0, len(datums), fmt.Sprintf("Valid point should not create a Datum {value: %v}", point))
8183
}
84+
85+
statisticMetric, _ := metric.New(
86+
"test1",
87+
map[string]string{"tag1": "value1"},
88+
map[string]interface{}{"value_max": float64(10), "value_min": float64(0), "value_sum": float64(100), "value_count": float64(20)},
89+
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
90+
)
91+
datums := BuildMetricDatum(true, statisticMetric)
92+
assert.Equal(1, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", statisticMetric))
93+
94+
multipleFieldsMetric, _ := metric.New(
95+
"test1",
96+
map[string]string{"tag1": "value1"},
97+
map[string]interface{}{"valueA": float64(10), "valueB": float64(0), "valueC": float64(100), "valueD": float64(20)},
98+
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
99+
)
100+
datums = BuildMetricDatum(true, multipleFieldsMetric)
101+
assert.Equal(4, len(datums), fmt.Sprintf("Each field should create a Datum {value: %v}", multipleFieldsMetric))
82102
}
83103

84104
func TestPartitionDatums(t *testing.T) {

0 commit comments

Comments
 (0)