11package kinesis
22
33import (
4- "fmt"
54 "log"
65 "os"
76 "sync/atomic"
@@ -13,6 +12,7 @@ import (
1312 "github.com/influxdata/telegraf"
1413 internalaws "github.com/influxdata/telegraf/internal/config/aws"
1514 "github.com/influxdata/telegraf/plugins/outputs"
15+ "github.com/influxdata/telegraf/plugins/serializers"
1616)
1717
1818type KinesisOutput struct {
@@ -26,9 +26,10 @@ type KinesisOutput struct {
2626
2727 StreamName string `toml:"streamname"`
2828 PartitionKey string `toml:"partitionkey"`
29- Format string `toml:"format"`
3029 Debug bool `toml:"debug"`
3130 svc * kinesis.Kinesis
31+
32+ serializer serializers.Serializer
3233}
3334
3435var sampleConfig = `
@@ -54,9 +55,13 @@ var sampleConfig = `
5455 streamname = "StreamName"
5556 ## PartitionKey as used for sharding data.
5657 partitionkey = "PartitionKey"
57- ## format of the Data payload in the kinesis PutRecord, supported
58- ## String and Custom.
59- format = "string"
58+
59+ ## Data format to output.
60+ ## Each data format has it's own unique set of configuration options, read
61+ ## more about them here:
62+ ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
63+ data_format = "influx"
64+
6065 ## debug will show upstream aws messages.
6166 debug = false
6267`
@@ -125,16 +130,8 @@ func (k *KinesisOutput) Close() error {
125130 return nil
126131}
127132
128- func FormatMetric (k * KinesisOutput , point telegraf.Metric ) (string , error ) {
129- if k .Format == "string" {
130- return point .String (), nil
131- } else {
132- m := fmt .Sprintf ("%+v,%+v,%+v" ,
133- point .Name (),
134- point .Tags (),
135- point .String ())
136- return m , nil
137- }
133+ func (k * KinesisOutput ) SetSerializer (serializer serializers.Serializer ) {
134+ k .serializer = serializer
138135}
139136
140137func writekinesis (k * KinesisOutput , r []* kinesis.PutRecordsRequestEntry ) time.Duration {
@@ -169,23 +166,30 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
169166
170167 r := []* kinesis.PutRecordsRequestEntry {}
171168
172- for _ , p := range metrics {
169+ for _ , metric := range metrics {
173170 atomic .AddUint32 (& sz , 1 )
174171
175- metric , _ := FormatMetric (k , p )
176- d := kinesis.PutRecordsRequestEntry {
177- Data : []byte (metric ),
178- PartitionKey : aws .String (k .PartitionKey ),
172+ values , err := k .serializer .Serialize (metric )
173+ if err != nil {
174+ return err
179175 }
180- r = append (r , & d )
181-
182- if sz == 500 {
183- // Max Messages Per PutRecordRequest is 500
184- elapsed := writekinesis (k , r )
185- log .Printf ("E! Wrote a %+v point batch to Kinesis in %+v.\n " , sz , elapsed )
186- atomic .StoreUint32 (& sz , 0 )
187- r = nil
176+
177+ for _ , metric := range values {
178+ d := kinesis.PutRecordsRequestEntry {
179+ Data : []byte (metric ),
180+ PartitionKey : aws .String (k .PartitionKey ),
181+ }
182+ r = append (r , & d )
183+
184+ if sz == 500 {
185+ // Max Messages Per PutRecordRequest is 500
186+ elapsed := writekinesis (k , r )
187+ log .Printf ("E! Wrote a %+v point batch to Kinesis in %+v.\n " , sz , elapsed )
188+ atomic .StoreUint32 (& sz , 0 )
189+ r = nil
190+ }
188191 }
192+
189193 }
190194
191195 writekinesis (k , r )
0 commit comments