@@ -6,21 +6,27 @@ import (
66 "time"
77
88 "github.com/argoproj/gitops-engine/pkg/cache"
9-
109 "github.com/prometheus/client_golang/prometheus"
10+ log "github.com/sirupsen/logrus"
11+
12+ argoappv1 "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
13+ metricsutil "github.com/argoproj/argo-cd/v3/util/metrics"
1114)
1215
1316const (
1417 metricsCollectionInterval = 30 * time .Second
18+ metricsCollectionTimeout = 10 * time .Second
1519)
1620
1721var (
1822 descClusterDefaultLabels = []string {"server" }
1923
24+ descClusterLabels * prometheus.Desc
25+
2026 descClusterInfo = prometheus .NewDesc (
2127 "argocd_cluster_info" ,
2228 "Information about cluster." ,
23- append (descClusterDefaultLabels , "k8s_version" ),
29+ append (descClusterDefaultLabels , "k8s_version" , "name" ),
2430 nil ,
2531 )
2632 descClusterCacheResources = prometheus .NewDesc (
@@ -53,26 +59,99 @@ type HasClustersInfo interface {
5359 GetClustersInfo () []cache.ClusterInfo
5460}
5561
62+ type ClusterLister func (ctx context.Context ) (* argoappv1.ClusterList , error )
63+
5664type clusterCollector struct {
57- infoSource HasClustersInfo
58- info []cache.ClusterInfo
59- lock sync.Mutex
65+ infoSource HasClustersInfo
66+ lock sync.RWMutex
67+ clusterLabels []string
68+ clusterLister ClusterLister
69+
70+ latestInfo []* clusterData
71+ }
72+
73+ type clusterData struct {
74+ info * cache.ClusterInfo
75+ cluster * argoappv1.Cluster
6076}
6177
62- func (c * clusterCollector ) Run (ctx context.Context ) {
78+ func NewClusterCollector (ctx context.Context , source HasClustersInfo , clusterLister ClusterLister , clusterLabels []string ) prometheus.Collector {
79+ if len (clusterLabels ) > 0 {
80+ normalizedClusterLabels := metricsutil .NormalizeLabels ("label" , clusterLabels )
81+ descClusterLabels = prometheus .NewDesc (
82+ "argocd_cluster_labels" ,
83+ "Argo Cluster labels converted to Prometheus labels" ,
84+ append (append (descClusterDefaultLabels , "name" ), normalizedClusterLabels ... ),
85+ nil ,
86+ )
87+ }
88+
89+ collector := & clusterCollector {
90+ infoSource : source ,
91+ clusterLabels : clusterLabels ,
92+ clusterLister : clusterLister ,
93+ lock : sync.RWMutex {},
94+ }
95+
96+ collector .setClusterData ()
97+ go collector .run (ctx )
98+
99+ return collector
100+ }
101+
102+ func (c * clusterCollector ) run (ctx context.Context ) {
63103 //nolint:staticcheck // FIXME: complains about SA1015
64104 tick := time .Tick (metricsCollectionInterval )
65105 for {
66106 select {
67107 case <- ctx .Done ():
68108 case <- tick :
69- info := c .infoSource .GetClustersInfo ()
109+ c .setClusterData ()
110+ }
111+ }
112+ }
113+
114+ func (c * clusterCollector ) setClusterData () {
115+ if clusterData , err := c .getClusterData (); err == nil {
116+ c .lock .Lock ()
117+ c .latestInfo = clusterData
118+ c .lock .Unlock ()
119+ } else {
120+ log .Warnf ("error collecting cluster metrics: %v" , err )
121+ }
122+ }
123+
124+ func (c * clusterCollector ) getClusterData () ([]* clusterData , error ) {
125+ clusterDatas := []* clusterData {}
126+ clusterInfos := c .infoSource .GetClustersInfo ()
127+
128+ ctx , cancel := context .WithTimeout (context .Background (), metricsCollectionTimeout )
129+ defer cancel ()
130+ clusters , err := c .clusterLister (ctx )
131+ if err != nil {
132+ return nil , err
133+ }
70134
71- c .lock .Lock ()
72- c .info = info
73- c .lock .Unlock ()
135+ clusterMap := map [string ]* argoappv1.Cluster {}
136+ for i , cluster := range clusters .Items {
137+ clusterMap [cluster .Server ] = & clusters .Items [i ]
138+ }
139+
140+ // Base the cluster data on the ClusterInfo because it only contains the
141+ // clusters managed by this controller instance
142+ for i , info := range clusterInfos {
143+ cluster , ok := clusterMap [info .Server ]
144+ if ! ok {
145+ // This should not happen, but we cannot emit incomplete metrics, so we skip this cluster
146+ log .WithField ("server" , info .Server ).Warnf ("could find cluster for metrics collection" )
147+ continue
74148 }
149+ clusterDatas = append (clusterDatas , & clusterData {
150+ info : & clusterInfos [i ],
151+ cluster : cluster ,
152+ })
75153 }
154+ return clusterDatas , nil
76155}
77156
78157// Describe implements the prometheus.Collector interface
@@ -82,20 +161,41 @@ func (c *clusterCollector) Describe(ch chan<- *prometheus.Desc) {
82161 ch <- descClusterAPIs
83162 ch <- descClusterCacheAgeSeconds
84163 ch <- descClusterConnectionStatus
164+ if len (c .clusterLabels ) > 0 {
165+ ch <- descClusterLabels
166+ }
85167}
86168
87169func (c * clusterCollector ) Collect (ch chan <- prometheus.Metric ) {
170+ c .lock .RLock ()
171+ latestInfo := c .latestInfo
172+ c .lock .RUnlock ()
173+
88174 now := time .Now ()
89- for _ , c := range c .info {
90- defaultValues := []string {c .Server }
91- ch <- prometheus .MustNewConstMetric (descClusterInfo , prometheus .GaugeValue , 1 , append (defaultValues , c .K8SVersion )... )
92- ch <- prometheus .MustNewConstMetric (descClusterCacheResources , prometheus .GaugeValue , float64 (c .ResourcesCount ), defaultValues ... )
93- ch <- prometheus .MustNewConstMetric (descClusterAPIs , prometheus .GaugeValue , float64 (c .APIsCount ), defaultValues ... )
175+ for _ , clusterData := range latestInfo {
176+ info := clusterData .info
177+ name := clusterData .cluster .Name
178+ labels := clusterData .cluster .Labels
179+
180+ defaultValues := []string {info .Server }
181+ ch <- prometheus .MustNewConstMetric (descClusterInfo , prometheus .GaugeValue , 1 , append (defaultValues , info .K8SVersion , name )... )
182+ ch <- prometheus .MustNewConstMetric (descClusterCacheResources , prometheus .GaugeValue , float64 (info .ResourcesCount ), defaultValues ... )
183+ ch <- prometheus .MustNewConstMetric (descClusterAPIs , prometheus .GaugeValue , float64 (info .APIsCount ), defaultValues ... )
94184 cacheAgeSeconds := - 1
95- if c .LastCacheSyncTime != nil {
96- cacheAgeSeconds = int (now .Sub (* c .LastCacheSyncTime ).Seconds ())
185+ if info .LastCacheSyncTime != nil {
186+ cacheAgeSeconds = int (now .Sub (* info .LastCacheSyncTime ).Seconds ())
97187 }
98188 ch <- prometheus .MustNewConstMetric (descClusterCacheAgeSeconds , prometheus .GaugeValue , float64 (cacheAgeSeconds ), defaultValues ... )
99- ch <- prometheus .MustNewConstMetric (descClusterConnectionStatus , prometheus .GaugeValue , boolFloat64 (c .SyncError == nil ), append (defaultValues , c .K8SVersion )... )
189+ ch <- prometheus .MustNewConstMetric (descClusterConnectionStatus , prometheus .GaugeValue , boolFloat64 (info .SyncError == nil ), append (defaultValues , info .K8SVersion )... )
190+
191+ if len (c .clusterLabels ) > 0 && labels != nil {
192+ labelValues := []string {}
193+ labelValues = append (labelValues , info .Server , name )
194+ for _ , desiredLabel := range c .clusterLabels {
195+ value := labels [desiredLabel ]
196+ labelValues = append (labelValues , value )
197+ }
198+ ch <- prometheus .MustNewConstMetric (descClusterLabels , prometheus .GaugeValue , 1 , labelValues ... )
199+ }
100200 }
101201}
0 commit comments