Skip to content

Commit 12bfd67

Browse files
danielnelsonmarcosnils
authored andcommitted
Set defaults on kinesis output partition key when tag does not exist.
Signed-off-by: Marcos Lilljedahl <marcosnils@gmail.com>
1 parent 8d0ec99 commit 12bfd67

File tree

3 files changed

+22
-8
lines changed

3 files changed

+22
-8
lines changed

plugins/outputs/kinesis/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ All metrics will be mapped to the same shard which may limit throughput.
7171
#### tag
7272

7373
This will take the value of the specified tag from each metric as the paritionKey.
74-
If the tag is not found an empty string will be used.
74+
If the tag is not found the `default` value will be used or `telegraf` if unspecified
7575

7676
#### measurement
7777

plugins/outputs/kinesis/kinesis.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,9 @@ type (
3636
}
3737

3838
Partition struct {
39-
Method string `toml:"method"`
40-
Key string `toml:"key"`
39+
Method string `toml:"method"`
40+
Key string `toml:"key"`
41+
Default string `toml:"default"`
4142
}
4243
)
4344

@@ -90,10 +91,11 @@ var sampleConfig = `
9091
# method = "measurement"
9192
#
9293
## Use the value of a tag for all writes, if the tag is not set the empty
93-
## string will be used:
94+
## default option will be used. When no default, defaults to "telegraf"
9495
# [outputs.kinesis.partition]
9596
# method = "tag"
9697
# key = "host"
98+
# default = "mykey"
9799
98100
99101
## Data format to output.
@@ -187,10 +189,13 @@ func (k *KinesisOutput) getPartitionKey(metric telegraf.Metric) string {
187189
case "measurement":
188190
return metric.Name()
189191
case "tag":
190-
if metric.HasTag(k.Partition.Key) {
191-
return metric.Tags()[k.Partition.Key]
192+
if t, ok := metric.GetTag(k.Partition.Key); ok {
193+
return t
194+
} else if len(k.Partition.Default) > 0 {
195+
return k.Partition.Default
192196
}
193-
log.Printf("E! kinesis : You have configured a Partition using tag %+v which does not exist.", k.Partition.Key)
197+
// Default partition name if default is not set
198+
return "telegraf"
194199
default:
195200
log.Printf("E! kinesis : You have configured a Partition method of %+v which is not supported", k.Partition.Method)
196201
}

plugins/outputs/kinesis/kinesis_test.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,22 @@ func TestPartitionKey(t *testing.T) {
2929
}
3030
assert.Equal(testPoint.Tags()["tag1"], k.getPartitionKey(testPoint), "PartitionKey should be value of 'tag1'")
3131

32+
k = KinesisOutput{
33+
Partition: &Partition{
34+
Method: "tag",
35+
Key: "doesnotexist",
36+
Default: "somedefault",
37+
},
38+
}
39+
assert.Equal("somedefault", k.getPartitionKey(testPoint), "PartitionKey should use default")
40+
3241
k = KinesisOutput{
3342
Partition: &Partition{
3443
Method: "tag",
3544
Key: "doesnotexist",
3645
},
3746
}
38-
assert.Equal("", k.getPartitionKey(testPoint), "PartitionKey should be value of ''")
47+
assert.Equal("telegraf", k.getPartitionKey(testPoint), "PartitionKey should be telegraf")
3948

4049
k = KinesisOutput{
4150
Partition: &Partition{

0 commit comments

Comments
 (0)