Skip to content

Commit 1e56c02

Browse files
dupondje authored and bitcharmer committed
Fix master check and move cluster health indices to separate measurement (influxdata#6004)
1 parent b0b47c1 commit 1e56c02

File tree

3 files changed

+556
-74
lines changed

3 files changed

+556
-74
lines changed

plugins/inputs/elasticsearch/elasticsearch.go

Lines changed: 99 additions & 54 deletions
Original file line number | Diff line number | Diff line change
@@ -40,31 +40,32 @@ type nodeStat struct {
4040
}
4141

4242
// clusterHealth describes the cluster-level health document as decoded from
// JSON; each field is filled from the key named in its json tag.
// NOTE(review): presumably this is the type decoded by gatherClusterHealth,
// whose clusterFields map reads the same field names — confirm against the
// declaration of healthStats, which is outside this view.
type clusterHealth struct {
43-
ClusterName string `json:"cluster_name"`
44-
Status string `json:"status"`
45-
TimedOut bool `json:"timed_out"`
46-
NumberOfNodes int `json:"number_of_nodes"`
47-
NumberOfDataNodes int `json:"number_of_data_nodes"`
4843
ActivePrimaryShards int `json:"active_primary_shards"`
4944
ActiveShards int `json:"active_shards"`
50-
RelocatingShards int `json:"relocating_shards"`
51-
InitializingShards int `json:"initializing_shards"`
52-
UnassignedShards int `json:"unassigned_shards"`
45+
ActiveShardsPercentAsNumber float64 `json:"active_shards_percent_as_number"`
46+
ClusterName string `json:"cluster_name"`
5347
DelayedUnassignedShards int `json:"delayed_unassigned_shards"`
48+
InitializingShards int `json:"initializing_shards"`
49+
NumberOfDataNodes int `json:"number_of_data_nodes"`
50+
// NumberOfInFlightFetch has no counterpart among the removed lines of this
// struct; it is also emitted as the "number_of_in_flight_fetch" field.
NumberOfInFlightFetch int `json:"number_of_in_flight_fetch"`
51+
NumberOfNodes int `json:"number_of_nodes"`
5452
NumberOfPendingTasks int `json:"number_of_pending_tasks"`
53+
RelocatingShards int `json:"relocating_shards"`
54+
Status string `json:"status"`
5555
TaskMaxWaitingInQueueMillis int `json:"task_max_waiting_in_queue_millis"`
56-
ActiveShardsPercentAsNumber float64 `json:"active_shards_percent_as_number"`
56+
TimedOut bool `json:"timed_out"`
57+
UnassignedShards int `json:"unassigned_shards"`
5758
// Indices holds one indexHealth per entry of the "indices" JSON object;
// the map key is used as the "index" tag when the entries are reported.
Indices map[string]indexHealth `json:"indices"`
5859
}
5960

6061
// indexHealth describes the health of a single index as decoded from JSON;
// each field is filled from the key named in its json tag. It is the value
// type of clusterHealth.Indices.
type indexHealth struct {
61-
Status string `json:"status"`
62-
NumberOfShards int `json:"number_of_shards"`
63-
NumberOfReplicas int `json:"number_of_replicas"`
6462
ActivePrimaryShards int `json:"active_primary_shards"`
6563
ActiveShards int `json:"active_shards"`
66-
RelocatingShards int `json:"relocating_shards"`
6764
InitializingShards int `json:"initializing_shards"`
65+
NumberOfReplicas int `json:"number_of_replicas"`
66+
NumberOfShards int `json:"number_of_shards"`
67+
RelocatingShards int `json:"relocating_shards"`
68+
// Status is reported both verbatim ("status") and through
// mapHealthStatusToCode ("status_code").
Status string `json:"status"`
6869
UnassignedShards int `json:"unassigned_shards"`
6970
}
7071

@@ -137,9 +138,17 @@ type Elasticsearch struct {
137138
NodeStats []string
138139
tls.ClientConfig
139140

140-
client *http.Client
141-
catMasterResponseTokens []string
142-
isMaster bool
141+
client *http.Client
142+
serverInfo map[string]serverInfo
143+
serverInfoMutex sync.Mutex
144+
}
145+
// serverInfo records, for one configured server, the ID of the node that
// answered the "/_nodes/_local/name" request and the ID that the same server
// reported through "/_cat/master".
type serverInfo struct {
	nodeID   string
	masterID string
}

// isMaster reports whether the server's own node ID equals the master ID it
// reported.
//
// An empty nodeID never counts as master. Looking up a server that has no
// entry in the serverInfo map (for example because one of the two ID requests
// failed) yields the zero value, and comparing two empty strings would
// otherwise wrongly declare that server to be the master.
func (i serverInfo) isMaster() bool {
	return i.nodeID != "" && i.nodeID == i.masterID
}
144153

145154
// NewElasticsearch returns a new instance of Elasticsearch
@@ -186,25 +195,49 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
186195
e.client = client
187196
}
188197

