Skip to content

Update and extend tests for connection_acquisition_timeout setting. #474

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion nutkit/frontend/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ class Driver:
def __init__(self, backend, uri, auth_token, user_agent=None,
resolver_fn=None, domain_name_resolver_fn=None,
connection_timeout_ms=None, fetch_size=None,
max_tx_retry_time_ms=None, encrypted=None,
max_tx_retry_time_ms=None, session_connection_timeout_ms=None,
update_routing_table_timeout_ms=None, encrypted=None,
trusted_certificates=None, liveness_check_timeout_ms=None,
max_connection_pool_size=None,
connection_acquisition_timeout_ms=None):
Expand All @@ -18,6 +19,8 @@ def __init__(self, backend, uri, auth_token, user_agent=None,
resolverRegistered=resolver_fn is not None,
domainNameResolverRegistered=domain_name_resolver_fn is not None,
connectionTimeoutMs=connection_timeout_ms,
sessionConnectionTimeoutMs=session_connection_timeout_ms,
updateRoutingTableTimeoutMs=update_routing_table_timeout_ms,
fetchSize=fetch_size, maxTxRetryTimeMs=max_tx_retry_time_ms,
encrypted=encrypted, trustedCertificates=trusted_certificates,
liveness_check_timeout_ms=liveness_check_timeout_ms,
Expand Down
12 changes: 11 additions & 1 deletion nutkit/protocol/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

class Feature(Enum):
# === FUNCTIONAL FEATURES ===
# The driver offers a configuration option to limit time it spend at most,
# The driver offers a configuration option to limit time it spends at most,
# trying to acquire a connection from the pool.
# The connection acquisition timeout must account for the whole acquisition
# execution time, whether a new connection is created, an idle connection
Expand Down Expand Up @@ -40,6 +40,13 @@ class Feature(Enum):
# If there are more than records, the driver emits a warning.
# This method is supposed to always exhaust the result stream.
API_RESULT_SINGLE_OPTIONAL = "Feature:API:Result.SingleOptional"
# The driver offers a configuration option to limit time it spends at most,
# trying to acquire a usable read/write connection for any session.
# The connection acquisition timeout must account for the whole acquisition
# execution time, whether a new connection is created, an idle connection
# is picked up instead, we need to wait until the full pool depletes, or
# a routing table must be fetched.
API_SESSION_CONNECTION_TIMEOUT = "Feature:API:SessionConnectionTimeout"
# The driver implements explicit configuration options for SSL.
# - enable / disable SSL
# - verify signature against system store / custom cert / not at all
Expand All @@ -53,6 +60,9 @@ class Feature(Enum):
API_TYPE_SPATIAL = "Feature:API:Type.Spatial"
# The driver supports sending and receiving temporal data types.
API_TYPE_TEMPORAL = "Feature:API:Type.Temporal"
# The driver offers a configuration option to limit time it spends at most,
# trying to update the routing table whenever needed.
API_UPDATE_ROUTING_TABLE_TIMEOUT = "Feature:API:UpdateRoutingTableTimeout"
# The driver supports single-sign-on (SSO) by providing a bearer auth token
# API.
AUTH_BEARER = "Feature:Auth:Bearer"
Expand Down
3 changes: 3 additions & 0 deletions nutkit/protocol/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class NewDriver:
def __init__(
self, uri, authToken, userAgent=None, resolverRegistered=False,
domainNameResolverRegistered=False, connectionTimeoutMs=None,
sessionConnectionTimeoutMs=None, updateRoutingTableTimeoutMs=None,
fetchSize=None, maxTxRetryTimeMs=None, encrypted=None,
trustedCertificates=None, liveness_check_timeout_ms=None,
max_connection_pool_size=None,
Expand All @@ -82,6 +83,8 @@ def __init__(
self.resolverRegistered = resolverRegistered
self.domainNameResolverRegistered = domainNameResolverRegistered
self.connectionTimeoutMs = connectionTimeoutMs
self.sessionConnectionTimeoutMs = sessionConnectionTimeoutMs
self.updateRoutingTableTimeoutMs = updateRoutingTableTimeoutMs
self.fetchSize = fetchSize
self.maxTxRetryTimeMs = maxTxRetryTimeMs
self.livenessCheckTimeoutMs = liveness_check_timeout_ms
Expand Down
12 changes: 12 additions & 0 deletions tests/stub/driver_parameters/scripts/router.script
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
!: BOLT 5.0
!: AUTO RESET
!: ALLOW RESTART

A: HELLO {"{}": "*"}
*: RESET
{+
C: ROUTE "*" "*" "*"
S: SUCCESS { "rt": { "ttl": 1000, "servers": [{"addresses": ["#HOST#:9000"], "role":"ROUTE"}, {"addresses": ["#HOST#:9010"], "role":"READ"}, {"addresses": ["#HOST#:9010"], "role":"WRITE"}]}}
*: RESET
+}
?: GOODBYE
16 changes: 16 additions & 0 deletions tests/stub/driver_parameters/scripts/router_hello_delay.script
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
!: BOLT 5.0
!: AUTO RESET
!: ALLOW RESTART

C: HELLO {"{}": "*"}
S: <SLEEP> 2
<NOOP>
<SLEEP> 2
SUCCESS {"server": "Neo4j/5.0.0", "connection_id": "bolt-123456789"}
*: RESET
{+
C: ROUTE "*" "*" "*"
S: SUCCESS { "rt": { "ttl": 1000, "servers": [{"addresses": ["#HOST#:9000"], "role":"ROUTE"}, {"addresses": ["#HOST#:9010"], "role":"READ"}, {"addresses": ["#HOST#:9010"], "role":"WRITE"}]}}
*: RESET
+}
?: GOODBYE
15 changes: 15 additions & 0 deletions tests/stub/driver_parameters/scripts/router_route_delay.script
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
!: BOLT 5.0
!: AUTO RESET
!: ALLOW RESTART

A: HELLO {"{}": "*"}
*: RESET
{+
C: ROUTE "*" "*" "*"
S: <SLEEP> 2
<NOOP>
<SLEEP> 2
SUCCESS { "rt": { "ttl": 1000, "servers": [{"addresses": ["#HOST#:9000"], "role":"ROUTE"}, {"addresses": ["#HOST#:9010"], "role":"READ"}, {"addresses": ["#HOST#:9010"], "role":"WRITE"}]}}
*: RESET
+}
?: GOODBYE
12 changes: 12 additions & 0 deletions tests/stub/driver_parameters/scripts/session_run.script
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
!: BOLT 5.0
!: ALLOW CONCURRENT

A: HELLO {"{}": "*"}
*: RESET
C: RUN "*" "*" "*"
S: SUCCESS {"fields": ["n"]}
C: PULL {"n": "*"}
S: RECORD [1]
SUCCESS {"type": "r"}
*: RESET
?: GOODBYE
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
!: BOLT 4.4
!: BOLT 5.0
!: ALLOW CONCURRENT

C: HELLO {"{}": "*"}
S: <SLEEP> 2
<NOOP>
<SLEEP> 2
<NOOP>
<SLEEP> 2
SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"}
SUCCESS {"server": "Neo4j/5.0.0", "connection_id": "bolt-123456789"}
*: RESET
C: RUN "*" "*" "*"
S: SUCCESS {"fields": ["n"]}
C: PULL {"n": "*"}
S: RECORD [1]
SUCCESS {"type": "r"}

*: RESET
?: GOODBYE
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
!: BOLT 4.4
!: BOLT 5.0
!: ALLOW CONCURRENT

A: HELLO {"{}": "*"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class TestConnectionAcquisitionTimeoutMs(TestkitTestCase):
"""
Connection Acquition Timeout Tests.
Connection Acquisition Timeout Tests.

The connection acquisition timeout must account for the
whole acquisition execution time, whether a new connection is created,
Expand All @@ -32,20 +32,22 @@ class TestConnectionAcquisitionTimeoutMs(TestkitTestCase):
"""

required_features = (
types.Feature.BOLT_4_4,
types.Feature.API_CONNECTION_ACQUISITION_TIMEOUT
types.Feature.BOLT_5_0,
types.Feature.API_CONNECTION_ACQUISITION_TIMEOUT,
)

def setUp(self):
super().setUp()
self._server = StubServer(9001)
self._server = StubServer(9010)
self._router = StubServer(9000)
self._driver = None
self._session = None
self._sessions = []
self._txs = []

def tearDown(self) -> None:
def tearDown(self):
self._server.reset()
self._router.reset()
for tx in self._txs:
with self.assertRaises(types.DriverError):
# The server does not accept ending the transaction.
Expand All @@ -63,6 +65,10 @@ def tearDown(self) -> None:

return super().tearDown()

def _start_server(self, server, script):
server.start(self.script_path(script),
vars_={"#HOST#": self._router.host})

def test_should_work_when_every_step_is_done_in_time(self):
"""
Everything in time scenario.
Expand All @@ -75,9 +81,7 @@ def test_should_work_when_every_step_is_done_in_time(self):

Then the query is executed successfully
"""
self._server.start(
self.script_path("session_run_auth_delay.script")
)
self._start_server(self._server, "session_run_auth_delay.script")

auth = types.AuthorizationToken("basic", principal="neo4j",
credentials="pass")
Expand All @@ -88,7 +92,7 @@ def test_should_work_when_every_step_is_done_in_time(self):

self._session = self._driver.session("r")

list(self._session.run("RETURN 1 as n"))
list(self._session.run("RETURN 1 AS n"))

def test_should_encompass_the_handshake_time(self):
"""
Expand All @@ -104,9 +108,7 @@ def test_should_encompass_the_handshake_time(self):
Then the query is not executed since the connection acquisition
timed out.
"""
self._server.start(
self.script_path("session_run_auth_delay.script")
)
self._start_server(self._server, "session_run_auth_delay.script")

auth = types.AuthorizationToken("basic", principal="neo4j",
credentials="pass")
Expand All @@ -118,11 +120,11 @@ def test_should_encompass_the_handshake_time(self):
self._session = self._driver.session("r")

with self.assertRaises(types.DriverError):
list(self._session.run("RETURN 1 as n"))
list(self._session.run("RETURN 1 AS n"))

def test_should_fail_when_acquisition_timeout_is_reached_first(self):
"""
Connection creation bigger then acquisition timeout scenario.
Connection creation bigger than acquisition timeout scenario.

This test scenario tests the case where:

Expand All @@ -146,11 +148,11 @@ def test_should_fail_when_acquisition_timeout_is_reached_first(self):
self._session = self._driver.session("r")

with self.assertRaises(types.DriverError):
list(self._session.run("RETURN 1 as n"))
list(self._session.run("RETURN 1 AS n"))

def test_should_fail_when_connection_timeout_is_reached_first(self):
"""
Acquisition timeout bigger then connection creation timeout scenario.
Acquisition timeout bigger than connection creation timeout scenario.

This test scenario tests the case where:

Expand All @@ -174,27 +176,64 @@ def test_should_fail_when_connection_timeout_is_reached_first(self):
self._session = self._driver.session("r")

with self.assertRaises(types.DriverError):
list(self._session.run("RETURN 1 as n"))
list(self._session.run("RETURN 1 AS n"))

def test_does_not_encompass_router_handshake(self):
self._start_server(self._router, "router_hello_delay.script")
self._start_server(self._server, "session_run.script")

uri = "neo4j://%s" % self._router.address
auth = types.AuthorizationToken("basic", principal="neo4j",
credentials="pass")
self._driver = Driver(self._backend, uri, auth,
connection_acquisition_timeout_ms=2000,
connection_timeout_ms=720000)
self._session = self._driver.session("r")
list(self._session.run("RETURN 1 AS n"))

self._session.close()
self._session = None
self._driver.close()
self._driver = None
self._router.done()
self._server.done()

def test_does_not_encompass_router_route_response(self):
self._start_server(self._router, "router_route_delay.script")
self._start_server(self._server, "session_run.script")

uri = "neo4j://%s" % self._router.address
auth = types.AuthorizationToken("basic", principal="neo4j",
credentials="pass")
self._driver = Driver(self._backend, uri, auth,
connection_acquisition_timeout_ms=2000,
connection_timeout_ms=720000)
self._session = self._driver.session("r")
list(self._session.run("RETURN 1 AS n"))

self._session.close()
self._session = None
self._driver.close()
self._driver = None
self._router.done()
self._server.done()

@driver_feature(types.Feature.OPT_EAGER_TX_BEGIN)
def test_should_regulate_the_time_for_acquiring_connections(self):
"""
No connection available scenario.

This test scenario tests the case where:

1. the connection acquisition timeout is higher than
the connection creation timeout
2. the connection is successfully created and in due time
3. the connection pool doesn't have connections available in
suitable time
1. The connection pool is configured for max 1 connection
2. A connection is acquired and locked by another transaction
3. When the new session try to acquire a connection, the connection
pool doesn't have connections available in suitable time

Then the begin transaction is not executed
since the connection acquisition times out.
"""
self._server.start(
self.script_path("tx_without_commit_or_rollback.script")
)
self._start_server(self._server,
"tx_without_commit_or_rollback.script")

auth = types.AuthorizationToken("basic", principal="neo4j",
credentials="pass")
Expand Down
20 changes: 16 additions & 4 deletions tests/stub/driver_parameters/test_max_connection_pool_size.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@

class TestMaxConnectionPoolSize(TestkitTestCase):

required_features = types.Feature.BOLT_4_4,
required_features = types.Feature.BOLT_5_0,

def setUp(self):
super().setUp()
# This needs to be a port that's not used by other tests.
# Else, when testing the javascript driver in a browser (specifically
# Firefox), the browser might block this port for the driver after this
# test for security reasons.
self._server = StubServer(9999)
self._server.start(
self.script_path("tx_without_commit_or_rollback.script")
Expand All @@ -24,7 +28,7 @@ def setUp(self):

def tearDown(self):
# If test raised an exception this will make sure that the stub server
# is killed and it's output is dumped for analysis.
# is killed, and it's output is dumped for analysis.
self._server.reset()
for tx in self._transactions:
with self.assertRaises(types.DriverError):
Expand All @@ -44,6 +48,10 @@ def _open_driver(self, max_pool_size=None):
types.Feature.API_CONNECTION_ACQUISITION_TIMEOUT
):
kwargs["connection_acquisition_timeout_ms"] = 500
if self.driver_supports_features(
types.Feature.API_SESSION_CONNECTION_TIMEOUT
):
kwargs["session_connection_timeout_ms"] = 1000
if max_pool_size is not None:
kwargs["max_connection_pool_size"] = max_pool_size
auth = types.AuthorizationToken("basic", principal="neo4j",
Expand All @@ -53,8 +61,12 @@ def _open_driver(self, max_pool_size=None):

@contextmanager
def _backend_timeout_adjustment(self):
if self.driver_supports_features(
types.Feature.API_CONNECTION_ACQUISITION_TIMEOUT
if any(
self.driver_supports_features(feature)
for feature in (
types.Feature.API_CONNECTION_ACQUISITION_TIMEOUT,
types.Feature.API_SESSION_CONNECTION_TIMEOUT,
)
):
yield
else:
Expand Down
Loading