Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ docker run --name logspout \

The routes can be updated on a running container by using the **logspout** [Route API](https://github.com/gliderlabs/logspout/tree/master/routesapi) and specifying the route `id` "cat" or "dog".

## TLS Authentication Support
Some brokers require TLS certificate-based authentication. This adapter supports this if you provide both a certificate and a private key file in the container.

To use this, mount certificate and private key files (in PEM format) into the container, then set the *TLS_CERT_FILE* and *TLS_PRIVKEY_FILE* environment variables to point to their location on the disk **inside the container**, like so:

```
docker run --name logspout \
-p "8000:8000" \
--volume /logspout/routes:/mnt/routes \
--volume /var/run/docker.sock:/tmp/docker.sock \
--volume /logspout/tls:/mnt/tls \
-e "TLS_CERTFILE=/mnt/tls/cert.pem" \
-e "TLS_PRIVKEY_FILE=/mnt/tls/priv.key"
```
## build

**logspout-kafka** is a custom **logspout** module. To use it, create an empty `Dockerfile` based on `gliderlabs/logspout` and include this **logspout-kafka** module in a new `modules.go` file.
Expand Down
56 changes: 55 additions & 1 deletion kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strings"
"text/template"
"time"
"crypto/tls"
"io/ioutil"

"github.com/gliderlabs/logspout/router"
"gopkg.in/Shopify/sarama.v1"
Expand Down Expand Up @@ -38,6 +40,10 @@ func NewKafkaAdapter(route *router.Route) (router.LogAdapter, error) {
}

var err error

cert_file := os.Getenv("TLS_CERT_FILE")
key_file := os.Getenv("TLS_PRIVKEY_FILE")

var tmpl *template.Template
if text := os.Getenv("KAFKA_TEMPLATE"); text != "" {
tmpl, err = template.New("kafka").Parse(text)
Expand All @@ -55,9 +61,57 @@ func NewKafkaAdapter(route *router.Route) (router.LogAdapter, error) {
if err != nil {
retries = 3
}

var producer sarama.AsyncProducer

if os.Getenv("DEBUG") != "" {
log.Println("Generating Kafka configuration.")
}
config := newConfig()

if (cert_file != "") && (key_file != "") {
if os.Getenv("DEBUG") != "" {
log.Println("Enabling Kafka TLS support.")
}

certfile, err := os.Open(cert_file)
if err != nil {
return nil, errorf("Couldn't open TLS certificate file: %s", err)
}

keyfile, err := os.Open(key_file)
if err != nil {
return nil, errorf("Couldn't open TLS private key file: %s", err)
}

tls_cert, err := ioutil.ReadAll(certfile)
if err != nil {
return nil, errorf("Couldn't read TLS certificate: %s", err)
}

tls_privkey, err := ioutil.ReadAll(keyfile)
if err != nil {
return nil, errorf("Couldn't read TLS private key: %s", err)
}

keypair, err := tls.X509KeyPair([]byte(tls_cert), []byte(tls_privkey))
if err != nil {
return nil, errorf("Couldn't establish TLS authentication keypair. Check TLS_CERT and TLS_PRIVKEY environment vars.")
}

tls_configuration := &tls.Config{
Certificates: []tls.Certificate{keypair},
InsecureSkipVerify: false,
}

config.Net.TLS.Config = tls_configuration
config.Net.TLS.Enable = true
}

for i := 0; i < retries; i++ {
producer, err = sarama.NewAsyncProducer(brokers, newConfig())

producer, err = sarama.NewAsyncProducer(brokers, config)

if err != nil {
if os.Getenv("DEBUG") != "" {
log.Println("Couldn't create Kafka producer. Retrying...", err)
Expand Down