189-
var wg sync.WaitGroup
190-
wg.Add(len(e.Servers))
198+
if e.ClusterStats {
199+
var wgC sync.WaitGroup
200+
wgC.Add(len(e.Servers))
191201

192-
for _, serv := range e.Servers {
193-
go func(s string, acc telegraf.Accumulator) {
194-
defer wg.Done()
195-
url := e.nodeStatsUrl(s)
196-
e.isMaster = false
202+
e.serverInfo = make(map[string]serverInfo)
203+
for _, serv := range e.Servers {
204+
go func(s string, acc telegraf.Accumulator) {
205+
defer wgC.Done()
206+
info := serverInfo{}
207+
208+
var err error
209+
210+
// Gather node ID
211+
if info.nodeID, err = e.gatherNodeID(s + "/_nodes/_local/name"); err != nil {
212+
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
213+
return
214+
}
197215

198-
if e.ClusterStats {
199216
// get cat/master information here so NodeStats can determine
200217
// whether this node is the Master
201-
if err := e.setCatMaster(s + "/_cat/master"); err != nil {
218+
if info.masterID, err = e.getCatMaster(s + "/_cat/master"); err != nil {
202219
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
203220
return
204221
}
205-
}
206222

207-
// Always gather node states
223+
e.serverInfoMutex.Lock()
224+
e.serverInfo[s] = info
225+
e.serverInfoMutex.Unlock()
226+
227+
}(serv, acc)
228+
}
229+
wgC.Wait()
230+
}
231+
232+
var wg sync.WaitGroup
233+
wg.Add(len(e.Servers))
234+
235+
for _, serv := range e.Servers {
236+
go func(s string, acc telegraf.Accumulator) {
237+
defer wg.Done()
238+
url := e.nodeStatsUrl(s)
239+
240+
// Always gather node stats
208241
if err := e.gatherNodeStats(url, acc); err != nil {
209242
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
210243
return
@@ -221,7 +254,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
221254
}
222255
}
223256

224-
if e.ClusterStats && (e.isMaster || !e.ClusterStatsOnlyFromMaster || !e.Local) {
257+
if e.ClusterStats && (e.serverInfo[s].isMaster() || !e.ClusterStatsOnlyFromMaster || !e.Local) {
225258
if err := e.gatherClusterStats(s+"/_cluster/stats", acc); err != nil {
226259
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
227260
return
@@ -267,6 +300,22 @@ func (e *Elasticsearch) nodeStatsUrl(baseUrl string) string {
267300
return fmt.Sprintf("%s/%s", url, strings.Join(e.NodeStats, ","))
268301
}
269302

303+
func (e *Elasticsearch) gatherNodeID(url string) (string, error) {
304+
nodeStats := &struct {
305+
ClusterName string `json:"cluster_name"`
306+
Nodes map[string]*nodeStat `json:"nodes"`
307+
}{}
308+
if err := e.gatherJsonData(url, nodeStats); err != nil {
309+
return "", err
310+
}
311+
312+
// Only 1 should be returned
313+
for id := range nodeStats.Nodes {
314+
return id, nil
315+
}
316+
return "", nil
317+
}
318+
270319
func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {
271320
nodeStats := &struct {
272321
ClusterName string `json:"cluster_name"`
@@ -284,11 +333,6 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
284333
"cluster_name": nodeStats.ClusterName,
285334
}
286335

287-
if e.ClusterStats {
288-
// check for master
289-
e.isMaster = (id == e.catMasterResponseTokens[0])
290-
}
291-
292336
for k, v := range n.Attributes {
293337
tags["node_attribute_"+k] = v
294338
}
@@ -331,20 +375,21 @@ func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator
331375
}
332376
measurementTime := time.Now()
333377
clusterFields := map[string]interface{}{
334-
"status": healthStats.Status,
335-
"status_code": mapHealthStatusToCode(healthStats.Status),
336-
"timed_out": healthStats.TimedOut,
337-
"number_of_nodes": healthStats.NumberOfNodes,
338-
"number_of_data_nodes": healthStats.NumberOfDataNodes,
339378
"active_primary_shards": healthStats.ActivePrimaryShards,
340379
"active_shards": healthStats.ActiveShards,
341-
"relocating_shards": healthStats.RelocatingShards,
342-
"initializing_shards": healthStats.InitializingShards,
343-
"unassigned_shards": healthStats.UnassignedShards,
380+
"active_shards_percent_as_number": healthStats.ActiveShardsPercentAsNumber,
344381
"delayed_unassigned_shards": healthStats.DelayedUnassignedShards,
382+
"initializing_shards": healthStats.InitializingShards,
383+
"number_of_data_nodes": healthStats.NumberOfDataNodes,
384+
"number_of_in_flight_fetch": healthStats.NumberOfInFlightFetch,
385+
"number_of_nodes": healthStats.NumberOfNodes,
345386
"number_of_pending_tasks": healthStats.NumberOfPendingTasks,
387+
"relocating_shards": healthStats.RelocatingShards,
388+
"status": healthStats.Status,
389+
"status_code": mapHealthStatusToCode(healthStats.Status),
346390
"task_max_waiting_in_queue_millis": healthStats.TaskMaxWaitingInQueueMillis,
347-
"active_shards_percent_as_number": healthStats.ActiveShardsPercentAsNumber,
391+
"timed_out": healthStats.TimedOut,
392+
"unassigned_shards": healthStats.UnassignedShards,
348393
}
349394
acc.AddFields(
350395
"elasticsearch_cluster_health",
@@ -355,18 +400,18 @@ func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator
355400

356401
for name, health := range healthStats.Indices {
357402
indexFields := map[string]interface{}{
358-
"status": health.Status,
359-
"status_code": mapHealthStatusToCode(health.Status),
360-
"number_of_shards": health.NumberOfShards,
361-
"number_of_replicas": health.NumberOfReplicas,
362403
"active_primary_shards": health.ActivePrimaryShards,
363404
"active_shards": health.ActiveShards,
364-
"relocating_shards": health.RelocatingShards,
365405
"initializing_shards": health.InitializingShards,
406+
"number_of_replicas": health.NumberOfReplicas,
407+
"number_of_shards": health.NumberOfShards,
408+
"relocating_shards": health.RelocatingShards,
409+
"status": health.Status,
410+
"status_code": mapHealthStatusToCode(health.Status),
366411
"unassigned_shards": health.UnassignedShards,
367412
}
368413
acc.AddFields(
369-
"elasticsearch_indices",
414+
"elasticsearch_cluster_health_indices",
370415
indexFields,
371416
map[string]string{"index": name, "name": healthStats.ClusterName},
372417
measurementTime,
@@ -405,27 +450,27 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator)
405450
return nil
406451
}
407452

408-
func (e *Elasticsearch) setCatMaster(url string) error {
453+
// getCatMaster issues a GET for url through e.client and returns the first
// space-separated token of the response body as the master ID.
//
// It returns an empty ID and a non-nil error when the request fails, when the
// status code is not http.StatusOK, or when the body cannot be read.
func (e *Elasticsearch) getCatMaster(url string) (string, error) {
409454
r, err := e.client.Get(url)
410455
if err != nil {
411-
return err
456+
return "", err
412457
}
413458
defer r.Body.Close()
414459
if r.StatusCode != http.StatusOK {
415460
// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
416461
// to let the underlying transport close the connection and re-establish a new one for
417462
// future calls.
418-
return fmt.Errorf("elasticsearch: Unable to retrieve master node information. API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK)
463+
return "", fmt.Errorf("elasticsearch: Unable to retrieve master node information. API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK)
419464
}
420465
response, err := ioutil.ReadAll(r.Body)
421466

422467
if err != nil {
423-
return err
468+
return "", err
424469
}
425470

426-
e.catMasterResponseTokens = strings.Split(string(response), " ")
471+
// strings.Split always yields at least one element, so indexing [0] cannot
// panic; a body without any space is returned whole.
masterID := strings.Split(string(response), " ")[0]
427472

428-
return nil
473+
return masterID, nil
429474
}
430475

431476
func (e *Elasticsearch) gatherJsonData(url string, v interface{}) error {

0 commit comments

Comments
 (0)