From e354a5cec8b57364299b0029eaaeb23f6f458423 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Fri, 4 Mar 2022 13:54:29 +0100 Subject: [PATCH 1/3] Fix verifyConnectivy/getServerInfo procedure The previous implementation was checking if all the servers are available instead of check if at least one server is available. This should be done by iterate over the servers until it find one server available. --- .../connection-provider-routing.js | 16 +- .../connection-provider-routing.test.js | 163 ++++++++++++++++-- 2 files changed, 158 insertions(+), 21 deletions(-) diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js index 75b7ee77a..d91b7b05e 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js @@ -251,9 +251,21 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider }) const servers = accessMode === WRITE ? routingTable.writers : routingTable.readers + + let error = newError( + `No servers available for database '${context.database}' with access mode '${accessMode}'`, + SERVICE_UNAVAILABLE + ) - return Promise.all(servers.map(address => this._verifyConnectivityAndGetServerVersion({ address }))) - .then(([serverInfo]) => serverInfo) + for (const address of servers) { + try { + const serverInfo = await this._verifyConnectivityAndGetServerVersion({ address }) + return serverInfo + } catch (e) { + error = e + } + } + throw error } forget (address, database) { diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js index 6089ffbad..e8e2aeec5 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js @@ -2493,24 +2493,24 @@ describe('#unit RoutingConnectionProvider', () => { expect(serverInfo).toEqual(new ServerInfo(server, protocolVersion)) }) - it('should acquire, resetAndFlush and release connections for sever with the selected access mode', async () => { + it('should acquire, resetAndFlush and release connections for sever first', async () => { const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup() const acquireSpy = jest.spyOn(pool, 'acquire') await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers - for (const address of targetServers) { - expect(acquireSpy).toHaveBeenCalledWith(address) + const address = targetServers[0] + expect(acquireSpy).toHaveBeenCalledWith(address) - const connections = seenConnectionsPerAddress.get(address) + const connections = seenConnectionsPerAddress.get(address) - expect(connections.length).toBe(1) - expect(connections[0].resetAndFlush).toHaveBeenCalled() - expect(connections[0]._release).toHaveBeenCalled() - expect(connections[0]._release.mock.invocationCallOrder[0]) - .toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0]) - } + expect(connections.length).toBe(1) + expect(connections[0].resetAndFlush).toHaveBeenCalled() + expect(connections[0]._release).toHaveBeenCalled() + expect(connections[0]._release.mock.invocationCallOrder[0]) + .toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0]) + }) it('should not acquire, resetAndFlush and release connections for sever with the other access mode', async () => { @@ -2527,20 +2527,97 @@ describe('#unit RoutingConnectionProvider', () => { }) describe('when the reset and flush fails for at least one the address', () => { - it('should fails with the reset and flush error', async () => { + it('should succeed with the server info ', async () => { const error = newError('Error') let i = 0 const resetAndFlush = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve()) - const { connectionProvider } = setup({ resetAndFlush }) + const { connectionProvider, server, protocolVersion } = setup({ resetAndFlush }) + + const serverInfo = await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + + expect(serverInfo).toEqual(new ServerInfo(server, protocolVersion)) + }) + + it('should release the connection', async () => { + const error = newError('Error') + let i = 0 + const resetAndFlush = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve()) + const { connectionProvider, seenConnectionsPerAddress, routingTable } = setup({ resetAndFlush }) try { await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) - expect().toBe('Not reached') } catch (e) { + } finally { + const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers + for (const address of targetServers) { + const connections = seenConnectionsPerAddress.get(address) + + expect(connections.length).toBe(1) + expect(connections[0].resetAndFlush).toHaveBeenCalled() + expect(connections[0]._release).toHaveBeenCalled() + } + } + }) + + describe('and the release fails', () => { + it('should fails with the release error', async () => { + const error = newError('Error') + const releaseError = newError('Release error') + let i = 0 + const resetAndFlush = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve()) + const releaseMock = jest.fn(() => Promise.reject(releaseError)) + const { connectionProvider } = setup({ resetAndFlush, releaseMock }) + + try { + await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + expect().toBe('Not reached') + } catch (e) { + expect(e).toBe(releaseError) + } + }) + }) + + }) + + describe('when the reset and flush fails for all addresses', () => { + it('should succeed with the server info ', async () => { + const error = newError('Error') + const resetAndFlush = jest.fn(() => Promise.reject(error)) + const { connectionProvider } = setup({ resetAndFlush }) + + try { + await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + expect().toBe('Not reached') + } catch(e) { expect(e).toBe(error) } }) + it('should acquire, resetAndFlush and release connections for all servers', async () => { + const resetAndFlush = jest.fn(() => Promise.reject(error)) + const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup({ resetAndFlush }) + const acquireSpy = jest.spyOn(pool, 'acquire') + + try { + await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + } catch (e) { + // nothing to do + } finally { + const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers + for (const address of targetServers) { + expect(acquireSpy).toHaveBeenCalledWith(address) + + const connections = seenConnectionsPerAddress.get(address) + + expect(connections.length).toBe(1) + expect(connections[0].resetAndFlush).toHaveBeenCalled() + expect(connections[0]._release).toHaveBeenCalled() + expect(connections[0]._release.mock.invocationCallOrder[0]) + .toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0]) + } + } + }) + it('should release the connection', async () => { const error = newError('Error') let i = 0 @@ -2583,16 +2660,28 @@ describe('#unit RoutingConnectionProvider', () => { }) describe('when the release for at least one the address', () => { - it('should fails with the reset and flush error', async () => { + it('should succeed with the server info', async () => { const error = newError('Error') let i = 0 const releaseMock = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve()) + const { connectionProvider, server, protocolVersion } = setup({ releaseMock }) + + const serverInfo = await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + + expect(serverInfo).toEqual(new ServerInfo(server, protocolVersion)) + }) + }) + + describe('when the release for all address', () => { + it('should fail with release error', async () => { + const error = newError('Error') + const releaseMock = jest.fn(() => Promise.reject(error)) const { connectionProvider } = setup({ releaseMock }) try { await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) expect().toBe('Not reached') - } catch (e) { + } catch(e) { expect(e).toBe(error) } }) @@ -2668,7 +2757,7 @@ describe('#unit RoutingConnectionProvider', () => { }) describe('when at least the one of the connections could not be created', () => { - it('should reject with acquistion timeout error', async () => { + it('should succeed with the server info', async () => { let i = 0 const error = new Error('Connection creation error') const routingTable = newRoutingTable( @@ -2694,13 +2783,49 @@ describe('#unit RoutingConnectionProvider', () => { pool ) + const serverInfo = connectionProvider = await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + expect(serverInfo).toEqual(new ServerInfo({}, 4.4)) + }) + }) + + describe('when there is not server available in the routing table', () => { + it('should thown an discovery error', async () => { + const expectedError = newError( + `No servers available for database '${database || null}' with access mode '${accessMode || READ}'`, + SERVICE_UNAVAILABLE + ) + const routingTable = newRoutingTable( + database || null, + [server1, server2], + accessMode === READ ? [] : [server3, server4], + accessMode === WRITE ? [] : [server5, server6], + Integer.MAX_SAFE_VALUE + ) + + const routingTableToRouters = {} + const routingMap = {} + routingMap[server0.asKey()] = routingTable + routingMap[server1.asKey()] = routingTable + routingMap[server2.asKey()] = routingTable + routingTableToRouters[database || null] = routingMap + + const connectionProvider = newRoutingConnectionProviderWithSeedRouter( + server0, + [server0], // seed router address resolves just to itself + [ + routingTable + ], + routingTableToRouters + ) + try { - connectionProvider = await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) expect().toBe('not reached') - } catch (e) { - expect(e).toBe(error) + } catch (error) { + expect(error).toEqual(expectedError) } }) + }) }) }) From c2e600dc46a17e08f9936d3f60055705293efbc5 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Fri, 4 Mar 2022 16:33:36 +0100 Subject: [PATCH 2/3] Removing empty line --- .../test/connection-provider/connection-provider-routing.test.js | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js index e8e2aeec5..7f4d1a437 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js @@ -2825,7 +2825,6 @@ describe('#unit RoutingConnectionProvider', () => { expect(error).toEqual(expectedError) } }) - }) }) }) From 33a5a01408f6fc78bb6df33304137e91cee18bf4 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Fri, 4 Mar 2022 17:06:40 +0100 Subject: [PATCH 3/3] Rouven --- .../test/connection-provider/connection-provider-routing.test.js | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js index 7f4d1a437..f6151a4d4 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js @@ -2576,7 +2576,6 @@ describe('#unit RoutingConnectionProvider', () => { } }) }) - }) describe('when the reset and flush fails for all addresses', () => {