feat: implement autoscaling and the leastConnections strategy #451
Conversation
Implement computeIfAbsent() for CacheMap; provide a dispose callback function to properly close the DataSource.
}

public CacheMap(
    final IsItemValidFunc<V> isItemValidFunc,
Should this be named canDisposeFunc?
public class CacheMap<K, V> {

-  private final Map<K, CacheItem<V>> cache = new ConcurrentHashMap<>();
   private final long cleanupIntervalNanos = TimeUnit.MINUTES.toNanos(10);
+  private final Map<K, CacheItem> cache = new ConcurrentHashMap<>();
What's the reason to remove V?
public CacheItem(final V item, final long expirationTime) {
  this.item = item;
  this.expirationTime = expirationTime;
}
boolean isExpired() {
  if (isItemValidFunc != null) {
    return System.nanoTime() > expirationTime && !isItemValidFunc.isValid(this.item);
Hmm, I'm not sure what the meaning of isItemValidFunc is. We need to discuss this.
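For discussion, here is a minimal, runnable sketch of the semantics this condition seems to encode (illustrative names, not the PR's exact code): an item counts as expired only when its TTL has elapsed AND an optional validity callback no longer vouches for it.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Sketch of the expiration semantics under discussion. The callback can
// keep an item alive past its TTL (e.g. a DataSource that still has open
// connections); with no callback, expiration is purely time-based.
public class CacheItemSketch<V> {

  private final V item;
  private final long expirationTimeNanos;
  private final Predicate<V> isItemValid; // may be null

  public CacheItemSketch(final V item, final long expirationTimeNanos,
      final Predicate<V> isItemValid) {
    this.item = item;
    this.expirationTimeNanos = expirationTimeNanos;
    this.isItemValid = isItemValid;
  }

  boolean isExpired() {
    final boolean ttlElapsed = System.nanoTime() > expirationTimeNanos;
    if (isItemValid != null) {
      // Expired only when the TTL has passed AND the item is no longer valid.
      return ttlElapsed && !isItemValid.test(item);
    }
    return ttlElapsed;
  }

  public static void main(String[] args) {
    final long past = System.nanoTime() - TimeUnit.SECONDS.toNanos(1);
    // TTL elapsed, no callback: expired.
    System.out.println(new CacheItemSketch<>("x", past, null).isExpired());
    // TTL elapsed, but the callback still reports the item as valid: kept.
    System.out.println(new CacheItemSketch<String>("x", past, v -> true).isExpired());
  }
}
```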
@@ -23,32 +23,36 @@
 public class TestInstanceInfo {

   private String instanceId; // "instance-1"
-  private String endpoint; // "instance-1.ABC.cluster-XYZ.us-west-2.rds.amazonaws.com"
-  private int endpointPort;
+  private String host; // "instance-1.ABC.cluster-XYZ.us-west-2.rds.amazonaws.com"
This change is probably unrelated to the leastConnections strategy; I'd make it in a separate PR if needed. We have a quite stable integration suite. Renaming properties can certainly be done, but there should be a strong reason for doing it.
      .anyMatch((url) -> url.equals(newInstance.getUrl())));
  newInstanceConn.setReadOnly(false);
} finally {
  auroraUtil.deleteInstance(newInstance);
What are the consequences for the other integration tests? We should probably run the autoscaling tests in a separate run (like we do for the hibernate tests) so that changes to the cluster configuration have no side effects on other tests.
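For what it's worth, one hypothetical way to isolate such tests, assuming JUnit 5 tags and a Gradle build (the task and tag names below are illustrative, not this repo's actual configuration):

```groovy
// build.gradle sketch: keep autoscaling tests out of the default run and
// give them a dedicated task, so cluster reconfiguration cannot leak into
// the rest of the integration suite.
test {
    useJUnitPlatform {
        excludeTags 'autoscaling'
    }
}

tasks.register('autoscalingTests', Test) {
    useJUnitPlatform {
        includeTags 'autoscaling'
    }
    shouldRunAfter tasks.named('test')
}
```

The test classes themselves would then carry a matching `@Tag("autoscaling")` annotation.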
final HikariConfig config = new HikariConfig();
config.setMaximumPoolSize(maxPoolSize);
config.setInitializationFailTimeout(75000);
config.setKeepaliveTime(30000);
The TCP keepalive feature doesn't work well with Toxiproxy. Is this setting essential for the tests?
}

public V get(final K key) {
-  final CacheItem<V> cacheItem = cache.computeIfPresent(key, (kk, vv) -> vv.isExpired() ? null : vv);
+  removeIfExpired(key);
+  final CacheItem cacheItem = cache.get(key);
  return cacheItem == null ? null : cacheItem.item;
}
public V get(final K key, final V defaultItemValue, final long itemExpirationNano) {
Do we need both this and computeIfAbsent()? They are quite similar. The differences: computeIfAbsent() calls cleanup and extends the expiration.
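To illustrate the overlap: here is a minimal, runnable sketch (not the PR's actual CacheMap; all names are illustrative) showing how a get-with-default can be expressed through a single expiry-aware computeIfAbsent, which is why having both methods feels redundant.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Expiring cache sketch where get-with-default and computeIfAbsent collapse
// into one operation.
public class ExpiringCacheSketch<K, V> {

  private static final class Entry<V> {
    final V item;
    final long expiresAtNanos;

    Entry(final V item, final long expiresAtNanos) {
      this.item = item;
      this.expiresAtNanos = expiresAtNanos;
    }

    boolean isExpired() {
      return System.nanoTime() > expiresAtNanos;
    }
  }

  private final ConcurrentHashMap<K, Entry<V>> cache = new ConcurrentHashMap<>();

  public V computeIfAbsent(final K key, final V valueIfAbsent, final long ttlNanos) {
    // compute() replaces missing or expired entries atomically and returns
    // the entry that ends up in the map.
    final Entry<V> entry = cache.compute(key, (k, existing) ->
        (existing == null || existing.isExpired())
            ? new Entry<>(valueIfAbsent, System.nanoTime() + ttlNanos)
            : existing);
    return entry.item;
  }

  public static void main(String[] args) {
    final ExpiringCacheSketch<String, Integer> cache = new ExpiringCacheSketch<>();
    System.out.println(cache.computeIfAbsent("a", 1, TimeUnit.MINUTES.toNanos(1))); // 1
    // A live entry wins over the supplied default.
    System.out.println(cache.computeIfAbsent("a", 2, TimeUnit.MINUTES.toNanos(1))); // 1
  }
}
```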
  return cacheItem.item;
}

public V getWithExtendExpiration(final K key, final long itemExpirationNano) {
This is only used in tests; should we remove it?
  cleanUp();
  removeAndDispose(key);
  cache.put(key, new CacheItem(item, System.nanoTime() + itemExpirationNano));
}

public void putIfAbsent(final K key, final V item, final long itemExpirationNano) {
This is only used in tests; should we remove it?
    final IsItemValidFunc<V> isItemValidFunc,
    final ItemDisposalFunc<V> itemDisposalFunc) {
  this.isItemValidFunc = isItemValidFunc;
  this.itemDisposalFunc = itemDisposalFunc;
}

public V get(final K key) {
Should we call cleanUp() in the get functions, or in any of the other functions? Why do we call it only in certain functions?
-  private final Map<K, CacheItem<V>> cache = new ConcurrentHashMap<>();
-  private final long cleanupIntervalNanos = TimeUnit.MINUTES.toNanos(10);
+  private final Map<K, CacheItem> cache = new ConcurrentHashMap<>();
+  private long cleanupIntervalNanos = TimeUnit.MINUTES.toNanos(10);
Should this be configurable via the HikariPooledConnectionProvider constructor? Right now there isn't a way for the user to set this
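As a starting point, here is a minimal, runnable sketch of how the interval could be exposed through a constructor overload while keeping the current default. This is a hypothetical shape, not the PR's actual API; the class and method names are illustrative.

```java
import java.util.concurrent.TimeUnit;

// Sketch: a configurable cleanup interval with the existing 10-minute default
// preserved by the no-arg constructor.
public class CacheMapSketch {

  private static final long DEFAULT_CLEANUP_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(10);

  private final long cleanupIntervalNanos;

  public CacheMapSketch() {
    this(DEFAULT_CLEANUP_INTERVAL_NANOS);
  }

  public CacheMapSketch(final long cleanupIntervalNanos) {
    this.cleanupIntervalNanos = cleanupIntervalNanos;
  }

  public long getCleanupIntervalNanos() {
    return cleanupIntervalNanos;
  }

  public static void main(String[] args) {
    // Default preserved; custom value honored.
    System.out.println(new CacheMapSketch().getCleanupIntervalNanos()
        == TimeUnit.MINUTES.toNanos(10));
    System.out.println(new CacheMapSketch(TimeUnit.MINUTES.toNanos(1)).getCleanupIntervalNanos()
        == TimeUnit.MINUTES.toNanos(1));
  }
}
```

HikariPooledConnectionProvider could then thread the same value through its own constructor.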
 * @param key the key associated with the value to be removed/disposed
 */
public void remove(final K key) {
  cleanUp();
I'd swap the order of these two lines. That way, cleanUp() will have one less item to check.
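A tiny runnable sketch of the suggested ordering (illustrative names, expiry checks elided): removing the requested key first means the subsequent sweep never re-examines the entry that was about to go anyway.

```java
import java.util.concurrent.ConcurrentHashMap;

// Demonstrates the effect of removing the key before sweeping the cache.
public class RemoveOrderSketch {

  private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
  private int itemsScanned;

  void removeAndDispose(final String key) {
    cache.remove(key);
  }

  void cleanUp() {
    // Sweep the remaining entries (real code would check expiry here).
    for (String k : cache.keySet()) {
      itemsScanned++;
    }
  }

  public void remove(final String key) {
    removeAndDispose(key); // remove first...
    cleanUp();             // ...so the sweep has one less entry to check
  }

  public static void main(String[] args) {
    final RemoveOrderSketch m = new RemoveOrderSketch();
    m.cache.put("a", "1");
    m.cache.put("b", "2");
    m.remove("a");
    System.out.println(m.itemsScanned); // only "b" was swept
  }
}
```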
 * @return the current (existing or computed) value associated with the specified key, or null if
 *     the computed value is null
 */
public V computeIfAbsent(
Should we remove the similar method (and the sliding-expiration-related functionality) from CacheMap<>?
@@ -39,7 +39,9 @@ The wrapper driver currently uses [Hikari](https://github.com/brettwooldridge/Hi
 - username
 - password

-You can optionally pass in a `HikariPoolMapping` function as a second parameter to the `HikariPooledConnectionProvider`. This allows you to decide when new connection pools should be created by defining what is included in the pool map key. A new pool will be created each time a new connection is requested with a unique key. By default, a new pool will be created for each unique instance-user combination. If you would like to define a different key system, you should pass in a `HikariPoolMapping` function defining this logic. Note that the user will always be automatically included in the key for security reasons. Please see [ReadWriteSplittingPostgresExample.java](../../../examples/AWSDriverExample/src/main/java/software/amazon/ReadWriteSplittingPostgresExample.java) for an example of how to configure the pool map key.
+You can optionally pass in a `HikariPoolMapping` function as a second parameter to the `HikariPooledConnectionProvider`. This allows you to decide when new connection pools should be created by defining what is included in the pool map key. A new pool will be created each time a new connection is requested with a unique key. By default, a new pool will be created for each unique instance-user combination. If you would like to define a different key system, you should pass in a `HikariPoolMapping` function defining this logic. Please see [ReadWriteSplittingPostgresExample.java](../../../examples/AWSDriverExample/src/main/java/software/amazon/ReadWriteSplittingPostgresExample.java) for an example of how to configure the pool map key.
Can we provide a simple example here for the HikariPoolMapping function, and say: for the full example, please see ReadWriteSplittingPostgresExample.java?
props.setProperty("somePropertyValue", "1"); // used in getPoolKey
final HikariPooledConnectionProvider connProvider =
    new HikariPooledConnectionProvider(
        ReadWriteSplittingPostgresExample::getHikariConfig,
        ReadWriteSplittingPostgresExample::getPoolKey
    );
ConnectionProviderManager.setConnectionProvider(connProvider);

private static String getPoolKey(HostSpec hostSpec, Properties props) {
  // Include the URL, user, and somePropertyValue in the connection pool key so that a new
  // connection pool will be opened for each different instance-user-somePropertyValue
  // combination.
  final String user = props.getProperty(PropertyDefinition.USER.name);
  final String somePropertyValue = props.getProperty("somePropertyValue");
  return hostSpec.getUrl() + user + somePropertyValue;
}
# Read Write Splitting Plugin
ReadWriteSplittingPlugin.setReadOnlyOnClosedConnection=setReadOnly cannot be called on a closed connection.
ReadWriteSplittingPlugin.errorSwitchingToCachedReader=An error occurred while trying to switch to a cached reader connection to ''{0}''. The driver will attempt to establish a new reader connection.
ReadWriteSplittingPlugin.errorSwitchingToCachedReaderWithCause=An error occurred while trying to switch to a cached reader connection to ''{0}''. Error message: ''{1}''. The driver will attempt to establish a new reader connection.
Suggested change:
-ReadWriteSplittingPlugin.errorSwitchingToCachedReaderWithCause=An error occurred while trying to switch to a cached reader connection to ''{0}''. Error message: ''{1}''. The driver will attempt to establish a new reader connection.
+ReadWriteSplittingPlugin.errorSwitchingToCachedReaderWithCause=An error occurred while trying to switch a cached reader connection to ''{0}''. Error message: ''{1}''. The driver will attempt to establish a new reader connection.
I think the error message is misleading if we remove 'to'. Removing it makes it sound like the error occurred when we tried to change a cached reader connection to another reader. The actual scenario is that we tried to switch from a writer to a cached reader. I'll rephrase the error message a bit to hopefully be clearer
# Read Write Splitting Plugin
ReadWriteSplittingPlugin.setReadOnlyOnClosedConnection=setReadOnly cannot be called on a closed connection.
ReadWriteSplittingPlugin.errorSwitchingToCachedReader=An error occurred while trying to switch to a cached reader connection to ''{0}''. The driver will attempt to establish a new reader connection.
Suggested change:
-ReadWriteSplittingPlugin.errorSwitchingToCachedReader=An error occurred while trying to switch to a cached reader connection to ''{0}''. The driver will attempt to establish a new reader connection.
+ReadWriteSplittingPlugin.errorSwitchingToCachedReader=An error occurred while trying to switch a cached reader connection to ''{0}''. The driver will attempt to establish a new reader connection.
Resolved: wrapper/src/test/java/integration/refactored/container/ConnectionStringHelper.java
Resolved: wrapper/src/test/java/integration/refactored/container/tests/AutoscalingTests.java
final Connection conn1 = DriverManager.getConnection(connString, props);
connections.add(conn1);
final Connection conn2 = DriverManager.getConnection(connString, props);
connections.add(conn2);
Suggested change:
-final Connection conn1 = DriverManager.getConnection(connString, props);
-connections.add(conn1);
-final Connection conn2 = DriverManager.getConnection(connString, props);
-connections.add(conn2);
+// Create 2 connections per instance.
+connections.add(DriverManager.getConnection(connString, props));
+connections.add(DriverManager.getConnection(connString, props));
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.