Skip to content

Commit c694ebd

Browse files
authored
fix(destination): fix workload subscribers metric (#14683)
The destination controller has a gauge metric called workload_subscribers which tracks the number of subscribers to an endpoint profile. However, since we have dropped all labels from this metric due to high cardinality, we are now re-using the same gauge for all ip:port publishers. This means that each time we set the gauge value, it overrides the previous value and only tracks the number of subscribers to the most recently subscribed ip:port combination. We move this logic up so that the workload_subscribers gauge instead tracks the total number of subscribers across all ip:port publishers.
1 parent b476274 commit c694ebd

File tree

1 file changed

+25
-10
lines changed

1 file changed

+25
-10
lines changed

controller/api/destination/watcher/workload_watcher.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"net"
77
"strings"
88
"sync"
9+
"sync/atomic"
910
"time"
1011

1112
ext "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
@@ -32,6 +33,8 @@ type (
3233
k8sAPI *k8s.API
3334
metadataAPI *k8s.MetadataAPI
3435
publishers map[IPPort]*workloadPublisher
36+
metrics metrics
37+
subscriberCount atomic.Int32
3538
log *logging.Entry
3639
enableEndpointSlices bool
3740

@@ -49,6 +52,7 @@ type (
4952
addr Address
5053
listeners []WorkloadUpdateListener
5154
metrics metrics
55+
subscriberCount *atomic.Int32
5256
log *logging.Entry
5357

5458
mu sync.RWMutex
@@ -63,18 +67,26 @@ type (
6367
var workloadVecs = newMetricsVecs("workload", []string{})
6468

6569
func NewWorkloadWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.Entry, enableEndpointSlices bool, defaultOpaquePorts map[uint32]struct{}) (*WorkloadWatcher, error) {
70+
// Omit high-cardinality IP:port labels.
71+
metrics, err := workloadVecs.newMetrics(prometheus.Labels{})
72+
if err != nil {
73+
return nil, err
74+
}
75+
6676
ww := &WorkloadWatcher{
6777
defaultOpaquePorts: defaultOpaquePorts,
6878
k8sAPI: k8sAPI,
6979
metadataAPI: metadataAPI,
7080
publishers: make(map[IPPort]*workloadPublisher),
81+
metrics: metrics,
82+
subscriberCount: atomic.Int32{},
7183
log: log.WithFields(logging.Fields{
7284
"component": "workload-watcher",
7385
}),
7486
enableEndpointSlices: enableEndpointSlices,
7587
}
7688

77-
_, err := k8sAPI.Pod().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
89+
_, err = k8sAPI.Pod().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
7890
AddFunc: ww.addPod,
7991
DeleteFunc: ww.deletePod,
8092
UpdateFunc: ww.updatePod,
@@ -125,6 +137,8 @@ func (ww *WorkloadWatcher) Subscribe(service *ServiceID, hostname, ip string, po
125137
return "", err
126138
}
127139

140+
ww.updateSubscriberCount()
141+
128142
return wp.addr.IP, nil
129143
}
130144

@@ -145,6 +159,12 @@ func (ww *WorkloadWatcher) Unsubscribe(ip string, port Port, listener WorkloadUp
145159
if len(wp.listeners) == 0 {
146160
delete(ww.publishers, IPPort{wp.addr.IP, wp.addr.Port})
147161
}
162+
163+
ww.updateSubscriberCount()
164+
}
165+
166+
func (ww *WorkloadWatcher) updateSubscriberCount() {
167+
ww.metrics.setSubscribers(int(ww.subscriberCount.Load()))
148168
}
149169

150170
// addPod is an event handler so it cannot block
@@ -454,11 +474,6 @@ func (ww *WorkloadWatcher) getOrNewWorkloadPublisher(service *ServiceID, hostnam
454474
ipPort := IPPort{ip, port}
455475
wp, ok := ww.publishers[ipPort]
456476
if !ok {
457-
// Omit high-cardinality IP:port labels.
458-
metrics, err := workloadVecs.newMetrics(prometheus.Labels{})
459-
if err != nil {
460-
return nil, err
461-
}
462477
wp = &workloadPublisher{
463478
defaultOpaquePorts: ww.defaultOpaquePorts,
464479
k8sAPI: ww.k8sAPI,
@@ -467,7 +482,8 @@ func (ww *WorkloadWatcher) getOrNewWorkloadPublisher(service *ServiceID, hostnam
467482
IP: ip,
468483
Port: port,
469484
},
470-
metrics: metrics,
485+
metrics: ww.metrics,
486+
subscriberCount: &ww.subscriberCount,
471487
log: ww.log.WithFields(logging.Fields{
472488
"component": "workload-publisher",
473489
"ip": ip,
@@ -630,12 +646,12 @@ func (wp *workloadPublisher) subscribe(listener WorkloadUpdateListener) error {
630646
defer wp.mu.Unlock()
631647

632648
wp.listeners = append(wp.listeners, listener)
633-
wp.metrics.setSubscribers(len(wp.listeners))
634649

635650
if err := listener.Update(&wp.addr); err != nil {
636651
return fmt.Errorf("failed to send initial update: %w", err)
637652
}
638653
wp.metrics.incUpdates()
654+
wp.subscriberCount.Add(1)
639655
return nil
640656
}
641657

@@ -649,11 +665,10 @@ func (wp *workloadPublisher) unsubscribe(listener WorkloadUpdateListener) {
649665
wp.listeners[i] = wp.listeners[n-1]
650666
wp.listeners[n-1] = nil
651667
wp.listeners = wp.listeners[:n-1]
668+
wp.subscriberCount.Add(-1)
652669
break
653670
}
654671
}
655-
656-
wp.metrics.setSubscribers(len(wp.listeners))
657672
}
658673

659674
// updatePod creates an Address instance for the given pod, that is passed to

0 commit comments

Comments
 (0)