Skip to content

Commit 22afc99

Browse files
oldmantaitersparrc
authored andcommitted
Add support for pass/drop/tagpass/tagdrop for outputs
Reuses same logic as the plugins for filtering points, should be only a marginal performance decrease to check all the points before writing to the output. Added examples to the README as well (for generic pass/drop as well as output pass/drop/tagpass/tagdrop). X-Github-Closes #398 closes #398 closes #401
1 parent c83f220 commit 22afc99

File tree

6 files changed

+186
-73
lines changed

6 files changed

+186
-73
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
- [#418](https://github.com/influxdb/telegraf/pull/418): memcached plugin additional unit tests.
99
- [#408](https://github.com/influxdb/telegraf/pull/408): MailChimp plugin.
1010
- [#382](https://github.com/influxdb/telegraf/pull/382): Add system wide network protocol stats to `net` plugin.
11+
- [#401](https://github.com/influxdb/telegraf/pull/401): Support pass/drop/tagpass/tagdrop for outputs. Thanks @oldmantaiter!
1112

1213
### Bugfixes
1314
- [#405](https://github.com/influxdb/telegraf/issues/405): Prometheus output cardinality issue

README.md

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ at 192.168.59.103:8086, tagging measurements with dc="denver-1". It will output
113113
measurements at a 10s interval and will collect per-cpu data, dropping any
114114
measurements which begin with `cpu_time`.
115115

116-
```
116+
```toml
117117
[tags]
118118
dc = "denver-1"
119119

@@ -137,7 +137,7 @@ measurements which begin with `cpu_time`.
137137

138138
Below is how to configure `tagpass` and `tagdrop` parameters (added in 0.1.5)
139139

140-
```
140+
```toml
141141
[plugins]
142142
[[plugins.cpu]]
143143
percpu = true
@@ -156,10 +156,23 @@ Below is how to configure `tagpass` and `tagdrop` parameters (added in 0.1.5)
156156
path = [ "/opt", "/home" ]
157157
```
158158

159+
Below is how to configure `pass` and `drop` parameters (added in 0.1.5)
160+
161+
```toml
162+
# Drop all metrics for guest CPU usage
163+
[[plugins.cpu]]
164+
drop = [ "cpu_usage_guest" ]
165+
166+
# Only store inode related metrics for disks
167+
[[plugins.disk]]
168+
pass = [ "disk_inodes" ]
169+
```
170+
171+
159172
Additional plugins (or outputs) of the same type can be specified,
160173
just define another instance in the config file:
161174

162-
```
175+
```toml
163176
[[plugins.cpu]]
164177
percpu = false
165178
totalcpu = true
@@ -225,6 +238,33 @@ Telegraf also supports specifying multiple output sinks to send data to,
225238
configuring each output sink is different, but examples can be
226239
found by running `telegraf -sample-config`.
227240

241+
Outputs also support the same configurable options as plugins (pass, drop, tagpass, tagdrop)
242+
243+
```toml
244+
[[outputs.influxdb]]
245+
urls = [ "http://localhost:8086" ]
246+
database = "telegraf"
247+
precision = "s"
248+
# Drop all measurements that start with "aerospike"
249+
drop = ["aerospike"]
250+
251+
[[outputs.influxdb]]
252+
urls = [ "http://localhost:8086" ]
253+
database = "telegraf-aerospike-data"
254+
precision = "s"
255+
# Only accept aerospike data:
256+
pass = ["aerospike"]
257+
258+
[[outputs.influxdb]]
259+
urls = [ "http://localhost:8086" ]
260+
database = "telegraf-cpu0-data"
261+
precision = "s"
262+
# Only store measurements where the tag "cpu" matches the value "cpu0"
263+
[outputs.influxdb.tagpass]
264+
cpu = ["cpu0"]
265+
```
266+
267+
228268
## Supported Outputs
229269

230270
* influxdb

accumulator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func (ac *accumulator) AddFields(
107107
}
108108

109109
if ac.pluginConfig != nil {
110-
if !ac.pluginConfig.ShouldPass(measurement) || !ac.pluginConfig.ShouldTagsPass(tags) {
110+
if !ac.pluginConfig.Filter.ShouldPass(measurement) || !ac.pluginConfig.Filter.ShouldTagsPass(tags) {
111111
return
112112
}
113113
}

agent.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,12 +230,13 @@ func (a *Agent) writeOutput(
230230
start := time.Now()
231231

232232
for {
233-
err := ro.Output.Write(points)
233+
filtered := ro.FilterPoints(points)
234+
err := ro.Output.Write(filtered)
234235
if err == nil {
235236
// Write successful
236237
elapsed := time.Since(start)
237238
log.Printf("Flushed %d metrics to output %s in %s\n",
238-
len(points), ro.Name, elapsed)
239+
len(filtered), ro.Name, elapsed)
239240
return
240241
}
241242

internal/config/config.go

Lines changed: 110 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616

1717
"github.com/naoina/toml"
1818
"github.com/naoina/toml/ast"
19+
20+
"github.com/influxdb/influxdb/client/v2"
1921
)
2022

2123
// Config specifies the URL/user/password for the database that telegraf
@@ -88,6 +90,7 @@ type TagFilter struct {
8890
type RunningOutput struct {
8991
Name string
9092
Output outputs.Output
93+
Config *OutputConfig
9194
}
9295

9396
type RunningPlugin struct {
@@ -96,34 +99,61 @@ type RunningPlugin struct {
9699
Config *PluginConfig
97100
}
98101

99-
// PluginConfig containing a name, interval, and drop/pass prefix lists
100-
// Also lists the tags to filter
101-
type PluginConfig struct {
102-
Name string
103-
102+
// Filter containing drop/pass and tagdrop/tagpass rules
103+
type Filter struct {
104104
Drop []string
105105
Pass []string
106106

107107
TagDrop []TagFilter
108108
TagPass []TagFilter
109109

110+
IsActive bool
111+
}
112+
113+
// PluginConfig containing a name, interval, and filter
114+
type PluginConfig struct {
115+
Name string
116+
Filter Filter
110117
Interval time.Duration
111118
}
112119

120+
// OutputConfig containing name and filter
121+
type OutputConfig struct {
122+
Name string
123+
Filter Filter
124+
}
125+
126+
// Filter returns filtered slice of client.Points based on whether filters
127+
// are active for this RunningOutput.
128+
func (ro *RunningOutput) FilterPoints(points []*client.Point) []*client.Point {
129+
if !ro.Config.Filter.IsActive {
130+
return points
131+
}
132+
133+
var filteredPoints []*client.Point
134+
for i := range points {
135+
if !ro.Config.Filter.ShouldPass(points[i].Name()) || !ro.Config.Filter.ShouldTagsPass(points[i].Tags()) {
136+
continue
137+
}
138+
filteredPoints = append(filteredPoints, points[i])
139+
}
140+
return filteredPoints
141+
}
142+
113143
// ShouldPass returns true if the metric should pass, false if should drop
114-
// based on the drop/pass plugin parameters
115-
func (cp *PluginConfig) ShouldPass(measurement string) bool {
116-
if cp.Pass != nil {
117-
for _, pat := range cp.Pass {
144+
// based on the drop/pass filter parameters
145+
func (f Filter) ShouldPass(measurement string) bool {
146+
if f.Pass != nil {
147+
for _, pat := range f.Pass {
118148
if strings.HasPrefix(measurement, pat) {
119149
return true
120150
}
121151
}
122152
return false
123153
}
124154

125-
if cp.Drop != nil {
126-
for _, pat := range cp.Drop {
155+
if f.Drop != nil {
156+
for _, pat := range f.Drop {
127157
if strings.HasPrefix(measurement, pat) {
128158
return false
129159
}
@@ -135,10 +165,10 @@ func (cp *PluginConfig) ShouldPass(measurement string) bool {
135165
}
136166

137167
// ShouldTagsPass returns true if the metric should pass, false if should drop
138-
// based on the tagdrop/tagpass plugin parameters
139-
func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool {
140-
if cp.TagPass != nil {
141-
for _, pat := range cp.TagPass {
168+
// based on the tagdrop/tagpass filter parameters
169+
func (f Filter) ShouldTagsPass(tags map[string]string) bool {
170+
if f.TagPass != nil {
171+
for _, pat := range f.TagPass {
142172
if tagval, ok := tags[pat.Name]; ok {
143173
for _, filter := range pat.Filter {
144174
if filter == tagval {
@@ -150,8 +180,8 @@ func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool {
150180
return false
151181
}
152182

153-
if cp.TagDrop != nil {
154-
for _, pat := range cp.TagDrop {
183+
if f.TagDrop != nil {
184+
for _, pat := range f.TagDrop {
155185
if tagval, ok := tags[pat.Name]; ok {
156186
for _, filter := range pat.Filter {
157187
if filter == tagval {
@@ -469,15 +499,21 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
469499
if !ok {
470500
return fmt.Errorf("Undefined but requested output: %s", name)
471501
}
472-
o := creator()
502+
output := creator()
473503

474-
if err := toml.UnmarshalTable(table, o); err != nil {
504+
outputConfig, err := buildOutput(name, table)
505+
if err != nil {
506+
return err
507+
}
508+
509+
if err := toml.UnmarshalTable(table, output); err != nil {
475510
return err
476511
}
477512

478513
ro := &RunningOutput{
479514
Name: name,
480-
Output: o,
515+
Output: output,
516+
Config: outputConfig,
481517
}
482518
c.Outputs = append(c.Outputs, ro)
483519
return nil
@@ -493,10 +529,15 @@ func (c *Config) addPlugin(name string, table *ast.Table) error {
493529
}
494530
plugin := creator()
495531

496-
pluginConfig, err := applyPlugin(name, table, plugin)
532+
pluginConfig, err := buildPlugin(name, table)
497533
if err != nil {
498534
return err
499535
}
536+
537+
if err := toml.UnmarshalTable(table, plugin); err != nil {
538+
return err
539+
}
540+
500541
rp := &RunningPlugin{
501542
Name: name,
502543
Plugin: plugin,
@@ -506,18 +547,19 @@ func (c *Config) addPlugin(name string, table *ast.Table) error {
506547
return nil
507548
}
508549

509-
// applyPlugin takes defined plugin names and applies them to the given
510-
// interface, returning a PluginConfig object in the end that can
511-
// be inserted into a runningPlugin by the agent.
512-
func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, error) {
513-
cp := &PluginConfig{Name: name}
550+
// buildFilter builds a Filter (tagpass/tagdrop/pass/drop) to
551+
// be inserted into the OutputConfig/PluginConfig to be used for prefix
552+
// filtering on tags and measurements
553+
func buildFilter(tbl *ast.Table) Filter {
554+
f := Filter{}
514555

515556
if node, ok := tbl.Fields["pass"]; ok {
516557
if kv, ok := node.(*ast.KeyValue); ok {
517558
if ary, ok := kv.Value.(*ast.Array); ok {
518559
for _, elem := range ary.Value {
519560
if str, ok := elem.(*ast.String); ok {
520-
cp.Pass = append(cp.Pass, str.Value)
561+
f.Pass = append(f.Pass, str.Value)
562+
f.IsActive = true
521563
}
522564
}
523565
}
@@ -529,26 +571,14 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
529571
if ary, ok := kv.Value.(*ast.Array); ok {
530572
for _, elem := range ary.Value {
531573
if str, ok := elem.(*ast.String); ok {
532-
cp.Drop = append(cp.Drop, str.Value)
574+
f.Drop = append(f.Drop, str.Value)
575+
f.IsActive = true
533576
}
534577
}
535578
}
536579
}
537580
}
538581

539-
if node, ok := tbl.Fields["interval"]; ok {
540-
if kv, ok := node.(*ast.KeyValue); ok {
541-
if str, ok := kv.Value.(*ast.String); ok {
542-
dur, err := time.ParseDuration(str.Value)
543-
if err != nil {
544-
return nil, err
545-
}
546-
547-
cp.Interval = dur
548-
}
549-
}
550-
}
551-
552582
if node, ok := tbl.Fields["tagpass"]; ok {
553583
if subtbl, ok := node.(*ast.Table); ok {
554584
for name, val := range subtbl.Fields {
@@ -561,7 +591,8 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
561591
}
562592
}
563593
}
564-
cp.TagPass = append(cp.TagPass, *tagfilter)
594+
f.TagPass = append(f.TagPass, *tagfilter)
595+
f.IsActive = true
565596
}
566597
}
567598
}
@@ -579,16 +610,50 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
579610
}
580611
}
581612
}
582-
cp.TagDrop = append(cp.TagDrop, *tagfilter)
613+
f.TagDrop = append(f.TagDrop, *tagfilter)
614+
f.IsActive = true
583615
}
584616
}
585617
}
586618
}
587619

588620
delete(tbl.Fields, "drop")
589621
delete(tbl.Fields, "pass")
590-
delete(tbl.Fields, "interval")
591622
delete(tbl.Fields, "tagdrop")
592623
delete(tbl.Fields, "tagpass")
593-
return cp, toml.UnmarshalTable(tbl, p)
624+
return f
625+
}
626+
627+
// buildPlugin parses plugin specific items from the ast.Table, builds the filter and returns a
628+
// PluginConfig to be inserted into RunningPlugin
629+
func buildPlugin(name string, tbl *ast.Table) (*PluginConfig, error) {
630+
cp := &PluginConfig{Name: name}
631+
if node, ok := tbl.Fields["interval"]; ok {
632+
if kv, ok := node.(*ast.KeyValue); ok {
633+
if str, ok := kv.Value.(*ast.String); ok {
634+
dur, err := time.ParseDuration(str.Value)
635+
if err != nil {
636+
return nil, err
637+
}
638+
639+
cp.Interval = dur
640+
}
641+
}
642+
}
643+
delete(tbl.Fields, "interval")
644+
cp.Filter = buildFilter(tbl)
645+
return cp, nil
646+
647+
}
648+
649+
// buildOutput parses output specific items from the ast.Table, builds the filter and returns an
650+
// OutputConfig to be inserted into RunningPlugin
651+
// Note: error exists in the return for future calls that might require error
652+
func buildOutput(name string, tbl *ast.Table) (*OutputConfig, error) {
653+
oc := &OutputConfig{
654+
Name: name,
655+
Filter: buildFilter(tbl),
656+
}
657+
return oc, nil
658+
594659
}

0 commit comments

Comments
 (0)