Skip to content

RUBY-3680 Close pipe fds #2936

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 14 additions & 51 deletions .evergreen/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -377,38 +377,9 @@ functions:
script: |
${PREPARE_SHELL}
TEST_CMD="bundle exec rake driver_bench" PERFORMANCE_RESULTS_FILE="$PROJECT_DIRECTORY/perf.json" .evergreen/run-tests.sh
- command: shell.exec
- command: perf.send
params:
script: |
# We use the requester expansion to determine whether the data is from a mainline evergreen run or not
if [ "${requester}" == "commit" ]; then
is_mainline=true
else
is_mainline=false
fi

# We parse the username out of the order_id as patches append that in and SPS does not need that information
parsed_order_id=$(echo "${revision_order_id}" | awk -F'_' '{print $NF}')

# Submit the performance data to the SPS endpoint
response=$(curl -s -w "\nHTTP_STATUS:%{http_code}" -X 'POST' \
"https://performance-monitoring-api.corp.mongodb.com/raw_perf_results/cedar_report?project=${project_id}&version=${version_id}&variant=${build_variant}&order=$parsed_order_id&task_name=${task_name}&task_id=${task_id}&execution=${execution}&mainline=$is_mainline" \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d @${PROJECT_DIRECTORY}/perf.json)

http_status=$(echo "$response" | grep "HTTP_STATUS" | awk -F':' '{print $2}')
response_body=$(echo "$response" | sed '/HTTP_STATUS/d')

# We want to throw an error if the data was not successfully submitted
if [ "$http_status" -ne 200 ]; then
echo "Error: Received HTTP status $http_status"
echo "Response Body: $response_body"
exit 1
fi

echo "Response Body: $response_body"
echo "HTTP Status: $http_status"
file: "${PROJECT_DIRECTORY}/perf.json"

"run tests":
- command: shell.exec
Expand Down Expand Up @@ -1947,17 +1918,18 @@ buildvariants:
# - name: testgcpkms_task_group
# batchtime: 20160 # Use a batchtime of 14 days as suggested by the CSFLE test README

- matrix_name: testazurekms-variant
matrix_spec:
ruby: ruby-3.0
fle: helper
topology: standalone
os: debian11 # could eventually look at updating this to rhel80
mongodb-version: 6.0
display_name: "AZURE KMS"
tasks:
- name: testazurekms_task_group
batchtime: 20160 # Use a batchtime of 14 days as suggested by the CSFLE test README
# https://jira.mongodb.org/browse/RUBY-3672
#- matrix_name: testazurekms-variant
# matrix_spec:
# ruby: ruby-3.0
# fle: helper
# topology: standalone
# os: debian11 # could eventually look at updating this to rhel80
# mongodb-version: 6.0
# display_name: "AZURE KMS"
# tasks:
# - name: testazurekms_task_group
# batchtime: 20160 # Use a batchtime of 14 days as suggested by the CSFLE test README

- matrix_name: atlas-full
matrix_spec:
Expand All @@ -1975,15 +1947,6 @@ buildvariants:
tasks:
- name: testatlas_task_group

- matrix_name: "serverless"
matrix_spec:
ruby: "ruby-3.3"
fle: path
os: ubuntu2204
display_name: "Atlas serverless ${ruby}"
tasks:
- name: serverless_task_group

- matrix_name: "aws-lambda"
matrix_spec:
ruby: 'ruby-3.2'
Expand Down
32 changes: 12 additions & 20 deletions .evergreen/config/standard.yml.erb
Original file line number Diff line number Diff line change
Expand Up @@ -494,17 +494,18 @@ buildvariants:
# - name: testgcpkms_task_group
# batchtime: 20160 # Use a batchtime of 14 days as suggested by the CSFLE test README

- matrix_name: testazurekms-variant
matrix_spec:
ruby: ruby-3.0
fle: helper
topology: standalone
os: debian11 # could eventually look at updating this to rhel80
mongodb-version: 6.0
display_name: "AZURE KMS"
tasks:
- name: testazurekms_task_group
batchtime: 20160 # Use a batchtime of 14 days as suggested by the CSFLE test README
# https://jira.mongodb.org/browse/RUBY-3672
#- matrix_name: testazurekms-variant
# matrix_spec:
# ruby: ruby-3.0
# fle: helper
# topology: standalone
# os: debian11 # could eventually look at updating this to rhel80
# mongodb-version: 6.0
# display_name: "AZURE KMS"
# tasks:
# - name: testazurekms_task_group
# batchtime: 20160 # Use a batchtime of 14 days as suggested by the CSFLE test README

