Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 112 additions & 9 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,23 @@ but can be overridden using the `name_override` config option.

#### JSON Configuration:

The JSON data format supports specifying "tag keys". If specified, keys
will be searched for in the root-level of the JSON blob. If the key(s) exist,
they will be applied as tags to the Telegraf metrics.
The JSON data format supports specifying "tag_keys", "string_keys", and "json_query".
If specified, keys in "tag_keys" and "string_keys" will be searched for in the root-level
and any nested lists of the JSON blob. All int and float values are added to fields by default.
If the key(s) exist, they will be applied as tags or fields to the Telegraf metrics.
If "string_keys" is specified, the string will be added as a field.

The "json_query" configuration is a gjson path to an JSON object or list of JSON objects.
If this path leads to an array of values or single data point an error will be thrown. If this
configuration is specified, only the result of the query will be parsed and returned as metrics.

Object paths are specified using gjson path format, which is denoted by object keys
concatenated with "." to go deeper in nested JSON objects.
Additional information on gjson paths can be found here: https://github.com/tidwall/gjson#path-syntax

The JSON data format also supports extracting time values through the config "json_time_key" and "json_time_format".
If "json_time_key" is set, "json_time_format" must be specified. The "json_time_key" describes the name of the field containing time information. The "json_time_format" must be a recognized Go time format.
More info on time formats can be found here: https://golang.org/src/time/format.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use https://golang.org/pkg/time/#Parse as the link. This other link is still not great but the audience of this documentation is not Go developers.

Try to wrap around 78 chars


For example, if you had this configuration:

Expand All @@ -124,11 +138,25 @@ For example, if you had this configuration:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "json"

## List of tag names to extract from top-level of JSON server response
## List of tag names to extract from JSON server response
tag_keys = [
"my_tag_1",
"my_tag_2"
]

## List of field names to extract from JSON and add as string fields
# string_fields = []

## gjson query path to specify a specific chunk of JSON to be parsed with the above configuration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrap comments before 78 chars for readability.

## if not specified, the whole file will be parsed.
## gjson query paths are described here:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add link to where gjson query paths are described

# json_query = ""

## holds the name of the tag of timestamp
# json_time_key = ""

## holds the format of timestamp to be parsed
# json_time_format = ""
```

with this JSON output from a command:
Expand Down Expand Up @@ -173,6 +201,19 @@ For example, if the following configuration:
"my_tag_1",
"my_tag_2"
]

## List of field names to extract from JSON and add as string fields
# string_fields = []

## gjson query path to specify a specific chunk of JSON to be parsed with the above configuration
## if not specified, the whole file will be parsed
# json_query = ""

## holds the name of the tag of timestamp
# json_time_key = "b_time"

## holds the format of timestamp to be parsed
# json_time_format = "02 Jan 06 15:04 MST"
```

with this JSON output from a command:
Expand All @@ -182,27 +223,89 @@ with this JSON output from a command:
{
"a": 5,
"b": {
"c": 6
"c": 6,
"time":"04 Jan 06 15:04 MST"
},
"my_tag_1": "foo",
"my_tag_2": "baz"
},
{
"a": 7,
"b": {
"c": 8
"c": 8,
"time":"11 Jan 07 15:04 MST"
},
"my_tag_1": "bar",
"my_tag_2": "baz"
}
]
```

Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2"
Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2" and fielded with "b_c"
The metric's time will be a time.Time object, as specified by "b_time"

```
exec_mycollector,my_tag_1=foo,my_tag_2=baz b_c=6 1136387040000000000
exec_mycollector,my_tag_1=bar,my_tag_2=baz b_c=8 1168527840000000000
```

If you want to only use a specific portion of your JSON, use the "json_query"
configuration to specify a path to a JSON object.

For example, with the following config:
```toml
[[inputs.exec]]
## Commands array
commands = ["/usr/bin/mycollector --foo=bar"]

## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "json"

## List of tag names to extract from top-level of JSON server response
tag_keys = ["first"]

## List of field names to extract from JSON and add as string fields
string_fields = ["last"]

## gjson query path to specify a specific chunk of JSON to be parsed with the above configuration
## if not specified, the whole file will be parsed
json_query = "obj.friends"

## holds the name of the tag of timestamp
# json_time_key = ""

