Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 101 additions & 77 deletions wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public MonitorImpl(
this.properties = properties;
this.monitorDisposalTimeMillis = monitorDisposalTimeMillis;
this.monitorService = monitorService;

this.contextLastUsedTimestampNano = this.getCurrentTimeNano();
this.contextsSizeGauge = telemetryFactory.createGauge("efm.activeContexts.queue.size",
() -> (long) activeContexts.size());
Expand All @@ -113,6 +113,9 @@ public MonitorImpl(

@Override
public void startMonitoring(final MonitorConnectionContext context) {
if (this.stopped) {
LOGGER.warning(() -> Messages.get("MonitorImpl.monitorIsStopped", new Object[] {this.hostSpec.getHost()}));
}
final long currentTimeNano = this.getCurrentTimeNano();
context.setStartMonitorTimeNano(currentTimeNano);
this.contextLastUsedTimestampNano = currentTimeNano;
Expand Down Expand Up @@ -143,107 +146,128 @@ public void run() {
try {
this.stopped = false;
while (true) {
try {

// process new contexts
MonitorConnectionContext newMonitorContext;
MonitorConnectionContext firstAddedNewMonitorContext = null;
final long currentTimeNano = this.getCurrentTimeNano();
while ((newMonitorContext = this.newContexts.poll()) != null) {
if (firstAddedNewMonitorContext == newMonitorContext) {
// This context has already been processed.
// Add it back to the queue and process it in the next round.
this.newContexts.add(newMonitorContext);
break;
}
if (newMonitorContext.isActiveContext()) {
if (newMonitorContext.getExpectedActiveMonitoringStartTimeNano() > currentTimeNano) {
// The context active monitoring time hasn't come.
// Add the context to the queue and check it later.
// process new contexts
MonitorConnectionContext newMonitorContext;
MonitorConnectionContext firstAddedNewMonitorContext = null;
final long currentTimeNano = this.getCurrentTimeNano();
while ((newMonitorContext = this.newContexts.poll()) != null) {
if (firstAddedNewMonitorContext == newMonitorContext) {
// This context has already been processed.
// Add it back to the queue and process it in the next round.
this.newContexts.add(newMonitorContext);
if (firstAddedNewMonitorContext == null) {
firstAddedNewMonitorContext = newMonitorContext;
break;
}
if (newMonitorContext.isActiveContext()) {
if (newMonitorContext.getExpectedActiveMonitoringStartTimeNano() > currentTimeNano) {
// The context active monitoring time hasn't come.
// Add the context to the queue and check it later.
this.newContexts.add(newMonitorContext);
if (firstAddedNewMonitorContext == null) {
firstAddedNewMonitorContext = newMonitorContext;
}
} else {
// It's time to start actively monitor this context.
this.activeContexts.add(newMonitorContext);
}
} else {
// It's time to start actively monitor this context.
this.activeContexts.add(newMonitorContext);
}
}
}

if (!this.activeContexts.isEmpty()) {
if (!this.activeContexts.isEmpty()) {

final long statusCheckStartTimeNano = this.getCurrentTimeNano();
this.contextLastUsedTimestampNano = statusCheckStartTimeNano;
final long statusCheckStartTimeNano = this.getCurrentTimeNano();
this.contextLastUsedTimestampNano = statusCheckStartTimeNano;

final ConnectionStatus status =
checkConnectionStatus(this.nodeCheckTimeoutMillis);
final ConnectionStatus status =
checkConnectionStatus(this.nodeCheckTimeoutMillis);

long delayMillis = -1;
MonitorConnectionContext monitorContext;
MonitorConnectionContext firstAddedMonitorContext = null;
long delayMillis = -1;
MonitorConnectionContext monitorContext;
MonitorConnectionContext firstAddedMonitorContext = null;

while ((monitorContext = this.activeContexts.poll()) != null) {

synchronized (monitorContext) {
// If context is already invalid, just skip it
if (!monitorContext.isActiveContext()) {
continue;
}
while ((monitorContext = this.activeContexts.poll()) != null) {

if (firstAddedMonitorContext == monitorContext) {
// this context has already been processed by this loop
// add it to the queue and exit this loop
this.activeContexts.add(monitorContext);
break;
}
synchronized (monitorContext) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this really have to be synchronized? It is a local variable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a local variable but we add it to queues in MonitorImpl that runs in a separate thread. Monitoring thread may change an internal state of the context. The idea was to synchronize on it to avoid multi-threading collisions. It seems this intent isn't properly implemented here. Need to address it in a separate PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not fix it in this PR ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some investigation, it seems like the current implementation is working and a fix isn't required.

// If context is already invalid, just skip it
if (!monitorContext.isActiveContext()) {
continue;
}

// otherwise, process this context
monitorContext.updateConnectionStatus(
this.hostSpec.getUrl(),
statusCheckStartTimeNano,
statusCheckStartTimeNano + status.elapsedTimeNano,
status.isValid);

// If context is still valid and node is still healthy, it needs to continue updating this context
if (monitorContext.isActiveContext() && !monitorContext.isNodeUnhealthy()) {
this.activeContexts.add(monitorContext);
if (firstAddedMonitorContext == null) {
firstAddedMonitorContext = monitorContext;
if (firstAddedMonitorContext == monitorContext) {
// this context has already been processed by this loop
// add it to the queue and exit this loop
this.activeContexts.add(monitorContext);
break;
}

if (delayMillis == -1 || delayMillis > monitorContext.getFailureDetectionIntervalMillis()) {
delayMillis = monitorContext.getFailureDetectionIntervalMillis();
// otherwise, process this context
monitorContext.updateConnectionStatus(
this.hostSpec.getUrl(),
statusCheckStartTimeNano,
statusCheckStartTimeNano + status.elapsedTimeNano,
status.isValid);

// If context is still valid and node is still healthy, it needs to continue updating this context
if (monitorContext.isActiveContext() && !monitorContext.isNodeUnhealthy()) {
this.activeContexts.add(monitorContext);
if (firstAddedMonitorContext == null) {
firstAddedMonitorContext = monitorContext;
}

if (delayMillis == -1 || delayMillis > monitorContext.getFailureDetectionIntervalMillis()) {
delayMillis = monitorContext.getFailureDetectionIntervalMillis();
}
}
}
}
}

if (delayMillis == -1) {
// No active contexts
delayMillis = THREAD_SLEEP_WHEN_INACTIVE_MILLIS;
} else {
delayMillis -= status.elapsedTimeNano;
// Check for min delay between node health check
if (delayMillis <= 0) {
delayMillis = MIN_CONNECTION_CHECK_TIMEOUT_MILLIS;
if (delayMillis == -1) {
// No active contexts
delayMillis = THREAD_SLEEP_WHEN_INACTIVE_MILLIS;
} else {
delayMillis -= status.elapsedTimeNano;
// Check for min delay between node health check
if (delayMillis <= 0) {
delayMillis = MIN_CONNECTION_CHECK_TIMEOUT_MILLIS;
}
// Use this delay as node checkout timeout since it corresponds to min interval for all active contexts
this.nodeCheckTimeoutMillis = delayMillis;
}
// Use this delay as node checkout timeout since it corresponds to min interval for all active contexts
this.nodeCheckTimeoutMillis = delayMillis;
}

TimeUnit.MILLISECONDS.sleep(delayMillis);
TimeUnit.MILLISECONDS.sleep(delayMillis);

} else {
if ((this.getCurrentTimeNano() - this.contextLastUsedTimestampNano)
>= TimeUnit.MILLISECONDS.toNanos(this.monitorDisposalTimeMillis)) {
monitorService.notifyUnused(this);
break;
} else {
if ((this.getCurrentTimeNano() - this.contextLastUsedTimestampNano)
>= TimeUnit.MILLISECONDS.toNanos(this.monitorDisposalTimeMillis)) {
monitorService.notifyUnused(this);
break;
}
TimeUnit.MILLISECONDS.sleep(THREAD_SLEEP_WHEN_INACTIVE_MILLIS);
}
TimeUnit.MILLISECONDS.sleep(THREAD_SLEEP_WHEN_INACTIVE_MILLIS);

} catch (final InterruptedException intEx) {
throw intEx;
} catch (final Exception ex) {
// log and ignore
LOGGER.warning(
() -> Messages.get(
"MonitorImpl.exceptionDuringMonitoringContinue",
new Object[] {this.hostSpec.getHost(), ex.getMessage()}));
}
}
} catch (final InterruptedException intEx) {
// do nothing; exit thread
// exit thread
LOGGER.warning(
() -> Messages.get(
"MonitorImpl.interruptedExceptionDuringMonitoring",
new Object[] {this.hostSpec.getHost(), intEx.getMessage()}));
} catch (final Exception ex) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we expecting any other kind of exception? (we did not catch it beforehand)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're adding extra logging in case there was an exception we hadn't accounted for stopping this method and going undetected. This is because of the OOM error that was noticed after the newContexts queue had too many context objects added to it.

// this should not be reached; log and exit thread
LOGGER.warning(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to change code so such unhandled exceptions to be logged but the monitoring thread/loop keeps running.

() -> Messages.get(
"MonitorImpl.exceptionDuringMonitoringStop",
new Object[] {this.hostSpec.getHost(), ex.getMessage()}));
} finally {
if (this.monitoringConn != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ MonitorThreadContainer.emptyNodeKeys=Provided node keys are empty.

# Monitor Impl
MonitorImpl.contextNullWarning=Parameter 'context' should not be null.
MonitorImpl.interruptedExceptionDuringMonitoring=Monitoring thread for node {0} was interrupted: {1}
MonitorImpl.exceptionDuringMonitoringContinue=Continuing monitoring after unhandled exception was thrown in monitoring thread for node {0}: {1}
MonitorImpl.exceptionDuringMonitoringStop=Stopping monitoring after unhandled exception was thrown in monitoring thread for node {0}: {1}
MonitorImpl.monitorIsStopped=Monitoring was already stopped for node {0}.

# Monitor Service Impl
MonitorServiceImpl.nullMonitorParam=Parameter 'monitor' should not be null.
Expand Down