Skip to content

Commit f4032fc

Browse files
danielnelsonglinton
authored andcommitted
Add support for lz4 compression to kafka output (#4492)
1 parent 943dcc0 commit f4032fc

File tree

2 files changed

+24
-2
lines changed

2 files changed

+24
-2
lines changed

plugins/outputs/kafka/README.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,23 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
1010
## Kafka topic for producer messages
1111
topic = "telegraf"
1212

13-
## Optional client id
13+
## Optional Client id
1414
# client_id = "Telegraf"
1515

16+
## Set the minimal supported Kafka version. Setting this enables the use of new
17+
## Kafka features and APIs. Of particular interested, lz4 compression
18+
## requires at least version 0.10.0.0.
19+
## ex: version = "1.1.0"
20+
# version = ""
21+
1622
## Optional topic suffix configuration.
1723
## If the section is omitted, no suffix is used.
1824
## Following topic suffix methods are supported:
1925
## measurement - suffix equals to separator + measurement's name
2026
## tags - suffix equals to separator + specified tags' values
2127
## interleaved with separator
2228

23-
## Suffix equals to "_" + measurement's name
29+
## Suffix equals to "_" + measurement name
2430
# [outputs.kafka.topic_suffix]
2531
# method = "measurement"
2632
# separator = "_"

plugins/outputs/kafka/kafka.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ type (
3838
// MaxRetry Tag
3939
MaxRetry int
4040

41+
Version string `toml:"version"`
42+
4143
// Legacy TLS config options
4244
// TLS client certificate
4345
Certificate string
@@ -74,6 +76,12 @@ var sampleConfig = `
7476
## Optional Client id
7577
# client_id = "Telegraf"
7678
79+
## Set the minimal supported Kafka version. Setting this enables the use of new
80+
## Kafka features and APIs. Of particular interested, lz4 compression
81+
## requires at least version 0.10.0.0.
82+
## ex: version = "1.1.0"
83+
# version = ""
84+
7785
## Optional topic suffix configuration.
7886
## If the section is omitted, no suffix is used.
7987
## Following topic suffix methods are supported:
@@ -191,6 +199,14 @@ func (k *Kafka) Connect() error {
191199
}
192200
config := sarama.NewConfig()
193201

202+
if k.Version != "" {
203+
version, err := sarama.ParseKafkaVersion(k.Version)
204+
if err != nil {
205+
return err
206+
}
207+
config.Version = version
208+
}
209+
194210
if k.ClientID != "" {
195211
config.ClientID = k.ClientID
196212
} else {

0 commit comments

Comments
 (0)