## holds the format of timestamp to be parsed
# json_time_format = ""
```

with this JSON as input:
```json
{
"obj": {
"name": {"first": "Tom", "last": "Anderson"},
"age":37,
"children": ["Sara","Alex","Jack"],
"fav.movie": "Deer Hunter",
"friends": [
{"first": "Dale", "last": "Murphy", "age": 44},
{"first": "Roger", "last": "Craig", "age": 68},
{"first": "Jane", "last": "Murphy", "age": 47}
]
}
}
```
You would recieve 3 metrics tagged with "first", and fielded with "last" and "age"

```
exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6
exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8
exec_mycollector, "first":"Dale" "last":"Murphy","age":44
exec_mycollector, "first":"Roger" "last":"Craig","age":68
exec_mycollector, "first":"Jane" "last":"Murphy","age":47
```

# Value:
Expand Down
40 changes: 40 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,42 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
}
}

if node, ok := tbl.Fields["string_fields"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.StringFields = append(c.StringFields, str.Value)
}
}
}
}
}

if node, ok := tbl.Fields["json_query"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.JSONQuery = str.Value
}
}
}

if node, ok := tbl.Fields["json_time_key"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.JSONTimeKey = str.Value
}
}
}

if node, ok := tbl.Fields["json_time_format"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.JSONTimeFormat = str.Value
}
}
}

if node, ok := tbl.Fields["data_type"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
Expand Down Expand Up @@ -1344,6 +1380,10 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
delete(tbl.Fields, "separator")
delete(tbl.Fields, "templates")
delete(tbl.Fields, "tag_keys")
delete(tbl.Fields, "string_fields")
delete(tbl.Fields, "json_query")
delete(tbl.Fields, "json_time_key")
delete(tbl.Fields, "json_time_format")
delete(tbl.Fields, "data_type")
delete(tbl.Fields, "collectd_auth_file")
delete(tbl.Fields, "collectd_security_level")
Expand Down
5 changes: 4 additions & 1 deletion internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ func TestConfig_LoadDirectory(t *testing.T) {
"Testdata did not produce correct memcached metadata.")

ex := inputs.Inputs["exec"]().(*exec.Exec)
p, err := parsers.NewJSONParser("exec", nil, nil)
p, err := parsers.NewParser(&parsers.Config{
MetricName: "exec",
DataFormat: "json",
})
assert.NoError(t, err)
ex.SetParser(p)
ex.Command = "/usr/bin/myothercollector --foo=bar"
Expand Down
15 changes: 12 additions & 3 deletions plugins/inputs/exec/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]by
}

func TestExec(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock([]byte(validJson), nil),
Commands: []string{"testcommand arg1"},
Expand All @@ -119,7 +122,10 @@ func TestExec(t *testing.T) {
}

func TestExecMalformed(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock([]byte(malformedJson), nil),
Commands: []string{"badcommand arg1"},
Expand All @@ -132,7 +138,10 @@ func TestExecMalformed(t *testing.T) {
}

func TestCommandError(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")),
Commands: []string{"badcommand"},
Expand Down
24 changes: 18 additions & 6 deletions plugins/inputs/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ func TestHTTPwithJSONFormat(t *testing.T) {
URLs: []string{url},
}
metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)

p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p)

var acc testutil.Accumulator
Expand Down Expand Up @@ -63,8 +67,11 @@ func TestHTTPHeaders(t *testing.T) {
URLs: []string{url},
Headers: map[string]string{header: headerValue},
}
metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)

p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p)

var acc testutil.Accumulator
Expand All @@ -83,7 +90,10 @@ func TestInvalidStatusCode(t *testing.T) {
}

metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)
p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: metricName,
})
plugin.SetParser(p)

var acc testutil.Accumulator
Expand All @@ -105,8 +115,10 @@ func TestMethod(t *testing.T) {
Method: "POST",
}

metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)
p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p)

var acc testutil.Accumulator
Expand Down
7 changes: 6 additions & 1 deletion plugins/inputs/httpjson/httpjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,12 @@ func (h *HttpJson) gatherServer(
"server": serverURL,
}

parser, err := parsers.NewJSONParser(msrmnt_name, h.TagKeys, tags)
parser, err := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: msrmnt_name,
TagKeys: h.TagKeys,
DefaultTags: tags,
})
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion plugins/inputs/kafka_consumer/kafka_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
k.acc = &acc
defer close(k.done)

k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
k.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "kafka_json_test",
})
go k.receiver()
in <- saramaMsg(testMsgJSON)
acc.Wait(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
k.acc = &acc
defer close(k.done)

k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
k.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "kafka_json_test",
})
go k.receiver()
in <- saramaMsg(testMsgJSON)
acc.Wait(1)
Expand Down
5 changes: 4 additions & 1 deletion plugins/inputs/mqtt_consumer/mqtt_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
n.acc = &acc
defer close(n.done)

n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
n.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "nats_json_test",
})
go n.receiver()
in <- mqttMsg(testMsgJSON)

Expand Down
5 changes: 4 additions & 1 deletion plugins/inputs/nats_consumer/nats_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
n.acc = &acc
defer close(n.done)

n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
n.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "nats_json_test",
})
n.wg.Add(1)
go n.receiver()
in <- natsMsg(testMsgJSON)
Expand Down
5 changes: 4 additions & 1 deletion plugins/inputs/tcp_listener/tcp_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,10 @@ func TestRunParserJSONMsg(t *testing.T) {
listener.acc = &acc
defer close(listener.done)

listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil)
listener.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "udp_json_test",
})
listener.wg.Add(1)
go listener.tcpParser()

Expand Down
Loading