Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
ConnectionErrorHandler
} from '../connection'
import { internal, error } from 'neo4j-driver-core'
import { controller } from '../lang'

const {
constants: { BOLT_PROTOCOL_V3, BOLT_PROTOCOL_V4_0, BOLT_PROTOCOL_V4_4 }
Expand All @@ -49,12 +50,17 @@ export default class DirectConnectionProvider extends PooledConnectionProvider {
this._handleAuthorizationExpired(error, address, database)
})

return this._connectionPool
.acquire(this._address)
.then(
connection =>
new DelegateConnection(connection, databaseSpecificErrorHandler)
)
const acquireConnectionJob = {
run: () => this._connectionPool
.acquire(this._address)
.then(
connection =>
new DelegateConnection(connection, databaseSpecificErrorHandler)
),
onTimeout: connection => connection._release()
}

return controller.runWithTimeout(this._sessionConnectionTimeoutConfig, acquireConnectionJob)
}

_handleAuthorizationExpired (error, address, database) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { createChannelConnection, ConnectionErrorHandler } from '../connection'
import Pool, { PoolConfig } from '../pool'
import { error, ConnectionProvider, ServerInfo } from 'neo4j-driver-core'
import { error, newError, ConnectionProvider, ServerInfo } from 'neo4j-driver-core'

const { SERVICE_UNAVAILABLE } = error
export default class PooledConnectionProvider extends ConnectionProvider {
Expand Down Expand Up @@ -58,6 +58,12 @@ export default class PooledConnectionProvider extends ConnectionProvider {
log: this._log
})
this._openConnections = {}
this._sessionConnectionTimeoutConfig = {
timeout: this._config.sessionConnectionTimeout,
reason: () => newError(
`Session acquisition timed out in ${this._config.sessionConnectionTimeout} ms.`
)
}
}

_createConnectionErrorHandler () {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { HostNameResolver } from '../channel'
import SingleConnectionProvider from './connection-provider-single'
import PooledConnectionProvider from './connection-provider-pooled'
import { LeastConnectedLoadBalancingStrategy } from '../load-balancing'
import { controller } from '../lang'
import {
createChannelConnection,
ConnectionErrorHandler,
Expand Down Expand Up @@ -75,6 +76,13 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
)
})

this._updateRoutingTableTimeoutConfig = {
timeout: this._config.updateRoutingTableTimeout,
reason: () => newError(
`Routing table update timed out in ${this._config.updateRoutingTableTimeout} ms.`
)
}

this._routingContext = { ...routingContext, address: address.toString() }
this._seedRouter = address
this._rediscovery = new Rediscovery(this._routingContext)
Expand Down Expand Up @@ -143,53 +151,65 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
this._handleAuthorizationExpired(error, address, context.database)
)

const routingTable = await this._freshRoutingTable({
accessMode,
database: context.database,
bookmarks: bookmarks,
impersonatedUser,
onDatabaseNameResolved: (databaseName) => {
context.database = context.database || databaseName
if (onDatabaseNameResolved) {
onDatabaseNameResolved(databaseName)
const refreshRoutingTableJob = {
run: async () => {
const routingTable = await this._freshRoutingTable({
accessMode,
database: context.database,
bookmarks: bookmarks,
impersonatedUser,
onDatabaseNameResolved: (databaseName) => {
context.database = context.database || databaseName
if (onDatabaseNameResolved) {
onDatabaseNameResolved(databaseName)
}
}
})

// select a target server based on specified access mode
if (accessMode === READ) {
address = this._loadBalancingStrategy.selectReader(routingTable.readers)
name = 'read'
} else if (accessMode === WRITE) {
address = this._loadBalancingStrategy.selectWriter(routingTable.writers)
name = 'write'
} else {
throw newError('Illegal mode ' + accessMode)
}
}
})

// select a target server based on specified access mode
if (accessMode === READ) {
address = this._loadBalancingStrategy.selectReader(routingTable.readers)
name = 'read'
} else if (accessMode === WRITE) {
address = this._loadBalancingStrategy.selectWriter(routingTable.writers)
name = 'write'
} else {
throw newError('Illegal mode ' + accessMode)
}

// we couldn't select a target server
if (!address) {
throw newError(
`Failed to obtain connection towards ${name} server. Known routing table is: ${routingTable}`,
SESSION_EXPIRED
)
// we couldn't select a target server
if (!address) {
throw newError(
`Failed to obtain connection towards ${name} server. Known routing table is: ${routingTable}`,
SESSION_EXPIRED
)
}
return { routingTable, address }
}
}

try {
const connection = await this._acquireConnectionToServer(
address,
name,
routingTable
)
const acquireConnectionJob = {
run: async ({ routingTable, address }) => {
try {
const connection = await this._acquireConnectionToServer(
address,
name,
routingTable
)

return new DelegateConnection(connection, databaseSpecificErrorHandler)
} catch (error) {
const transformed = databaseSpecificErrorHandler.handleAndTransformError(
error,
address
)
throw transformed
return new DelegateConnection(connection, databaseSpecificErrorHandler)
} catch (error) {
const transformed = databaseSpecificErrorHandler.handleAndTransformError(
error,
address
)
throw transformed
}
},
onTimeout: connection => connection._release()
}

return controller.runWithTimeout(this._sessionConnectionTimeoutConfig, refreshRoutingTableJob, acquireConnectionJob)
}

async _hasProtocolVersion (versionPredicate) {
Expand Down Expand Up @@ -302,18 +322,23 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
}

_freshRoutingTable ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved } = {}) {
const currentRoutingTable = this._routingTableRegistry.get(
database,
() => new RoutingTable({ database })
)
const refreshRoutingTableJob = {
run: () => {
const currentRoutingTable = this._routingTableRegistry.get(
database,
() => new RoutingTable({ database })
)

if (!currentRoutingTable.isStaleFor(accessMode)) {
return currentRoutingTable
if (!currentRoutingTable.isStaleFor(accessMode)) {
return currentRoutingTable
}
this._log.info(
`Routing table is stale for database: "${database}" and access mode: "${accessMode}": ${currentRoutingTable}`
)
return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved)
}
}
this._log.info(
`Routing table is stale for database: "${database}" and access mode: "${accessMode}": ${currentRoutingTable}`
)
return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved)
return controller.runWithTimeout(this._updateRoutingTableTimeoutConfig, refreshRoutingTableJob)
}

_refreshRoutingTable (currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved) {
Expand Down
55 changes: 55 additions & 0 deletions packages/bolt-connection/src/lang/controller.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

export function runWithTimeout ({ timeout, reason }, ...jobs) {
const status = { timedout: false }
async function _run (currentValue, { resolve, reject }, myJobs) {
const [{ run, onTimeout = () => Promise.resolve() }, ...otherJobs] = myJobs
try {
const value = await run(currentValue)
if (status.timedout) {
await onTimeout(value).catch(() => {})
} else if (otherJobs.length === 0) {
resolve(value)
} else {
await _run(value, { resolve, reject }, otherJobs)
}
} catch (e) {
if (!status.timedout) {
reject(e)
}
}
}

return new Promise((resolve, reject) => {
if (timeout != null) {
status.timeoutHandle = setTimeout(() => {
status.timedout = true
reject(reason())
}, timeout)
}

_run(undefined, { resolve, reject }, jobs)
.finally(() => {
if (status.timeoutHandle != null) {
clearTimeout(status.timeoutHandle)
}
})
})
}
1 change: 1 addition & 0 deletions packages/bolt-connection/src/lang/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
*/

export * as functional from './functional'
export * as controller from './controller'
Loading