- matrix_name: atlas-full
matrix_spec:
Expand All @@ -522,15 +523,6 @@ buildvariants:
tasks:
- name: testatlas_task_group

- matrix_name: "serverless"
matrix_spec:
ruby: <%= latest_ruby %>
fle: path
os: ubuntu2204
display_name: "Atlas serverless ${ruby}"
tasks:
- name: serverless_task_group

- matrix_name: "aws-lambda"
matrix_spec:
ruby: 'ruby-3.2'
Expand Down
3 changes: 3 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,6 @@ Style/TrailingCommaInHashLiteral:

RSpec/ExampleLength:
Max: 10

RSpec/MessageSpies:
EnforcedStyle: receive
1 change: 1 addition & 0 deletions lib/mongo/server/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ def close(options = nil)

@max_connecting_cv.broadcast
@size_cv.broadcast
@generation_manager.close_all_pipes
end

publish_cmap_event(
Expand Down
27 changes: 26 additions & 1 deletion lib/mongo/server/connection_pool/generation_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ def generation_unlocked(service_id: nil)
end

def pipe_fds(service_id: nil)
@pipe_fds[service_id][@map[service_id]]
@pipe_fds.dig(service_id, @map[service_id])
end

def remove_pipe_fds(generation, service_id: nil)
validate_service_id!(service_id)

r, w = @pipe_fds[service_id].delete(generation)
return unless r && w

w.close
# Schedule the read end of the pipe to be closed. We cannot close it
# immediately since we need to wait for any Kernel#select calls to
Expand Down Expand Up @@ -89,8 +91,31 @@ def bump(service_id: nil)
end
end

# Close all pipes in the generation manager.
#
# This method should be called only when the +ConnectionPool+ that
# owns this +GenerationManager+ is closed, to ensure that all
# pipes are closed properly.
def close_all_pipes
@lock.synchronize do
close_all_scheduled
@pipe_fds.keys.each do |service_id|
generations = @pipe_fds.delete(service_id)
generations.values.each do |(r, w)|
r.close
w.close
rescue IOError
# Ignore any IOError that occurs when closing the
# pipe, as there is nothing we can do about it.
end
end
end
end


private


def validate_service_id!(service_id)
if service_id
unless server.load_balancer?
Expand Down
13 changes: 11 additions & 2 deletions lib/mongo/socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class Socket
# connection (for non-monitoring connections) that created this socket.
# @option options [ true | false ] :monitor Whether this socket was
# created by a monitoring connection.
# @option options :pipe [ IO ] The file descriptor for the read end of the
# pipe to listen on during the select system call when reading from the
# socket.
#
# @api private
def initialize(timeout, options)
Expand Down Expand Up @@ -106,6 +109,13 @@ def monitor?
!!options[:monitor]
end

# @return [ IO ] The file descriptor for the read end of the pipe to
# listen on during the select system call when reading from the
# socket.
def pipe
options[:pipe]
end

# @return [ String ] Human-readable summary of the socket for debugging.
#
# @api private
Expand Down Expand Up @@ -161,7 +171,7 @@ def close
begin
# Sometimes it seems the close call can hang for a long time
::Timeout.timeout(5) do
@socket.close
@socket&.close
end
rescue
# Silence all errors
Expand Down Expand Up @@ -390,7 +400,6 @@ def read_from_socket(length, socket_timeout: nil, csot: false)
raise_timeout_error!("Took more than #{_timeout} seconds to receive data", csot)
end
end
pipe = options[:pipe]
if exc.is_a?(IO::WaitReadable)
if pipe
select_args = [[@socket, pipe], nil, [@socket, pipe], select_timeout]
Expand Down
4 changes: 2 additions & 2 deletions spec/mongo/client_construction_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# possible future work: re-enable these one at a time and do the hard work of
# making them right.
#
# rubocop:disable RSpec/ExpectInHook, RSpec/MessageSpies, RSpec/ExampleLength
# rubocop:disable RSpec/ExpectInHook, RSpec/ExampleLength
# rubocop:disable RSpec/ContextWording, RSpec/RepeatedExampleGroupDescription
# rubocop:disable RSpec/ExampleWording, Style/BlockComments, RSpec/AnyInstance
# rubocop:disable RSpec/VerifiedDoubles
Expand Down Expand Up @@ -2717,7 +2717,7 @@
it_behaves_like 'duplicated client with reused monitoring'
end
end
# rubocop:enable RSpec/ExpectInHook, RSpec/MessageSpies, RSpec/ExampleLength
# rubocop:enable RSpec/ExpectInHook, RSpec/ExampleLength
# rubocop:enable RSpec/ContextWording, RSpec/RepeatedExampleGroupDescription
# rubocop:enable RSpec/ExampleWording, Style/BlockComments, RSpec/AnyInstance
# rubocop:enable RSpec/VerifiedDoubles
30 changes: 30 additions & 0 deletions spec/mongo/server/connection_pool/generation_manager_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# frozen_string_literal: true

require 'spec_helper'

describe Mongo::Server::ConnectionPool::GenerationManager do
describe '#close_all_pipes' do
let(:service_id) { 'test_service_id' }

let(:server) { instance_double(Mongo::Server) }

let(:manager) { described_class.new(server: server) }

before do
manager.pipe_fds(service_id: service_id)
end

it 'closes all pipes and removes them from the map' do
expect(manager.pipe_fds(service_id: service_id).size).to eq(2)

manager.instance_variable_get(:@pipe_fds)[service_id].each do |_gen, (r, w)|
expect(r).to receive(:close).and_call_original
expect(w).to receive(:close).and_call_original
end

manager.close_all_pipes

expect(manager.instance_variable_get(:@pipe_fds)).to be_empty
end
end
end
5 changes: 5 additions & 0 deletions spec/mongo/server/connection_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,11 @@ def create_pool(min_pool_size)
expect(pool).to be_closed
end
end

it 'closes all pipes' do
expect(pool.generation_manager).to receive(:close_all_pipes).and_call_original
pool.close
end
end

describe '#inspect' do
Expand Down
2 changes: 1 addition & 1 deletion spec/shared
Submodule shared updated 1 files
+1 −1 shlib/server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ schemaVersion: "1.9"

runOnRequirements:
- minServerVersion: "4.4"
serverless: forbid # Capped collections are not allowed for serverless.

createEntities:
- client:
Expand All @@ -12,7 +13,7 @@ createEntities:
- client:
id: &client client
uriOptions:
timeoutMS: 10
timeoutMS: 200
useMultipleMongoses: false
observeEvents:
- commandStartedEvent
Expand Down Expand Up @@ -83,7 +84,7 @@ tests:
data:
failCommands: ["find"]
blockConnection: true
blockTimeMS: 15
blockTimeMS: 300
- name: find
object: *collection
arguments:
Expand Down Expand Up @@ -117,13 +118,13 @@ tests:
data:
failCommands: ["find", "getMore"]
blockConnection: true
blockTimeMS: 15
blockTimeMS: 150
- name: createFindCursor
object: *collection
arguments:
filter: {}
cursorType: tailableAwait
timeoutMS: 29
timeoutMS: 250
batchSize: 1
saveResultAsEntity: &tailableCursor tailableCursor
# Iterate twice to force a getMore. The first iteration will return the document from the first batch and the
Expand Down Expand Up @@ -165,13 +166,13 @@ tests:
data:
failCommands: ["find", "getMore"]
blockConnection: true
blockTimeMS: 15
blockTimeMS: 150
- name: createFindCursor
object: *collection
arguments:
filter: {}
cursorType: tailableAwait
timeoutMS: 29
timeoutMS: 250
batchSize: 1
maxAwaitTimeMS: 1
saveResultAsEntity: &tailableCursor tailableCursor
Expand Down Expand Up @@ -199,8 +200,8 @@ tests:
collection: *collectionName
maxTimeMS: 1

# The timeoutMS value should be refreshed for getMore's. This is a failure test. The find inherits timeoutMS=10 from
# the collection and the getMore blocks for 15ms, causing iteration to fail with a timeout error.
# The timeoutMS value should be refreshed for getMore's. This is a failure test. The find inherits timeoutMS=200 from
# the collection and the getMore blocks for 250ms, causing iteration to fail with a timeout error.
- description: "timeoutMS is refreshed for getMore - failure"
operations:
- name: failPoint
Expand All @@ -213,7 +214,7 @@ tests:
data:
failCommands: ["getMore"]
blockConnection: true
blockTimeMS: 15
blockTimeMS: 250
- name: createFindCursor
object: *collection
arguments:
Expand Down
Loading