1
+ using System ;
2
+ using System . Threading ;
3
+ using System . Threading . Tasks ;
4
+ using ES . Kubernetes . Reflector . CertManager . Constants ;
5
+ using ES . Kubernetes . Reflector . CertManager . Events ;
6
+ using ES . Kubernetes . Reflector . CertManager . Resources ;
7
+ using ES . Kubernetes . Reflector . Core . Events ;
8
+ using ES . Kubernetes . Reflector . Core . Extensions ;
9
+ using ES . Kubernetes . Reflector . Core . Monitoring ;
10
+ using ES . Kubernetes . Reflector . Core . Queuing ;
11
+ using ES . Kubernetes . Reflector . Core . Resources ;
12
+ using k8s ;
13
+ using k8s . Models ;
14
+ using MediatR ;
15
+ using Microsoft . Extensions . Diagnostics . HealthChecks ;
16
+ using Microsoft . Extensions . Hosting ;
17
+ using Microsoft . Extensions . Logging ;
18
+
19
+ namespace ES . Kubernetes . Reflector . CertManager
20
+ {
21
+ public class Monitor : IHostedService , IHealthCheck
22
+ {
23
+ private readonly ManagedWatcher < Certificate > _certificatesWatcher ;
24
+ private readonly ManagedWatcher < V1beta1CustomResourceDefinition > _crdWatcher ;
25
+ private readonly FeederQueue < WatcherEvent > _eventQueue ;
26
+ private readonly ILogger < Monitor > _logger ;
27
+ private readonly IMediator _mediator ;
28
+ private readonly ManagedWatcher < V1Secret > _secretsWatcher ;
29
+
30
+ private string _crdVersion ;
31
+
32
+ public Monitor ( ILogger < Monitor > logger ,
33
+ ManagedWatcher < V1beta1CustomResourceDefinition > crdWatcher ,
34
+ ManagedWatcher < Certificate > certificatesWatcher ,
35
+ ManagedWatcher < V1Secret > secretsWatcher ,
36
+ IMediator mediator )
37
+ {
38
+ _logger = logger ;
39
+ _crdWatcher = crdWatcher ;
40
+ _certificatesWatcher = certificatesWatcher ;
41
+ _secretsWatcher = secretsWatcher ;
42
+ _mediator = mediator ;
43
+
44
+ _eventQueue = new FeederQueue < WatcherEvent > ( OnEvent , OnEventHandlingError ) ;
45
+
46
+
47
+ _secretsWatcher . OnStateChanged = OnWatcherStateChanged ;
48
+ _secretsWatcher . EventHandlerFactory = e =>
49
+ _eventQueue . FeedAsync ( new InternalSecretWatcherEvent
50
+ { Item = e . Item , Type = e . Type , CertificateResourceDefinitionVersion = _crdVersion } ) ;
51
+ _secretsWatcher . RequestFactory = async c =>
52
+ await c . ListSecretForAllNamespacesWithHttpMessagesAsync ( watch : true ) ;
53
+
54
+
55
+ _certificatesWatcher . OnStateChanged = OnWatcherStateChanged ;
56
+ _certificatesWatcher . EventHandlerFactory = e =>
57
+ _eventQueue . FeedAsync ( new InternalCertificateWatcherEvent
58
+ { Item = e . Item , Type = e . Type , CertificateResourceDefinitionVersion = _crdVersion } ) ;
59
+
60
+
61
+ _crdWatcher . EventHandlerFactory = OnCrdEvent ;
62
+ _crdWatcher . RequestFactory = async c =>
63
+ await c . ListCustomResourceDefinitionWithHttpMessagesAsync ( watch : true ) ;
64
+ _crdWatcher . OnStateChanged = async ( sender , update ) =>
65
+ {
66
+ switch ( update . State )
67
+ {
68
+ case ManagedWatcherState . Closed :
69
+ _logger . LogDebug ( "{type} watcher {state}" , typeof ( V1beta1CustomResourceDefinition ) . Name ,
70
+ update . State ) ;
71
+ await sender . Start ( ) ;
72
+ break ;
73
+ case ManagedWatcherState . Faulted :
74
+ _logger . LogError ( update . Exception , "{type} watcher {state}" ,
75
+ typeof ( V1beta1CustomResourceDefinition ) . Name , update . State ) ;
76
+ break ;
77
+ default :
78
+ _logger . LogDebug ( "{type} watcher {state}" , typeof ( V1beta1CustomResourceDefinition ) . Name ,
79
+ update . State ) ;
80
+ break ;
81
+ }
82
+ } ;
83
+ }
84
+
85
+ public async Task StartAsync ( CancellationToken cancellationToken )
86
+ {
87
+ await _crdWatcher . Start ( ) ;
88
+ }
89
+
90
+ public async Task StopAsync ( CancellationToken cancellationToken )
91
+ {
92
+ await _crdWatcher . Stop ( ) ;
93
+ await _certificatesWatcher . Stop ( ) ;
94
+ await _secretsWatcher . Stop ( ) ;
95
+ }
96
+
97
+ private async Task OnWatcherStateChanged < TS > ( ManagedWatcher < TS , WatcherEvent < TS > > sender ,
98
+ ManagedWatcherStateUpdate update ) where TS : class , IKubernetesObject
99
+ {
100
+ switch ( update . State )
101
+ {
102
+ case ManagedWatcherState . Closed :
103
+ _logger . LogDebug ( "{type} watcher {state}" , typeof ( TS ) . Name , update . State ) ;
104
+ await _secretsWatcher . Stop ( ) ;
105
+ await _certificatesWatcher . Stop ( ) ;
106
+
107
+ await _eventQueue . WaitAndClear ( ) ;
108
+
109
+ await _secretsWatcher . Start ( ) ;
110
+ await _certificatesWatcher . Start ( ) ;
111
+ break ;
112
+ case ManagedWatcherState . Faulted :
113
+ _logger . LogError ( update . Exception , "{type} watcher {state}" , typeof ( TS ) . Name , update . State ) ;
114
+ break ;
115
+ default :
116
+ _logger . LogDebug ( "{type} watcher {state}" , typeof ( TS ) . Name , update . State ) ;
117
+ break ;
118
+ }
119
+ }
120
+
121
+ private async Task OnEvent ( WatcherEvent e )
122
+ {
123
+ var id = KubernetesObjectId . For ( e . Item . Metadata ( ) ) ;
124
+ _logger . LogTrace ( "[{eventType}] {kind} {@id}" , e . Type , e . Item . Kind , id ) ;
125
+ await _mediator . Publish ( e ) ;
126
+ }
127
+
128
+ private async Task OnEventHandlingError ( WatcherEvent e , Exception ex )
129
+ {
130
+ var id = KubernetesObjectId . For ( e . Item . Metadata ( ) ) ;
131
+ _logger . LogError ( ex , "Failed to process {eventType} {kind} {@id} due to exception" ,
132
+ e . Type , e . Item . Kind , id ) ;
133
+ await _secretsWatcher . Stop ( ) ;
134
+ await _certificatesWatcher . Stop ( ) ;
135
+ _eventQueue . Clear ( ) ;
136
+
137
+ _logger . LogTrace ( "Watchers restarting" ) ;
138
+ await _secretsWatcher . Start ( ) ;
139
+ await _certificatesWatcher . Start ( ) ;
140
+ _logger . LogTrace ( "Watchers restarted" ) ;
141
+ }
142
+
143
+
144
+ private async Task OnCrdEvent ( WatcherEvent < V1beta1CustomResourceDefinition > request )
145
+ {
146
+ if ( request . Type != WatchEventType . Added && request . Type != WatchEventType . Modified ) return ;
147
+ if ( request . Item . Spec ? . Names == null ) return ;
148
+ if ( request . Item . Spec . Group != CertManagerConstants . CrdGroup ||
149
+ request . Item . Spec . Names . Kind != CertManagerConstants . CertificateKind ) return ;
150
+ if ( request . Item . Spec . Version == _crdVersion ) return ;
151
+
152
+ _crdVersion = request . Item . Spec . Version ;
153
+ _logger . LogInformation ( "{crdType} {kind} version updated to {crdGroup}/{version}" ,
154
+ typeof ( V1beta1CustomResourceDefinition ) . Name ,
155
+ CertManagerConstants . CertificateKind ,
156
+ CertManagerConstants . CrdGroup ,
157
+ request . Item . Spec . Version ) ;
158
+
159
+ await _certificatesWatcher . Stop ( ) ;
160
+ await _secretsWatcher . Stop ( ) ;
161
+
162
+ _certificatesWatcher . RequestFactory = async client =>
163
+ await client . ListClusterCustomObjectWithHttpMessagesAsync ( request . Item . Spec . Group ,
164
+ request . Item . Spec . Version , request . Item . Spec . Names . Plural , watch : true , timeoutSeconds : ( int ) TimeSpan . FromHours ( 1 ) . TotalSeconds ) ;
165
+
166
+ await _certificatesWatcher . Start ( ) ;
167
+ await _secretsWatcher . Start ( ) ;
168
+ }
169
+
170
+ public Task < HealthCheckResult > CheckHealthAsync ( HealthCheckContext context , CancellationToken cancellationToken = new CancellationToken ( ) )
171
+ {
172
+ return Task . FromResult ( _crdWatcher . IsFaulted || _secretsWatcher . IsFaulted || _certificatesWatcher . IsFaulted
173
+ ? HealthCheckResult . Unhealthy ( )
174
+ : HealthCheckResult . Healthy ( ) ) ;
175
+ }
176
+ }
177
+ }
0 commit comments