-
Notifications
You must be signed in to change notification settings - Fork 5.8k
Plugin/reader each interval #4332
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 16 commits
e12eced
08a11d7
9c4b522
4e24a1b
ec7f131
504d978
542c030
554b960
36a23ea
f40371e
9c84595
cc40629
79d9ea4
bbd68b3
bf7220d
a931eb1
e450b26
001658a
7fa27f4
1be2a8e
aa750ec
892c95a
04f09d6
8063b38
bfc13a7
67db143
8a9da28
cafa95e
c6087ab
e4b6f23
d224673
f52ceeb
285cf0b
0c3ac29
74900ed
d0f5389
dd778a9
5449eb7
1f58dd7
2a18ca2
50f49fe
08d1397
63da4e6
88a85a7
2638186
1fe3adb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1338,6 +1338,59 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { | |
| } | ||
| } | ||
|
|
||
| //for grok data_format | ||
| if node, ok := tbl.Fields["named_patterns"]; ok { | ||
| if kv, ok := node.(*ast.KeyValue); ok { | ||
| if ary, ok := kv.Value.(*ast.Array); ok { | ||
| for _, elem := range ary.Value { | ||
| if str, ok := elem.(*ast.String); ok { | ||
| c.NamedPatterns = append(c.NamedPatterns, str.Value) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if node, ok := tbl.Fields["patterns"]; ok { | ||
| if kv, ok := node.(*ast.KeyValue); ok { | ||
| if ary, ok := kv.Value.(*ast.Array); ok { | ||
| for _, elem := range ary.Value { | ||
| if str, ok := elem.(*ast.String); ok { | ||
| c.Patterns = append(c.Patterns, str.Value) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if node, ok := tbl.Fields["custom_patterns"]; ok { | ||
| if kv, ok := node.(*ast.KeyValue); ok { | ||
| if str, ok := kv.Value.(*ast.String); ok { | ||
| c.CustomPatterns = str.Value | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if node, ok := tbl.Fields["custom_pattern_files"]; ok { | ||
| if kv, ok := node.(*ast.KeyValue); ok { | ||
| if ary, ok := kv.Value.(*ast.Array); ok { | ||
| for _, elem := range ary.Value { | ||
| if str, ok := elem.(*ast.String); ok { | ||
| c.CustomPatternFiles = append(c.CustomPatternFiles, str.Value) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if node, ok := tbl.Fields["timezone"]; ok { | ||
| if kv, ok := node.(*ast.KeyValue); ok { | ||
| if str, ok := kv.Value.(*ast.String); ok { | ||
| c.TimeZone = str.Value | ||
| } | ||
| } | ||
| } | ||
|
|
||
| c.MetricName = name | ||
|
|
||
| delete(tbl.Fields, "data_format") | ||
|
|
@@ -1353,6 +1406,11 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { | |
| delete(tbl.Fields, "dropwizard_time_format") | ||
| delete(tbl.Fields, "dropwizard_tags_path") | ||
| delete(tbl.Fields, "dropwizard_tag_paths") | ||
| delete(tbl.Fields, "named_patterns") | ||
| delete(tbl.Fields, "patterns") | ||
| delete(tbl.Fields, "custom_patterns") | ||
| delete(tbl.Fields, "custom_pattern_files") | ||
| delete(tbl.Fields, "timezone") | ||
|
||
|
|
||
| return parsers.NewParser(c) | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
# Development/test compose file: runs a locally built telegraf binary in a
# minimal container, using the sample config and log file from this directory.
version: '3'

services:
  telegraf:
    image: glinton/scratch
    volumes:
      - ./telegraf.conf:/telegraf.conf      # plugin config under test
      - ../../../../telegraf:/telegraf      # locally built telegraf binary
      - ./json_a.log:/var/log/test.log      # sample input read by the reader plugin
    entrypoint:
      - /telegraf
      - --config
      - /telegraf.conf
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| { | ||
| "parent": { | ||
| "child": 3.0, | ||
| "ignored_child": "hi" | ||
| }, | ||
| "ignored_null": null, | ||
| "integer": 4, | ||
| "list": [3, 4], | ||
| "ignored_parent": { | ||
| "another_ignored_null": null, | ||
| "ignored_string": "hello, world!" | ||
| }, | ||
| "another_list": [4] | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
|
|
||
|
||
| # Global tags can be specified here in key="value" format. | ||
| [global_tags] | ||
| # dc = "us-east-1" # will tag all metrics with dc=us-east-1 | ||
| # rack = "1a" | ||
| ## Environment variables can be used as tags, and throughout the config file | ||
| # user = "$USER" | ||
|
|
||
|
|
||
| # Configuration for telegraf agent | ||
| [agent] | ||
| ## Default data collection interval for all inputs | ||
| interval = "15s" | ||
| ## Rounds collection interval to 'interval' | ||
| ## ie, if interval="10s" then always collect on :00, :10, :20, etc. | ||
| round_interval = true | ||
|
|
||
| ## Telegraf will send metrics to outputs in batches of at most | ||
| ## metric_batch_size metrics. | ||
| ## This controls the size of writes that Telegraf sends to output plugins. | ||
| metric_batch_size = 1000 | ||
|
|
||
| ## For failed writes, telegraf will cache metric_buffer_limit metrics for each | ||
| ## output, and will flush this buffer on a successful write. Oldest metrics | ||
| ## are dropped first when this buffer fills. | ||
| ## This buffer only fills when writes fail to output plugin(s). | ||
| metric_buffer_limit = 10000 | ||
|
|
||
| ## Collection jitter is used to jitter the collection by a random amount. | ||
| ## Each plugin will sleep for a random time within jitter before collecting. | ||
| ## This can be used to avoid many plugins querying things like sysfs at the | ||
| ## same time, which can have a measurable effect on the system. | ||
| collection_jitter = "0s" | ||
|
|
||
| ## Default flushing interval for all outputs. You shouldn't set this below | ||
| ## interval. Maximum flush_interval will be flush_interval + flush_jitter | ||
| flush_interval = "10s" | ||
| ## Jitter the flush interval by a random amount. This is primarily to avoid | ||
| ## large write spikes for users running a large number of telegraf instances. | ||
| ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s | ||
| flush_jitter = "0s" | ||
|
|
||
| ## By default or when set to "0s", precision will be set to the same | ||
| ## timestamp order as the collection interval, with the maximum being 1s. | ||
| ## ie, when interval = "10s", precision will be "1s" | ||
| ## when interval = "250ms", precision will be "1ms" | ||
| ## Precision will NOT be used for service inputs. It is up to each individual | ||
| ## service input to set the timestamp at the appropriate precision. | ||
| ## Valid time units are "ns", "us" (or "µs"), "ms", "s". | ||
| precision = "" | ||
|
|
||
| ## Logging configuration: | ||
| ## Run telegraf with debug log messages. | ||
| debug = false | ||
| ## Run telegraf in quiet mode (error log messages only). | ||
| quiet = false | ||
| ## Specify the log file name. The empty string means to log to stderr. | ||
| logfile = "" | ||
|
|
||
| ## Override default hostname, if empty use os.Hostname() | ||
| hostname = "" | ||
| ## If set to true, do not set the "host" tag in the telegraf agent. | ||
| omit_hostname = false | ||
|
|
||
| # # reload and gather from file[s] on telegraf's interval | ||
| [[inputs.reader]] | ||
| # ## These accept standard unix glob matching rules, but with the addition of | ||
| # ## ** as a "super asterisk". ie: | ||
| # ## /var/log/**.log -> recursively find all .log files in /var/log | ||
| # ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log | ||
| # ## /var/log/apache.log -> only tail the apache log file | ||
| files = ["/var/log/test.log"] | ||
| # | ||
| # ## The dataformat to be read from files | ||
| # ## Each data format has its own unique set of configuration options, read | ||
| # ## more about them here: | ||
| # ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md | ||
| data_format = "json" | ||
| # | ||
|
|
||
| #patterns = ["%{TEST_LOG_B}","%{TEST_LOG_A}"] | ||
| # | ||
| # ## Override for the measurement name in the output. | ||
| #name_override = "grok_reader" | ||
| # | ||
| # ## Full path(s) to custom pattern files. | ||
| #custom_pattern_files = ["/Users/maxu/go/src/github.com/influxdata/telegraf/plugins/inputs/logparser/grok/testdata/test-patterns"] | ||
| # | ||
| # ## Custom patterns can also be defined here. Put one pattern per line. | ||
| # custom_patterns = ''' | ||
| # ''' | ||
| # | ||
| # ## Timezone allows you to provide an override for timestamps that | ||
| # ## don't already include an offset | ||
| # ## e.g. 04/06/2016 12:41:45 data one two 5.43µs | ||
| # ## | ||
| # ## Default: "" which renders UTC | ||
| # ## Options are as follows: | ||
| # ## 1. Local -- interpret based on machine localtime | ||
| # ## 2. "Canada/Eastern" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones | ||
| # ## 3. UTC -- or blank/unspecified, will return timestamp in UTC | ||
| # timezone = "Canada/Eastern" | ||
|
|
||
|
|
||
| [[outputs.file]] | ||
| files = ["stdout"] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,102 @@ | ||
| package reader | ||
|
|
||
| import ( | ||
| "io/ioutil" | ||
| "log" | ||
|
|
||
| "github.com/influxdata/telegraf" | ||
| "github.com/influxdata/telegraf/internal/globpath" | ||
| "github.com/influxdata/telegraf/plugins/inputs" | ||
| "github.com/influxdata/telegraf/plugins/parsers" | ||
| ) | ||
|
|
||
// Reader is an input plugin that re-reads and re-parses a set of files on
// every collection interval (unlike tail-style inputs, each file is read
// in full each time Gather runs).
type Reader struct {
	// Filepaths holds the configured glob patterns (TOML key "files").
	Filepaths []string `toml:"files"`
	// FromBeginning is not referenced anywhere in this file.
	// NOTE(review): appears unused — confirm before removing.
	FromBeginning bool
	// parser converts raw file contents into telegraf metrics; it is
	// installed by SetParser before Gather is called.
	parser parsers.Parser

	// Filenames is the concrete file list produced by expanding Filepaths;
	// it is rebuilt by refreshFilePaths on every Gather.
	Filenames []string
}
|
|
||
// sampleConfig is the example TOML shown to users (returned by
// SampleConfig); the raw string is emitted verbatim into generated configs.
const sampleConfig = `## Files to parse each interval.
## These accept standard unix glob matching rules, but with the addition of
## ** as a "super asterisk". ie:
## /var/log/**.log -> recursively find all .log files in /var/log
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file
files = ["/var/log/apache/access.log"]

## The dataformat to be read from files
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = ""
`
|
|
||
| // SampleConfig returns the default configuration of the Input | ||
| func (r *Reader) SampleConfig() string { | ||
| return sampleConfig | ||
| } | ||
|
|
||
| func (r *Reader) Description() string { | ||
| return "reload and gather from file[s] on telegraf's interval" | ||
| } | ||
|
|
||
| func (r *Reader) Gather(acc telegraf.Accumulator) error { | ||
| r.refreshFilePaths() | ||
| for _, k := range r.Filenames { | ||
| metrics, err := r.readMetric(k) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| for i, m := range metrics { | ||
|
|
||
| //error if m is nil | ||
| if m == nil { | ||
|
||
| log.Printf("E! Metric could not be parsed from: %v, on line %v", k, i) | ||
|
||
| continue | ||
| } | ||
| acc.AddFields(m.Name(), m.Fields(), m.Tags()) | ||
|
||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
// SetParser installs the parser that readMetric uses to turn raw file
// contents into telegraf metrics; it must be called before Gather.
func (r *Reader) SetParser(p parsers.Parser) {
	r.parser = p
}
|
|
||
| func (r *Reader) refreshFilePaths() { | ||
| var allFiles []string | ||
| for _, filepath := range r.Filepaths { | ||
| g, err := globpath.Compile(filepath) | ||
|
||
| if err != nil { | ||
| log.Printf("E! Error Glob %s failed to compile, %s", filepath, err) | ||
|
||
| continue | ||
| } | ||
| files := g.Match() | ||
|
|
||
| for k := range files { | ||
| allFiles = append(allFiles, k) | ||
| } | ||
| } | ||
|
|
||
| r.Filenames = allFiles | ||
| } | ||
|
|
||
| //requires that Parser has been compiled | ||
| func (r *Reader) readMetric(filename string) ([]telegraf.Metric, error) { | ||
| fileContents, err := ioutil.ReadFile(filename) | ||
| if err != nil { | ||
| log.Printf("E! File could not be opened: %v", filename) | ||
|
||
| } | ||
| return r.parser.Parse(fileContents) | ||
|
|
||
| } | ||
|
|
||
| func init() { | ||
| inputs.Add("reader", func() telegraf.Input { | ||
| return &Reader{} | ||
| }) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may cause a merge conflict when we merge #4324 unless you merged that branch into yours.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I do when I need to have parts from another commit/branch is either cherry-pick that commit or make the change to the shared file, and just not commit the changes to that file from my new branch.