@@ -4,7 +4,10 @@ import (
44 "errors"
55 "fmt"
66 "io/ioutil"
7+ "log"
8+ "net"
79 "net/http"
10+ "net/url"
811 "sync"
912 "time"
1013
@@ -16,8 +19,12 @@ import (
1619const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3`
1720
1821type Prometheus struct {
22+ // An array of urls to scrape metrics from.
1923 Urls []string
2024
25+ // An array of Kubernetes services to scrape metrics from.
26+ KubernetesServices []string
27+
2128 // Bearer Token authorization file path
2229 BearerToken string `toml:"bearer_token"`
2330
@@ -39,6 +46,9 @@ var sampleConfig = `
3946 ## An array of urls to scrape metrics from.
4047 urls = ["http://localhost:9100/metrics"]
4148
49+ ## An array of Kubernetes services to scrape metrics from.
50+ kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]
51+
4252 ## Use bearer token for authorization
4353 # bearer_token = /path/to/bearer/token
4454
@@ -63,6 +73,53 @@ func (p *Prometheus) Description() string {
6373
6474var ErrProtocolError = errors .New ("prometheus protocol error" )
6575
76+ func (p * Prometheus ) AddressToURL (u * url.URL , address string ) string {
77+ host := address
78+ if u .Port () != "" {
79+ host = address + ":" + u .Port ()
80+ }
81+ reconstructedUrl := url.URL {
82+ Scheme : u .Scheme ,
83+ Opaque : u .Opaque ,
84+ User : u .User ,
85+ Path : u .Path ,
86+ RawPath : u .RawPath ,
87+ ForceQuery : u .ForceQuery ,
88+ RawQuery : u .RawQuery ,
89+ Fragment : u .Fragment ,
90+ Host : host ,
91+ }
92+ return reconstructedUrl .String ()
93+ }
94+
95+ type UrlAndAddress struct {
96+ Url string
97+ Address string
98+ }
99+
100+ func (p * Prometheus ) GetAllURLs () ([]UrlAndAddress , error ) {
101+ allUrls := make ([]UrlAndAddress , 0 )
102+ for _ , url := range p .Urls {
103+ allUrls = append (allUrls , UrlAndAddress {Url : url })
104+ }
105+ for _ , service := range p .KubernetesServices {
106+ u , err := url .Parse (service )
107+ if err != nil {
108+ return nil , err
109+ }
110+ resolvedAddresses , err := net .LookupHost (u .Hostname ())
111+ if err != nil {
112+ log .Printf ("prometheus: Could not resolve %s, skipping it. Error: %s" , u .Host , err )
113+ continue
114+ }
115+ for _ , resolved := range resolvedAddresses {
116+ serviceUrl := p .AddressToURL (u , resolved )
117+ allUrls = append (allUrls , UrlAndAddress {Url : serviceUrl , Address : resolved })
118+ }
119+ }
120+ return allUrls , nil
121+ }
122+
66123// Reads stats from all configured servers accumulates stats.
67124// Returns one of the errors encountered while gather stats (if any).
68125func (p * Prometheus ) Gather (acc telegraf.Accumulator ) error {
@@ -76,12 +133,16 @@ func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
76133
77134 var wg sync.WaitGroup
78135
79- for _ , serv := range p .Urls {
136+ allUrls , err := p .GetAllURLs ()
137+ if err != nil {
138+ return err
139+ }
140+ for _ , url := range allUrls {
80141 wg .Add (1 )
81- go func (serv string ) {
142+ go func (serviceUrl UrlAndAddress ) {
82143 defer wg .Done ()
83- acc .AddError (p .gatherURL (serv , acc ))
84- }(serv )
144+ acc .AddError (p .gatherURL (serviceUrl , acc ))
145+ }(url )
85146 }
86147
87148 wg .Wait ()
@@ -116,8 +177,8 @@ func (p *Prometheus) createHttpClient() (*http.Client, error) {
116177 return client , nil
117178}
118179
119- func (p * Prometheus ) gatherURL (url string , acc telegraf.Accumulator ) error {
120- var req , err = http .NewRequest ("GET" , url , nil )
180+ func (p * Prometheus ) gatherURL (url UrlAndAddress , acc telegraf.Accumulator ) error {
181+ var req , err = http .NewRequest ("GET" , url . Url , nil )
121182 req .Header .Add ("Accept" , acceptHeader )
122183 var token []byte
123184 var resp * http.Response
@@ -132,11 +193,11 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
132193
133194 resp , err = p .client .Do (req )
134195 if err != nil {
135- return fmt .Errorf ("error making HTTP request to %s: %s" , url , err )
196+ return fmt .Errorf ("error making HTTP request to %s: %s" , url . Url , err )
136197 }
137198 defer resp .Body .Close ()
138199 if resp .StatusCode != http .StatusOK {
139- return fmt .Errorf ("%s returned HTTP status %s" , url , resp .Status )
200+ return fmt .Errorf ("%s returned HTTP status %s" , url . Url , resp .Status )
140201 }
141202
142203 body , err := ioutil .ReadAll (resp .Body )
@@ -147,12 +208,15 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
147208 metrics , err := Parse (body , resp .Header )
148209 if err != nil {
149210 return fmt .Errorf ("error reading metrics for %s: %s" ,
150- url , err )
211+ url . Url , err )
151212 }
152213 // Add (or not) collected metrics
153214 for _ , metric := range metrics {
154215 tags := metric .Tags ()
155- tags ["url" ] = url
216+ tags ["url" ] = url .Url
217+ if url .Address != "" {
218+ tags ["address" ] = url .Address
219+ }
156220 acc .AddFields (metric .Name (), metric .Fields (), tags , metric .Time ())
157221 }
158222
0 commit comments