@@ -100,7 +100,7 @@ public MonitorImpl(
100
100
this .properties = properties ;
101
101
this .monitorDisposalTimeMillis = monitorDisposalTimeMillis ;
102
102
this .monitorService = monitorService ;
103
-
103
+
104
104
this .contextLastUsedTimestampNano = this .getCurrentTimeNano ();
105
105
this .contextsSizeGauge = telemetryFactory .createGauge ("efm.activeContexts.queue.size" ,
106
106
() -> (long ) activeContexts .size ());
@@ -113,6 +113,9 @@ public MonitorImpl(
113
113
114
114
@ Override
115
115
public void startMonitoring (final MonitorConnectionContext context ) {
116
+ if (this .stopped ) {
117
+ LOGGER .warning (() -> Messages .get ("MonitorImpl.monitorIsStopped" , new Object [] {this .hostSpec .getHost ()}));
118
+ }
116
119
final long currentTimeNano = this .getCurrentTimeNano ();
117
120
context .setStartMonitorTimeNano (currentTimeNano );
118
121
this .contextLastUsedTimestampNano = currentTimeNano ;
@@ -143,107 +146,128 @@ public void run() {
143
146
try {
144
147
this .stopped = false ;
145
148
while (true ) {
149
+ try {
146
150
147
- // process new contexts
148
- MonitorConnectionContext newMonitorContext ;
149
- MonitorConnectionContext firstAddedNewMonitorContext = null ;
150
- final long currentTimeNano = this .getCurrentTimeNano ();
151
- while ((newMonitorContext = this .newContexts .poll ()) != null ) {
152
- if (firstAddedNewMonitorContext == newMonitorContext ) {
153
- // This context has already been processed.
154
- // Add it back to the queue and process it in the next round.
155
- this .newContexts .add (newMonitorContext );
156
- break ;
157
- }
158
- if (newMonitorContext .isActiveContext ()) {
159
- if (newMonitorContext .getExpectedActiveMonitoringStartTimeNano () > currentTimeNano ) {
160
- // The context active monitoring time hasn't come.
161
- // Add the context to the queue and check it later.
151
+ // process new contexts
152
+ MonitorConnectionContext newMonitorContext ;
153
+ MonitorConnectionContext firstAddedNewMonitorContext = null ;
154
+ final long currentTimeNano = this .getCurrentTimeNano ();
155
+ while ((newMonitorContext = this .newContexts .poll ()) != null ) {
156
+ if (firstAddedNewMonitorContext == newMonitorContext ) {
157
+ // This context has already been processed.
158
+ // Add it back to the queue and process it in the next round.
162
159
this .newContexts .add (newMonitorContext );
163
- if (firstAddedNewMonitorContext == null ) {
164
- firstAddedNewMonitorContext = newMonitorContext ;
160
+ break ;
161
+ }
162
+ if (newMonitorContext .isActiveContext ()) {
163
+ if (newMonitorContext .getExpectedActiveMonitoringStartTimeNano () > currentTimeNano ) {
164
+ // The context active monitoring time hasn't come.
165
+ // Add the context to the queue and check it later.
166
+ this .newContexts .add (newMonitorContext );
167
+ if (firstAddedNewMonitorContext == null ) {
168
+ firstAddedNewMonitorContext = newMonitorContext ;
169
+ }
170
+ } else {
171
+ // It's time to start actively monitor this context.
172
+ this .activeContexts .add (newMonitorContext );
165
173
}
166
- } else {
167
- // It's time to start actively monitor this context.
168
- this .activeContexts .add (newMonitorContext );
169
174
}
170
175
}
171
- }
172
176
173
- if (!this .activeContexts .isEmpty ()) {
177
+ if (!this .activeContexts .isEmpty ()) {
174
178
175
- final long statusCheckStartTimeNano = this .getCurrentTimeNano ();
176
- this .contextLastUsedTimestampNano = statusCheckStartTimeNano ;
179
+ final long statusCheckStartTimeNano = this .getCurrentTimeNano ();
180
+ this .contextLastUsedTimestampNano = statusCheckStartTimeNano ;
177
181
178
- final ConnectionStatus status =
179
- checkConnectionStatus (this .nodeCheckTimeoutMillis );
182
+ final ConnectionStatus status =
183
+ checkConnectionStatus (this .nodeCheckTimeoutMillis );
180
184
181
- long delayMillis = -1 ;
182
- MonitorConnectionContext monitorContext ;
183
- MonitorConnectionContext firstAddedMonitorContext = null ;
185
+ long delayMillis = -1 ;
186
+ MonitorConnectionContext monitorContext ;
187
+ MonitorConnectionContext firstAddedMonitorContext = null ;
184
188
185
- while ((monitorContext = this .activeContexts .poll ()) != null ) {
186
-
187
- synchronized (monitorContext ) {
188
- // If context is already invalid, just skip it
189
- if (!monitorContext .isActiveContext ()) {
190
- continue ;
191
- }
189
+ while ((monitorContext = this .activeContexts .poll ()) != null ) {
192
190
193
- if (firstAddedMonitorContext == monitorContext ) {
194
- // this context has already been processed by this loop
195
- // add it to the queue and exit this loop
196
- this .activeContexts .add (monitorContext );
197
- break ;
198
- }
191
+ synchronized (monitorContext ) {
192
+ // If context is already invalid, just skip it
193
+ if (!monitorContext .isActiveContext ()) {
194
+ continue ;
195
+ }
199
196
200
- // otherwise, process this context
201
- monitorContext .updateConnectionStatus (
202
- this .hostSpec .getUrl (),
203
- statusCheckStartTimeNano ,
204
- statusCheckStartTimeNano + status .elapsedTimeNano ,
205
- status .isValid );
206
-
207
- // If context is still valid and node is still healthy, it needs to continue updating this context
208
- if (monitorContext .isActiveContext () && !monitorContext .isNodeUnhealthy ()) {
209
- this .activeContexts .add (monitorContext );
210
- if (firstAddedMonitorContext == null ) {
211
- firstAddedMonitorContext = monitorContext ;
197
+ if (firstAddedMonitorContext == monitorContext ) {
198
+ // this context has already been processed by this loop
199
+ // add it to the queue and exit this loop
200
+ this .activeContexts .add (monitorContext );
201
+ break ;
212
202
}
213
203
214
- if (delayMillis == -1 || delayMillis > monitorContext .getFailureDetectionIntervalMillis ()) {
215
- delayMillis = monitorContext .getFailureDetectionIntervalMillis ();
204
+ // otherwise, process this context
205
+ monitorContext .updateConnectionStatus (
206
+ this .hostSpec .getUrl (),
207
+ statusCheckStartTimeNano ,
208
+ statusCheckStartTimeNano + status .elapsedTimeNano ,
209
+ status .isValid );
210
+
211
+ // If context is still valid and node is still healthy, it needs to continue updating this context
212
+ if (monitorContext .isActiveContext () && !monitorContext .isNodeUnhealthy ()) {
213
+ this .activeContexts .add (monitorContext );
214
+ if (firstAddedMonitorContext == null ) {
215
+ firstAddedMonitorContext = monitorContext ;
216
+ }
217
+
218
+ if (delayMillis == -1 || delayMillis > monitorContext .getFailureDetectionIntervalMillis ()) {
219
+ delayMillis = monitorContext .getFailureDetectionIntervalMillis ();
220
+ }
216
221
}
217
222
}
218
223
}
219
- }
220
224
221
- if (delayMillis == -1 ) {
222
- // No active contexts
223
- delayMillis = THREAD_SLEEP_WHEN_INACTIVE_MILLIS ;
224
- } else {
225
- delayMillis -= status .elapsedTimeNano ;
226
- // Check for min delay between node health check
227
- if (delayMillis <= 0 ) {
228
- delayMillis = MIN_CONNECTION_CHECK_TIMEOUT_MILLIS ;
225
+ if (delayMillis == -1 ) {
226
+ // No active contexts
227
+ delayMillis = THREAD_SLEEP_WHEN_INACTIVE_MILLIS ;
228
+ } else {
229
+ delayMillis -= status .elapsedTimeNano ;
230
+ // Check for min delay between node health check
231
+ if (delayMillis <= 0 ) {
232
+ delayMillis = MIN_CONNECTION_CHECK_TIMEOUT_MILLIS ;
233
+ }
234
+ // Use this delay as node checkout timeout since it corresponds to min interval for all active contexts
235
+ this .nodeCheckTimeoutMillis = delayMillis ;
229
236
}
230
- // Use this delay as node checkout timeout since it corresponds to min interval for all active contexts
231
- this .nodeCheckTimeoutMillis = delayMillis ;
232
- }
233
237
234
- TimeUnit .MILLISECONDS .sleep (delayMillis );
238
+ TimeUnit .MILLISECONDS .sleep (delayMillis );
235
239
236
- } else {
237
- if ((this .getCurrentTimeNano () - this .contextLastUsedTimestampNano )
238
- >= TimeUnit .MILLISECONDS .toNanos (this .monitorDisposalTimeMillis )) {
239
- monitorService .notifyUnused (this );
240
- break ;
240
+ } else {
241
+ if ((this .getCurrentTimeNano () - this .contextLastUsedTimestampNano )
242
+ >= TimeUnit .MILLISECONDS .toNanos (this .monitorDisposalTimeMillis )) {
243
+ monitorService .notifyUnused (this );
244
+ break ;
245
+ }
246
+ TimeUnit .MILLISECONDS .sleep (THREAD_SLEEP_WHEN_INACTIVE_MILLIS );
241
247
}
242
- TimeUnit .MILLISECONDS .sleep (THREAD_SLEEP_WHEN_INACTIVE_MILLIS );
248
+
249
+ } catch (final InterruptedException intEx ) {
250
+ throw intEx ;
251
+ } catch (final Exception ex ) {
252
+ // log and ignore
253
+ LOGGER .warning (
254
+ () -> Messages .get (
255
+ "MonitorImpl.exceptionDuringMonitoringContinue" ,
256
+ new Object [] {this .hostSpec .getHost (), ex .getMessage ()}));
243
257
}
244
258
}
245
259
} catch (final InterruptedException intEx ) {
246
- // do nothing; exit thread
260
+ // exit thread
261
+ LOGGER .warning (
262
+ () -> Messages .get (
263
+ "MonitorImpl.interruptedExceptionDuringMonitoring" ,
264
+ new Object [] {this .hostSpec .getHost (), intEx .getMessage ()}));
265
+ } catch (final Exception ex ) {
266
+ // this should not be reached; log and exit thread
267
+ LOGGER .warning (
268
+ () -> Messages .get (
269
+ "MonitorImpl.exceptionDuringMonitoringStop" ,
270
+ new Object [] {this .hostSpec .getHost (), ex .getMessage ()}));
247
271
} finally {
248
272
if (this .monitoringConn != null ) {
249
273
try {
0 commit comments