Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,22 @@
public class LimitlessConnectionContext {
private HostSpec hostSpec;
private Properties props;
private Properties origProps;
private Connection connection;
private JdbcCallable<Connection, SQLException> connectFunc;
private List<HostSpec> limitlessRouters;

public LimitlessConnectionContext(
final HostSpec hostSpec,
final Properties props,
final Properties origProps,
final Connection connection,
final JdbcCallable<Connection, SQLException> connectFunc,
final List<HostSpec> limitlessRouters
) {
this.hostSpec = hostSpec;
this.props = props;
this.origProps = origProps;
this.connection = connection;
this.connectFunc = connectFunc;
this.limitlessRouters = limitlessRouters;
Expand All @@ -53,6 +56,10 @@ public Properties getProps() {
return this.props;
}

public Properties getOrigProps() {
return this.origProps;
}

public Connection getConnection() {
return this.connection;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,14 @@ public Connection connect(

final Properties copyProps = PropertyUtils.copyProperties(props);
copyProps.setProperty(INTERNAL_CONNECT_PROPERTY_NAME, "true");
return connectInternal(driverProtocol, hostSpec, copyProps, isInitialConnection, connectFunc);
return connectInternal(driverProtocol, hostSpec, props, copyProps, isInitialConnection, connectFunc);
}

public Connection connectInternal(
final String driverProtocol,
final HostSpec hostSpec,
final Properties props,
final Properties origProps,
final Properties copyProps,
final boolean isInitialConnection,
final JdbcCallable<Connection, SQLException> connectFunc)
throws SQLException {
Expand All @@ -143,7 +144,8 @@ public Connection connectInternal(

final LimitlessConnectionContext context = new LimitlessConnectionContext(
hostSpec,
props,
copyProps,
origProps,
conn,
connectFunc,
null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ public void run() {
newLimitlessRouters,
TimeUnit.MILLISECONDS.toNanos(LimitlessRouterServiceImpl.MONITOR_DISPOSAL_TIME_MS.getLong(props)));

RoundRobinHostSelector.setRoundRobinHostWeightPairsProperty(this.props, newLimitlessRouters);
LOGGER.finest(Utils.logTopology(newLimitlessRouters, "[limitlessRouterMonitor] Topology:"));
TimeUnit.MILLISECONDS.sleep(this.intervalMs); // do not include this in the telemetry
} catch (final InterruptedException exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ public void establishConnection(final LimitlessConnectionContext context) throws
return;
}

RoundRobinHostSelector.setRoundRobinHostWeightPairsProperty(context.getProps(), context.getLimitlessRouters());
RoundRobinHostSelector.setRoundRobinHostWeightPairsProperty(
context.getOrigProps(),
context.getLimitlessRouters());
HostSpec selectedHostSpec;
try {
selectedHostSpec = this.pluginService.getHostSpecByStrategy(
Expand Down Expand Up @@ -165,7 +167,7 @@ public void establishConnection(final LimitlessConnectionContext context) throws
}
}

protected List<HostSpec> getLimitlessRouters(final String clusterId, final Properties props) throws SQLException {
protected List<HostSpec> getLimitlessRouters(final String clusterId, final Properties props) {
final long cacheExpirationNano = TimeUnit.MILLISECONDS.toNanos(
MONITOR_DISPOSAL_TIME_MS.getLong(props));
return limitlessRouterCache.get(clusterId, cacheExpirationNano);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import software.amazon.jdbc.RoundRobinHostSelector;
import software.amazon.jdbc.hostavailability.HostAvailability;
import software.amazon.jdbc.hostavailability.SimpleHostAvailabilityStrategy;
import software.amazon.jdbc.util.PropertyUtils;
import software.amazon.jdbc.wrapper.HighestWeightHostSelector;

public class LimitlessRouterServiceImplTest {
Expand All @@ -70,6 +71,7 @@ public void init() throws SQLException {
props = new Properties();
when(mockConnectFuncLambda.call()).thenReturn(mockConnection);
when(mockPluginService.getHostListProvider()).thenReturn(mockHostListProvider);
when(mockPluginService.getProperties()).thenReturn(props);
when(mockHostListProvider.getClusterId()).thenReturn(CLUSTER_ID);
}

Expand All @@ -85,6 +87,7 @@ void testEstablishConnection_GivenGetEmptyRouterListAndWaitForRouterInfo_ThenThr

final LimitlessConnectionContext inputContext = new LimitlessConnectionContext(
hostSpec,
PropertyUtils.copyProperties(props),
props,
null,
mockConnectFuncLambda,
Expand All @@ -103,6 +106,7 @@ void testEstablishConnection_GivenGetEmptyRouterListAndNoWaitForRouterInfo_ThenC
props.setProperty(LimitlessConnectionPlugin.WAIT_FOR_ROUTER_INFO.name, "false");
final LimitlessConnectionContext inputContext = new LimitlessConnectionContext(
hostSpec,
PropertyUtils.copyProperties(props),
props,
null,
mockConnectFuncLambda,
Expand Down Expand Up @@ -132,6 +136,7 @@ void testEstablishConnection_GivenHostSpecInRouterCache_ThenCallConnectFunc() th

final LimitlessConnectionContext inputContext = new LimitlessConnectionContext(
routerList.get(1),
PropertyUtils.copyProperties(props),
props,
null,
mockConnectFuncLambda,
Expand Down Expand Up @@ -163,6 +168,7 @@ void testEstablishConnection_GivenFetchRouterListAndHostSpecInRouterList_ThenCal

final LimitlessConnectionContext inputContext = new LimitlessConnectionContext(
routerList.get(1),
PropertyUtils.copyProperties(props),
props,
null,
mockConnectFuncLambda,
Expand Down Expand Up @@ -199,6 +205,7 @@ void testEstablishConnection_GivenRouterCache_ThenSelectsHost() throws SQLExcept

final LimitlessConnectionContext inputContext = new LimitlessConnectionContext(
hostSpec,
PropertyUtils.copyProperties(props),
props,
null,
mockConnectFuncLambda,
Expand Down Expand Up @@ -234,6 +241,7 @@ void testEstablishConnection_GivenFetchRouterList_ThenSelectsHost() throws SQLEx

final LimitlessConnectionContext inputContext = new LimitlessConnectionContext(
hostSpec,
PropertyUtils.copyProperties(props),
props,
null,
mockConnectFuncLambda,
Expand Down Expand Up @@ -267,6 +275,7 @@ void testEstablishConnection_GivenHostSpecInRouterCacheAndCallConnectFuncThrows_
final HostSpec selectedRouter = routerList.get(2);
final LimitlessConnectionContext inputContext = new LimitlessConnectionContext(
routerList.get(1),
PropertyUtils.copyProperties(props),
props,
null,
mockConnectFuncLambda,
Expand Down Expand Up @@ -311,6 +320,7 @@ void testEstablishConnection_GivenSelectsHostThrows_ThenRetry() throws SQLExcept

final LimitlessConnectionContext inputContext = new LimitlessConnectionContext(
hostSpec,
PropertyUtils.copyProperties(props),
props,
null,
mockConnectFuncLambda,
Expand Down Expand Up @@ -350,6 +360,7 @@ void testEstablishConnection_GivenSelectsHostNull_ThenRetry() throws SQLExceptio

final LimitlessConnectionContext inputContext = new LimitlessConnectionContext(
hostSpec,
PropertyUtils.copyProperties(props),
props,
null,
mockConnectFuncLambda,
Expand Down Expand Up @@ -392,6 +403,7 @@ void testEstablishConnection_GivenPluginServiceConnectThrows_ThenRetry() throws

final LimitlessConnectionContext inputContext = new LimitlessConnectionContext(
hostSpec,
PropertyUtils.copyProperties(props),
props,
null,
mockConnectFuncLambda,
Expand Down Expand Up @@ -429,6 +441,7 @@ void testEstablishConnection_GivenRetryAndMaxRetriesExceeded_thenThrowSqlExcepti

final LimitlessConnectionContext inputContext = new LimitlessConnectionContext(
routerList.get(0),
PropertyUtils.copyProperties(props),
props,
null,
mockConnectFuncLambda,
Expand Down
Loading