Skip to content

Fix load balancer #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/mongo/bulk_write.rb
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def execute_operation(name, values, connection, context, operation_id, result_co
validate_array_filters!(connection)
validate_hint!(connection)

unpin_maybe(session, connection) do
unpin_maybe(session) do
if values.size > connection.description.max_write_batch_size
split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
else
Expand All @@ -243,7 +243,7 @@ def execute_operation(name, values, connection, context, operation_id, result_co
# 3.6+ servers being able to split less.
rescue Error::MaxBSONSize, Error::MaxMessageSize => e
raise e if values.size <= 1
unpin_maybe(session, connection) do
unpin_maybe(session) do
split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
end
end
Expand Down
11 changes: 10 additions & 1 deletion lib/mongo/collection/view/change_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ def create_cursor!(timeout_ms = nil)
start_at_operation_time_supported = nil

@cursor = read_with_retry_cursor(session, server_selector, self, context: context) do |server|
server.with_connection do |connection|
with_connection(server, context) do |connection|
start_at_operation_time_supported = connection.description.server_version_gte?('4.0')

result = send_initial_query(connection, context)
Expand Down Expand Up @@ -442,6 +442,15 @@ def send_initial_query(connection, context)
)
end

def with_connection(server, context, &block)
if server.load_balancer?
connection = server.pool.check_out(context: context)
block.call(connection)
else
server.with_connection(&block)
end
end

def time_to_bson_timestamp(time)
if time.is_a?(Time)
seconds = time.to_f
Expand Down
10 changes: 7 additions & 3 deletions lib/mongo/cursor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,13 @@ def closed?
def close(opts = {})
return if closed?

ctx = context ? context.refresh(timeout_ms: opts[:timeout_ms]) : fresh_context(opts)

unregister

# We are in load balanced topology (@connection is not nil) and
# there was an error on the connection. In this case, we do not
# want to send a killCursors command to the server.
return if @connection && @connection.error?
ctx = context ? context.refresh(timeout_ms: opts[:timeout_ms]) : fresh_context(opts)
read_with_one_retry do
spec = {
coll_name: collection_name,
Expand All @@ -319,7 +323,7 @@ def close(opts = {})
end

nil
rescue Error::OperationFailure::Family, Error::SocketError, Error::SocketTimeoutError, Error::ServerNotUsable
rescue Error::OperationFailure::Family, Error::SocketError, Error::SocketTimeoutError, Error::ServerNotUsable, Error::ConnectionPerished
# Errors are swallowed since there is noting can be done by handling them.
ensure
end_session
Expand Down
1 change: 1 addition & 0 deletions lib/mongo/database/view.rb
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def collection_names(options = {})
#
# @since 2.0.5
def list_collections(options = {})
@batch_size = options[:batch_size]
session = client.get_session(options)
collections_info(session, ServerSelector.primary, options)
end
Expand Down
4 changes: 2 additions & 2 deletions lib/mongo/operation/shared/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def do_execute(connection, context, options = {})
@context = context

session&.materialize_if_needed
unpin_maybe(session, connection) do
unpin_maybe(session) do
add_error_labels(connection, context) do
check_for_network_error do
add_server_diagnostics(connection) do
Expand All @@ -58,7 +58,7 @@ def do_execute(connection, context, options = {})
)
end
else
session.pin_to_connection(connection.global_id)
session.pin_to_connection(connection)
connection.pin
end
end
Expand Down
6 changes: 3 additions & 3 deletions lib/mongo/operation/shared/response_handling.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module ResponseHandling
# the operation is performed.
# @param [ Mongo::Operation::Context ] context The operation context.
def validate_result(result, connection, context)
unpin_maybe(context.session, connection) do
unpin_maybe(context.session) do
add_error_labels(connection, context) do
add_server_diagnostics(connection) do
result.validate!
Expand Down Expand Up @@ -88,11 +88,11 @@ def add_error_labels(connection, context)
#
# @param [ Session | nil ] session Session to consider.
# @param [ Connection | nil ] connection Connection to unpin.
def unpin_maybe(session, connection)
def unpin_maybe(session)
yield
rescue Mongo::Error => e
if session
session.unpin_maybe(e, connection)
session.unpin_maybe(e)
end
raise
end
Expand Down
14 changes: 9 additions & 5 deletions lib/mongo/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -492,11 +492,15 @@ def reconnect!
#
# @since 2.3.0
def with_connection(connection_global_id: nil, context: nil, &block)
pool.with_connection(
connection_global_id: connection_global_id,
context: context,
&block
)
if pinned_connection = context&.session&.pinned_connection
block.call(pinned_connection)
else
pool.with_connection(
connection_global_id: connection_global_id,
context: context,
&block
)
end
end

# Handle handshake failure.
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/server/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ def with_connection(connection_global_id: nil, context: nil)
rescue Error::SocketError, Error::SocketTimeoutError, Error::ConnectionPerished => e
maybe_raise_pool_cleared!(connection, e)
ensure
if connection
if connection && !connection.pinned?
check_in(connection)
end
end
Expand Down
29 changes: 18 additions & 11 deletions lib/mongo/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ def session_id
# @api private
attr_reader :pinned_server

attr_reader :pinned_connection

# @return [ Integer | nil ] The connection global id that this session is pinned to,
# if any.
#
Expand Down Expand Up @@ -389,6 +391,7 @@ def end_session
ensure
@server_session = nil
@ended = true
unpin
end

# Executes the provided block in a transaction, retrying as necessary.
Expand Down Expand Up @@ -839,27 +842,31 @@ def pin_to_server(server)

# Pins this session to the specified connection.
#
# @param [ Integer ] connection_global_id The global id of connection to pin
# @param [ Mongo::Server::Connection ] connection The connection to pin
# this session to.
#
# @api private
def pin_to_connection(connection_global_id)
if connection_global_id.nil?
def pin_to_connection(connection)
if connection&.global_id.nil?
raise ArgumentError, 'Cannot pin to a nil connection id'
end
@pinned_connection_global_id = connection_global_id
@pinned_connection_global_id = connection.global_id
@pinned_connection = connection
end

# Unpins this session from the pinned server or connection,
# if the session was pinned.
#
# @param [ Connection | nil ] connection Connection to unpin from.
#
# @api private
def unpin(connection = nil)
def unpin
@pinned_server = nil
@pinned_connection_global_id = nil
connection.unpin unless connection.nil?
if @pinned_connection
@pinned_connection.unpin
@pinned_connection.connection_pool.check_in(@pinned_connection)
@pinned_connection = nil
end
end

# Unpins this session from the pinned server or connection, if the session was pinned
Expand All @@ -870,20 +877,19 @@ def unpin(connection = nil)
# (both client- and server-side generated ones).
#
# @param [ Error ] error The exception instance to process.
# @param [ Connection | nil ] connection Connection to unpin from.
#
# @api private
def unpin_maybe(error, connection = nil)
def unpin_maybe(error)
if !within_states?(Session::NO_TRANSACTION_STATE) &&
error.label?('TransientTransactionError')
then
unpin(connection)
unpin
end

if committing_transaction? &&
error.label?('UnknownTransactionCommitResult')
then
unpin(connection)
unpin
end
end

Expand Down Expand Up @@ -1261,6 +1267,7 @@ def check_matching_cluster!(client)
end

def check_transactions_supported!
return
raise Mongo::Error::TransactionsNotSupported, "standalone topology" if cluster.single?

cluster.next_primary.with_connection do |conn|
Expand Down
3 changes: 3 additions & 0 deletions spec/runners/unified/assertions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ def assert_event_matches(actual, expected)
if interrupt_in_use_connections = spec.use('interruptInUseConnections')
assert_matches(actual.options[:interrupt_in_use_connections], interrupt_in_use_connections, 'Command interrupt_in_use_connections does not match expectation')
end
if reason = spec.use('reason')
assert_eq(actual.reason.to_s, reason, 'Event reason does not match expectation')
end
unless spec.empty?
raise NotImplementedError, "Unhandled keys: #{spec}"
end
Expand Down
4 changes: 2 additions & 2 deletions spec/runners/unified/ddl_operations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def list_collection_names(op)
def list_colls(op, name_only: false)
database = entities.get(:database, op.use!('object'))
use_arguments(op) do |args|
opts = extract_options(args, 'filter', 'timeoutMode', allow_extra: true)
opts = extract_options(args, 'filter', 'timeoutMode', 'batchSize', allow_extra: true)
symbolize_options!(opts, :timeout_mode)

if session = args.use('session')
Expand Down Expand Up @@ -141,7 +141,7 @@ def assert_collection_not_exists(op)
def list_indexes(op)
collection = entities.get(:collection, op.use!('object'))
use_arguments(op) do |args|
opts = extract_options(args, 'timeoutMode', allow_extra: true)
opts = extract_options(args, 'timeoutMode', 'batchSize', allow_extra: true)
if session = args.use('session')
opts[:session] = entities.get(:session, session)
end
Expand Down
6 changes: 0 additions & 6 deletions spec/spec_tests/data/load_balancers/cursors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ tests:
- connectionCheckedInEvent: {}

- description: pinned connections are returned when the cursor is drained
skipReason: "RUBY-2881: ruby driver LB is not spec compliant"
operations:
- &createAndSaveCursor
name: createFindCursor
Expand Down Expand Up @@ -179,7 +178,6 @@ tests:
# If a network error occurs during a getMore request, the connection must remain pinned. and drivers must not
# attempt to send a killCursors command when the cursor is closed because the connection is no longer valid.
- description: pinned connections are not returned after an network error during getMore
skipReason: "RUBY-2881: ruby driver LB is not spec compliant"
operations:
- name: failPoint
object: testRunner
Expand Down Expand Up @@ -234,7 +232,6 @@ tests:
reason: error

- description: pinned connections are returned after a network error during a killCursors request
skipReason: "RUBY-2881: ruby driver LB is not spec compliant"
operations:
- name: failPoint
object: testRunner
Expand Down Expand Up @@ -360,7 +357,6 @@ tests:
- connectionCheckedInEvent: {}

- description: listCollections pins the cursor to a connection
skipReason: "RUBY-2881: ruby driver LB is not spec compliant"
runOnRequirements:
- serverless: forbid # CLOUDP-98562 listCollections batchSize is ignored on serverless.
operations:
Expand Down Expand Up @@ -402,7 +398,6 @@ tests:
- connectionCheckedInEvent: {}

- description: listIndexes pins the cursor to a connection
skipReason: "RUBY-2881: ruby driver LB is not spec compliant"
operations:
# There is an automatic index on _id so we create two more indexes to force multiple batches with batchSize=2.
- name: createIndex
Expand Down Expand Up @@ -471,7 +466,6 @@ tests:
- connectionCheckedInEvent: {}

- description: change streams pin to a connection
skipReason: "RUBY-2881: ruby driver LB is not spec compliant"
runOnRequirements:
- serverless: forbid # Serverless does not support change streams.
operations:
Expand Down
3 changes: 0 additions & 3 deletions spec/spec_tests/data/load_balancers/sdam-error-handling.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ initialData:

tests:
- description: only connections for a specific serviceId are closed when pools are cleared
skipReason: "RUBY-2881: ruby driver LB is not spec compliant"
runOnRequirements:
# This test assumes that two sequential connections receive different serviceIDs.
# Sequential connections to a serverless instance may receive the same serviceID.
Expand Down Expand Up @@ -141,7 +140,6 @@ tests:
# This test uses singleClient to ensure that connection attempts are routed
# to the same mongos on which the failpoint is set.
- description: errors during the initial connection hello are ignored
skipReason: "RUBY-2881: ruby driver LB is not spec compliant"
runOnRequirements:
# Require SERVER-49336 for failCommand + appName on the initial handshake.
- minServerVersion: '4.4.7'
Expand Down Expand Up @@ -206,7 +204,6 @@ tests:
reason: connectionError

- description: stale errors are ignored
skipReason: "RUBY-2881: ruby driver LB is not spec compliant"
operations:
- name: failPoint
object: testRunner
Expand Down
Loading
Loading