11package kinesis
22
33import (
4- "fmt"
54 "log"
65 "os"
7- "sync/atomic"
86 "time"
97
108 "github.com/aws/aws-sdk-go/aws"
@@ -13,6 +11,7 @@ import (
1311 "github.com/influxdata/telegraf"
1412 internalaws "github.com/influxdata/telegraf/internal/config/aws"
1513 "github.com/influxdata/telegraf/plugins/outputs"
14+ "github.com/influxdata/telegraf/plugins/serializers"
1615)
1716
1817type KinesisOutput struct {
@@ -26,9 +25,10 @@ type KinesisOutput struct {
2625
2726 StreamName string `toml:"streamname"`
2827 PartitionKey string `toml:"partitionkey"`
29- Format string `toml:"format"`
3028 Debug bool `toml:"debug"`
3129 svc * kinesis.Kinesis
30+
31+ serializer serializers.Serializer
3232}
3333
3434var sampleConfig = `
@@ -54,9 +54,13 @@ var sampleConfig = `
5454 streamname = "StreamName"
5555 ## PartitionKey as used for sharding data.
5656 partitionkey = "PartitionKey"
57- ## format of the Data payload in the kinesis PutRecord, supported
58- ## String and Custom.
59- format = "string"
57+
58+ ## Data format to output.
59+ ## Each data format has it's own unique set of configuration options, read
60+ ## more about them here:
61+ ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
62+ data_format = "influx"
63+
6064 ## debug will show upstream aws messages.
6165 debug = false
6266`
@@ -125,16 +129,8 @@ func (k *KinesisOutput) Close() error {
125129 return nil
126130}
127131
128- func FormatMetric (k * KinesisOutput , point telegraf.Metric ) (string , error ) {
129- if k .Format == "string" {
130- return point .String (), nil
131- } else {
132- m := fmt .Sprintf ("%+v,%+v,%+v" ,
133- point .Name (),
134- point .Tags (),
135- point .String ())
136- return m , nil
137- }
132+ func (k * KinesisOutput ) SetSerializer (serializer serializers.Serializer ) {
133+ k .serializer = serializer
138134}
139135
140136func writekinesis (k * KinesisOutput , r []* kinesis.PutRecordsRequestEntry ) time.Duration {
@@ -161,31 +157,37 @@ func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Du
161157}
162158
163159func (k * KinesisOutput ) Write (metrics []telegraf.Metric ) error {
164- var sz uint32 = 0
160+ var sz uint32
165161
166162 if len (metrics ) == 0 {
167163 return nil
168164 }
169165
170166 r := []* kinesis.PutRecordsRequestEntry {}
171167
172- for _ , p := range metrics {
173- atomic .AddUint32 (& sz , 1 )
168+ for _ , metric := range metrics {
169+ sz ++
170+
171+ values , err := k .serializer .Serialize (metric )
172+ if err != nil {
173+ return err
174+ }
174175
175- metric , _ := FormatMetric (k , p )
176176 d := kinesis.PutRecordsRequestEntry {
177- Data : [] byte ( metric ) ,
177+ Data : values ,
178178 PartitionKey : aws .String (k .PartitionKey ),
179179 }
180+
180181 r = append (r , & d )
181182
182183 if sz == 500 {
183184 // Max Messages Per PutRecordRequest is 500
184185 elapsed := writekinesis (k , r )
185186 log .Printf ("E! Wrote a %+v point batch to Kinesis in %+v.\n " , sz , elapsed )
186- atomic . StoreUint32 ( & sz , 0 )
187+ sz = 0
187188 r = nil
188189 }
190+
189191 }
190192
191193 writekinesis (k , r )
0 commit comments