Skip to content

Commit 5a3a77b

Browse files
committed
Add support for pass/drop/tagpass/tagdrop for outputs
Reuses same logic as the plugins for filtering points, should be only a marginal performance decrease to check all the points before writing to the output. Added examples to the README as well (for generic pass/drop as well as output pass/drop/tagpass/tagdrop). X-Github-Closes #398
1 parent b705608 commit 5a3a77b

File tree

5 files changed

+174
-70
lines changed

5 files changed

+174
-70
lines changed

README.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,19 @@ Below is how to configure `tagpass` and `tagdrop` parameters (added in 0.1.5)
156156
path = [ "/opt", "/home" ]
157157
```
158158

159+
Below is how to configure `pass` and `drop` parameters (added in 0.1.5)
160+
161+
```
162+
# Drop all metrics for guest CPU usage
163+
[[plugins.cpu]]
164+
drop = [ "cpu_usage_guest" ]
165+
166+
# Only store inode related metrics for disks
167+
[[plugins.disk]]
168+
pass = [ "disk_inodes" ]
169+
```
170+
171+
159172
Additional plugins (or outputs) of the same type can be specified,
160173
just define another instance in the config file:
161174

@@ -224,6 +237,27 @@ Telegraf also supports specifying multiple output sinks to send data to,
224237
configuring each output sink is different, but examples can be
225238
found by running `telegraf -sample-config`.
226239

240+
Outputs also support the same configurable options as plugins (pass, drop, tagpass, tagdrop)
241+
242+
```
243+
[[outputs.influxdb]]
244+
urls = [ "http://localhost:8086" ]
245+
database = "telegraf"
246+
# Drop all measurements that start with "aerospike"
247+
drop = ["aerospike"]
248+
249+
# Send to a different database
250+
[[outputs.influxdb]]
251+
urls = [ "http://localhost:8086" ]
252+
database = "mydb"
253+
precision = "s"
254+
255+
# Only store measurements where the tag "mytag" matches the value "B"
256+
[outputs.influxdb.tagpass]
257+
mytag = ["B"]
258+
```
259+
260+
227261
## Supported Outputs
228262

229263
* influxdb

accumulator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func (ac *accumulator) AddFields(
107107
}
108108

109109
if ac.pluginConfig != nil {
110-
if !ac.pluginConfig.ShouldPass(measurement) || !ac.pluginConfig.ShouldTagsPass(tags) {
110+
if !ac.pluginConfig.Filter.ShouldPass(measurement) || !ac.pluginConfig.Filter.ShouldTagsPass(tags) {
111111
return
112112
}
113113
}

agent.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,12 +226,13 @@ func (a *Agent) writeOutput(
226226
start := time.Now()
227227

228228
for {
229-
err := ro.Output.Write(points)
229+
filtered := ro.FilterPoints(points)
230+
err := ro.Output.Write(filtered)
230231
if err == nil {
231232
// Write successful
232233
elapsed := time.Since(start)
233234
log.Printf("Flushed %d metrics to output %s in %s\n",
234-
len(points), ro.Name, elapsed)
235+
len(filtered), ro.Name, elapsed)
235236
return
236237
}
237238

internal/config/config.go

Lines changed: 110 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616

1717
"github.com/naoina/toml"
1818
"github.com/naoina/toml/ast"
19+
20+
"github.com/influxdb/influxdb/client/v2"
1921
)
2022

2123
// Config specifies the URL/user/password for the database that telegraf
@@ -88,6 +90,7 @@ type TagFilter struct {
8890
type RunningOutput struct {
8991
Name string
9092
Output outputs.Output
93+
Config *OutputConfig
9194
}
9295

9396
type RunningPlugin struct {
@@ -96,34 +99,61 @@ type RunningPlugin struct {
9699
Config *PluginConfig
97100
}
98101

99-
// PluginConfig containing a name, interval, and drop/pass prefix lists
100-
// Also lists the tags to filter
101-
type PluginConfig struct {
102-
Name string
103-
102+
// Filter containing drop/pass and tagdrop/tagpass rules
103+
type Filter struct {
104104
Drop []string
105105
Pass []string
106106

107107
TagDrop []TagFilter
108108
TagPass []TagFilter
109109

110+
IsActive bool
111+
}
112+
113+
// PluginConfig containing a name, interval, and filter
114+
type PluginConfig struct {
115+
Name string
116+
Filter *Filter
110117
Interval time.Duration
111118
}
112119

120+
// OutputConfig containing name and filter
121+
type OutputConfig struct {
122+
Name string
123+
Filter *Filter
124+
}
125+
126+
// Filter returns filtered slice of client.Points based on whether filters
127+
// are active for this RunningOutput.
128+
func (ro *RunningOutput) FilterPoints(points []*client.Point) []*client.Point {
129+
if !ro.Config.Filter.IsActive {
130+
return points
131+
}
132+
133+
var filteredPoints []*client.Point
134+
for i := range points {
135+
if !ro.Config.Filter.ShouldPass(points[i].Name()) || !ro.Config.Filter.ShouldTagsPass(points[i].Tags()) {
136+
continue
137+
}
138+
filteredPoints = append(filteredPoints, points[i])
139+
}
140+
return filteredPoints
141+
}
142+
113143
// ShouldPass returns true if the metric should pass, false if should drop
114-
// based on the drop/pass plugin parameters
115-
func (cp *PluginConfig) ShouldPass(measurement string) bool {
116-
if cp.Pass != nil {
117-
for _, pat := range cp.Pass {
144+
// based on the drop/pass filter parameters
145+
func (f *Filter) ShouldPass(measurement string) bool {
146+
if f.Pass != nil {
147+
for _, pat := range f.Pass {
118148
if strings.HasPrefix(measurement, pat) {
119149
return true
120150
}
121151
}
122152
return false
123153
}
124154

125-
if cp.Drop != nil {
126-
for _, pat := range cp.Drop {
155+
if f.Drop != nil {
156+
for _, pat := range f.Drop {
127157
if strings.HasPrefix(measurement, pat) {
128158
return false
129159
}
@@ -135,10 +165,10 @@ func (cp *PluginConfig) ShouldPass(measurement string) bool {
135165
}
136166

137167
// ShouldTagsPass returns true if the metric should pass, false if should drop
138-
// based on the tagdrop/tagpass plugin parameters
139-
func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool {
140-
if cp.TagPass != nil {
141-
for _, pat := range cp.TagPass {
168+
// based on the tagdrop/tagpass filter parameters
169+
func (f *Filter) ShouldTagsPass(tags map[string]string) bool {
170+
if f.TagPass != nil {
171+
for _, pat := range f.TagPass {
142172
if tagval, ok := tags[pat.Name]; ok {
143173
for _, filter := range pat.Filter {
144174
if filter == tagval {
@@ -150,8 +180,8 @@ func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool {
150180
return false
151181
}
152182

153-
if cp.TagDrop != nil {
154-
for _, pat := range cp.TagDrop {
183+
if f.TagDrop != nil {
184+
for _, pat := range f.TagDrop {
155185
if tagval, ok := tags[pat.Name]; ok {
156186
for _, filter := range pat.Filter {
157187
if filter == tagval {
@@ -469,15 +499,21 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
469499
if !ok {
470500
return fmt.Errorf("Undefined but requested output: %s", name)
471501
}
472-
o := creator()
502+
output := creator()
473503

474-
if err := toml.UnmarshalTable(table, o); err != nil {
504+
outputConfig, err := buildOutput(name, table)
505+
if err != nil {
506+
return err
507+
}
508+
509+
if err := toml.UnmarshalTable(table, output); err != nil {
475510
return err
476511
}
477512

478513
ro := &RunningOutput{
479514
Name: name,
480-
Output: o,
515+
Output: output,
516+
Config: outputConfig,
481517
}
482518
c.Outputs = append(c.Outputs, ro)
483519
return nil
@@ -493,10 +529,15 @@ func (c *Config) addPlugin(name string, table *ast.Table) error {
493529
}
494530
plugin := creator()
495531

496-
pluginConfig, err := applyPlugin(name, table, plugin)
532+
pluginConfig, err := buildPlugin(name, table)
497533
if err != nil {
498534
return err
499535
}
536+
537+
if err := toml.UnmarshalTable(table, plugin); err != nil {
538+
return err
539+
}
540+
500541
rp := &RunningPlugin{
501542
Name: name,
502543
Plugin: plugin,
@@ -506,18 +547,19 @@ func (c *Config) addPlugin(name string, table *ast.Table) error {
506547
return nil
507548
}
508549

509-
// applyPlugin takes defined plugin names and applies them to the given
510-
// interface, returning a PluginConfig object in the end that can
511-
// be inserted into a runningPlugin by the agent.
512-
func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, error) {
513-
cp := &PluginConfig{Name: name}
550+
// buildFilter builds a Filter (tagpass/tagdrop/pass/drop) to
551+
// be inserted into the OutputConfig/PluginConfig to be used for prefix
552+
// filtering on tags and measurements
553+
func buildFilter(tbl *ast.Table) *Filter {
554+
f := &Filter{}
514555

515556
if node, ok := tbl.Fields["pass"]; ok {
516557
if kv, ok := node.(*ast.KeyValue); ok {
517558
if ary, ok := kv.Value.(*ast.Array); ok {
518559
for _, elem := range ary.Value {
519560
if str, ok := elem.(*ast.String); ok {
520-
cp.Pass = append(cp.Pass, str.Value)
561+
f.Pass = append(f.Pass, str.Value)
562+
f.IsActive = true
521563
}
522564
}
523565
}
@@ -529,26 +571,14 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
529571
if ary, ok := kv.Value.(*ast.Array); ok {
530572
for _, elem := range ary.Value {
531573
if str, ok := elem.(*ast.String); ok {
532-
cp.Drop = append(cp.Drop, str.Value)
574+
f.Drop = append(f.Drop, str.Value)
575+
f.IsActive = true
533576
}
534577
}
535578
}
536579
}
537580
}
538581

539-
if node, ok := tbl.Fields["interval"]; ok {
540-
if kv, ok := node.(*ast.KeyValue); ok {
541-
if str, ok := kv.Value.(*ast.String); ok {
542-
dur, err := time.ParseDuration(str.Value)
543-
if err != nil {
544-
return nil, err
545-
}
546-
547-
cp.Interval = dur
548-
}
549-
}
550-
}
551-
552582
if node, ok := tbl.Fields["tagpass"]; ok {
553583
if subtbl, ok := node.(*ast.Table); ok {
554584
for name, val := range subtbl.Fields {
@@ -561,7 +591,8 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
561591
}
562592
}
563593
}
564-
cp.TagPass = append(cp.TagPass, *tagfilter)
594+
f.TagPass = append(f.TagPass, *tagfilter)
595+
f.IsActive = true
565596
}
566597
}
567598
}
@@ -579,16 +610,50 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
579610
}
580611
}
581612
}
582-
cp.TagDrop = append(cp.TagDrop, *tagfilter)
613+
f.TagDrop = append(f.TagDrop, *tagfilter)
614+
f.IsActive = true
583615
}
584616
}
585617
}
586618
}
587619

588620
delete(tbl.Fields, "drop")
589621
delete(tbl.Fields, "pass")
590-
delete(tbl.Fields, "interval")
591622
delete(tbl.Fields, "tagdrop")
592623
delete(tbl.Fields, "tagpass")
593-
return cp, toml.UnmarshalTable(tbl, p)
624+
return f
625+
}
626+
627+
// buildPlugin parses plugin specific items from the ast.Table, builds the filter and returns a
628+
// PluginConfig to be inserted into RunningPlugin
629+
func buildPlugin(name string, tbl *ast.Table) (*PluginConfig, error) {
630+
cp := &PluginConfig{Name: name}
631+
if node, ok := tbl.Fields["interval"]; ok {
632+
if kv, ok := node.(*ast.KeyValue); ok {
633+
if str, ok := kv.Value.(*ast.String); ok {
634+
dur, err := time.ParseDuration(str.Value)
635+
if err != nil {
636+
return nil, err
637+
}
638+
639+
cp.Interval = dur
640+
}
641+
}
642+
}
643+
delete(tbl.Fields, "interval")
644+
cp.Filter = buildFilter(tbl)
645+
return cp, nil
646+
647+
}
648+
649+
// buildOutput parses output specific items from the ast.Table, builds the filter and returns an
650+
// OutputConfig to be inserted into RunningPlugin
651+
// Note: error exists in the return for future calls that might require error
652+
func buildOutput(name string, tbl *ast.Table) (*OutputConfig, error) {
653+
oc := &OutputConfig{
654+
Name: name,
655+
Filter: buildFilter(tbl),
656+
}
657+
return oc, nil
658+
594659
}

0 commit comments

Comments
 (0)