@@ -100,7 +100,7 @@ public MonitorImpl(
100
100
this .properties = properties ;
101
101
this .monitorDisposalTimeMillis = monitorDisposalTimeMillis ;
102
102
this .monitorService = monitorService ;
103
-
103
+
104
104
this .contextLastUsedTimestampNano = this .getCurrentTimeNano ();
105
105
this .contextsSizeGauge = telemetryFactory .createGauge ("efm.activeContexts.queue.size" ,
106
106
() -> (long ) activeContexts .size ());
@@ -113,6 +113,9 @@ public MonitorImpl(
113
113
114
114
@ Override
115
115
public void startMonitoring (final MonitorConnectionContext context ) {
116
+ if (this .stopped ) {
117
+ LOGGER .warning (() -> Messages .get ("MonitorImpl.monitorIsStopped" , new Object [] {this .hostSpec .getHost ()}));
118
+ }
116
119
final long currentTimeNano = this .getCurrentTimeNano ();
117
120
context .setStartMonitorTimeNano (currentTimeNano );
118
121
this .contextLastUsedTimestampNano = currentTimeNano ;
@@ -137,125 +140,142 @@ public void clearContexts() {
137
140
138
141
@ Override
139
142
public void run () {
140
- this .telemetryContext = telemetryFactory .openTelemetryContext (
141
- "monitoring thread" , TelemetryTraceLevel .TOP_LEVEL );
142
- telemetryContext .setAttribute ("url" , hostSpec .getUrl ());
143
- try {
144
- this .stopped = false ;
145
- while (true ) {
146
-
147
- // process new contexts
148
- MonitorConnectionContext newMonitorContext ;
149
- MonitorConnectionContext firstAddedNewMonitorContext = null ;
150
- final long currentTimeNano = this .getCurrentTimeNano ();
151
- while ((newMonitorContext = this .newContexts .poll ()) != null ) {
152
- if (firstAddedNewMonitorContext == newMonitorContext ) {
153
- // This context has already been processed.
154
- // Add it back to the queue and process it in the next round.
155
- this .newContexts .add (newMonitorContext );
156
- break ;
157
- }
158
- if (newMonitorContext .isActiveContext ()) {
159
- if (newMonitorContext .getExpectedActiveMonitoringStartTimeNano () > currentTimeNano ) {
160
- // The context active monitoring time hasn't come.
161
- // Add the context to the queue and check it later.
143
+ boolean continueMonitoring = true ;
144
+ while (continueMonitoring ) {
145
+ this .telemetryContext = telemetryFactory .openTelemetryContext (
146
+ "monitoring thread" , TelemetryTraceLevel .TOP_LEVEL );
147
+ telemetryContext .setAttribute ("url" , hostSpec .getUrl ());
148
+ try {
149
+ this .stopped = false ;
150
+ while (true ) {
151
+
152
+ // process new contexts
153
+ MonitorConnectionContext newMonitorContext ;
154
+ MonitorConnectionContext firstAddedNewMonitorContext = null ;
155
+ final long currentTimeNano = this .getCurrentTimeNano ();
156
+ while ((newMonitorContext = this .newContexts .poll ()) != null ) {
157
+ if (firstAddedNewMonitorContext == newMonitorContext ) {
158
+ // This context has already been processed.
159
+ // Add it back to the queue and process it in the next round.
162
160
this .newContexts .add (newMonitorContext );
163
- if (firstAddedNewMonitorContext == null ) {
164
- firstAddedNewMonitorContext = newMonitorContext ;
161
+ break ;
162
+ }
163
+ if (newMonitorContext .isActiveContext ()) {
164
+ if (newMonitorContext .getExpectedActiveMonitoringStartTimeNano () > currentTimeNano ) {
165
+ // The context active monitoring time hasn't come.
166
+ // Add the context to the queue and check it later.
167
+ this .newContexts .add (newMonitorContext );
168
+ if (firstAddedNewMonitorContext == null ) {
169
+ firstAddedNewMonitorContext = newMonitorContext ;
170
+ }
171
+ } else {
172
+ // It's time to start actively monitor this context.
173
+ this .activeContexts .add (newMonitorContext );
165
174
}
166
- } else {
167
- // It's time to start actively monitor this context.
168
- this .activeContexts .add (newMonitorContext );
169
175
}
170
176
}
171
- }
172
-
173
- if (!this .activeContexts .isEmpty ()) {
174
177
175
- final long statusCheckStartTimeNano = this .getCurrentTimeNano ();
176
- this .contextLastUsedTimestampNano = statusCheckStartTimeNano ;
178
+ if (!this .activeContexts .isEmpty ()) {
177
179
178
- final ConnectionStatus status =
179
- checkConnectionStatus ( this .nodeCheckTimeoutMillis ) ;
180
+ final long statusCheckStartTimeNano = this . getCurrentTimeNano ();
181
+ this .contextLastUsedTimestampNano = statusCheckStartTimeNano ;
180
182
181
- long delayMillis = -1 ;
182
- MonitorConnectionContext monitorContext ;
183
- MonitorConnectionContext firstAddedMonitorContext = null ;
183
+ final ConnectionStatus status =
184
+ checkConnectionStatus (this .nodeCheckTimeoutMillis );
184
185
185
- while ((monitorContext = this .activeContexts .poll ()) != null ) {
186
+ long delayMillis = -1 ;
187
+ MonitorConnectionContext monitorContext ;
188
+ MonitorConnectionContext firstAddedMonitorContext = null ;
186
189
187
- synchronized (monitorContext ) {
188
- // If context is already invalid, just skip it
189
- if (!monitorContext .isActiveContext ()) {
190
- continue ;
191
- }
190
+ while ((monitorContext = this .activeContexts .poll ()) != null ) {
192
191
193
- if (firstAddedMonitorContext == monitorContext ) {
194
- // this context has already been processed by this loop
195
- // add it to the queue and exit this loop
196
- this .activeContexts .add (monitorContext );
197
- break ;
198
- }
192
+ synchronized (monitorContext ) {
193
+ // If context is already invalid, just skip it
194
+ if (!monitorContext .isActiveContext ()) {
195
+ continue ;
196
+ }
199
197
200
- // otherwise, process this context
201
- monitorContext .updateConnectionStatus (
202
- this .hostSpec .getUrl (),
203
- statusCheckStartTimeNano ,
204
- statusCheckStartTimeNano + status .elapsedTimeNano ,
205
- status .isValid );
206
-
207
- // If context is still valid and node is still healthy, it needs to continue updating this context
208
- if (monitorContext .isActiveContext () && !monitorContext .isNodeUnhealthy ()) {
209
- this .activeContexts .add (monitorContext );
210
- if (firstAddedMonitorContext == null ) {
211
- firstAddedMonitorContext = monitorContext ;
198
+ if (firstAddedMonitorContext == monitorContext ) {
199
+ // this context has already been processed by this loop
200
+ // add it to the queue and exit this loop
201
+ this .activeContexts .add (monitorContext );
202
+ break ;
212
203
}
213
204
214
- if (delayMillis == -1 || delayMillis > monitorContext .getFailureDetectionIntervalMillis ()) {
215
- delayMillis = monitorContext .getFailureDetectionIntervalMillis ();
205
+ // otherwise, process this context
206
+ monitorContext .updateConnectionStatus (
207
+ this .hostSpec .getUrl (),
208
+ statusCheckStartTimeNano ,
209
+ statusCheckStartTimeNano + status .elapsedTimeNano ,
210
+ status .isValid );
211
+
212
+ // If context is still valid and node is still healthy, it needs to continue updating this context
213
+ if (monitorContext .isActiveContext () && !monitorContext .isNodeUnhealthy ()) {
214
+ this .activeContexts .add (monitorContext );
215
+ if (firstAddedMonitorContext == null ) {
216
+ firstAddedMonitorContext = monitorContext ;
217
+ }
218
+
219
+ if (delayMillis == -1 || delayMillis > monitorContext .getFailureDetectionIntervalMillis ()) {
220
+ delayMillis = monitorContext .getFailureDetectionIntervalMillis ();
221
+ }
216
222
}
217
223
}
218
224
}
219
- }
220
225
221
- if (delayMillis == -1 ) {
222
- // No active contexts
223
- delayMillis = THREAD_SLEEP_WHEN_INACTIVE_MILLIS ;
224
- } else {
225
- delayMillis -= status .elapsedTimeNano ;
226
- // Check for min delay between node health check
227
- if (delayMillis <= 0 ) {
228
- delayMillis = MIN_CONNECTION_CHECK_TIMEOUT_MILLIS ;
226
+ if (delayMillis == -1 ) {
227
+ // No active contexts
228
+ delayMillis = THREAD_SLEEP_WHEN_INACTIVE_MILLIS ;
229
+ } else {
230
+ delayMillis -= status .elapsedTimeNano ;
231
+ // Check for min delay between node health check
232
+ if (delayMillis <= 0 ) {
233
+ delayMillis = MIN_CONNECTION_CHECK_TIMEOUT_MILLIS ;
234
+ }
235
+ // Use this delay as node checkout timeout since it corresponds to min interval for all active contexts
236
+ this .nodeCheckTimeoutMillis = delayMillis ;
229
237
}
230
- // Use this delay as node checkout timeout since it corresponds to min interval for all active contexts
231
- this .nodeCheckTimeoutMillis = delayMillis ;
232
- }
233
238
234
- TimeUnit .MILLISECONDS .sleep (delayMillis );
239
+ TimeUnit .MILLISECONDS .sleep (delayMillis );
235
240
236
- } else {
237
- if ((this .getCurrentTimeNano () - this .contextLastUsedTimestampNano )
238
- >= TimeUnit .MILLISECONDS .toNanos (this .monitorDisposalTimeMillis )) {
239
- monitorService .notifyUnused (this );
240
- break ;
241
+ } else {
242
+ if ((this .getCurrentTimeNano () - this .contextLastUsedTimestampNano )
243
+ >= TimeUnit .MILLISECONDS .toNanos (this .monitorDisposalTimeMillis )) {
244
+ monitorService .notifyUnused (this );
245
+ continueMonitoring = false ;
246
+ break ;
247
+ }
248
+ TimeUnit .MILLISECONDS .sleep (THREAD_SLEEP_WHEN_INACTIVE_MILLIS );
241
249
}
242
- TimeUnit .MILLISECONDS .sleep (THREAD_SLEEP_WHEN_INACTIVE_MILLIS );
243
250
}
244
- }
245
- } catch (final InterruptedException intEx ) {
246
- // do nothing; exit thread
247
- } finally {
248
- if (this .monitoringConn != null ) {
249
- try {
250
- this .monitoringConn .close ();
251
- } catch (final SQLException ex ) {
252
- // ignore
251
+ } catch (final InterruptedException intEx ) {
252
+ // set continueMonitoring to false so monitoring will be stopped; exit thread
253
+ continueMonitoring = false ;
254
+ LOGGER .warning (
255
+ () -> Messages .get (
256
+ "MonitorImpl.interruptedExceptionDuringMonitoring" ,
257
+ new Object [] {this .hostSpec .getHost (), intEx .getMessage ()}));
258
+ } catch (final Exception ex ) {
259
+ // do nothing; exit thread
260
+ LOGGER .warning (
261
+ () -> Messages .get (
262
+ "MonitorImpl.exceptionDuringMonitoring" ,
263
+ new Object [] {this .hostSpec .getHost (), ex .getMessage ()}));
264
+ } finally {
265
+ if (!continueMonitoring ) {
266
+ if (this .monitoringConn != null ) {
267
+ try {
268
+ this .monitoringConn .close ();
269
+ } catch (final SQLException ex ) {
270
+ // ignore
271
+ }
272
+ }
273
+ if (telemetryContext != null ) {
274
+ this .telemetryContext .closeContext ();
275
+ }
276
+ this .stopped = true ;
253
277
}
254
278
}
255
- if (telemetryContext != null ) {
256
- this .telemetryContext .closeContext ();
257
- }
258
- this .stopped = true ;
259
279
}
260
280
}
261
281
0 commit comments