
Commit 1333871

Removed clients with unrecoverable errors from the Pool (#4088)
1 parent a0e76c7 commit 1333871
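
In short: when a client in the pool emits 'connectionError', the pool now drops that client from its internal list instead of keeping it around for re-use. A minimal consumer-side sketch of the event this change hooks into (illustrative only, not part of the commit; the port and timeout are made up, and in an application undici would be required from npm rather than from the repo root):

'use strict'

// Illustrative sketch: a Pool re-emits each client's 'connectionError' event
// with (origin, targets, error). After this change, the clients listed in
// `targets` are also removed from the pool, so a later request gets a fresh
// client instead of a dead one.
const { Pool } = require('undici')

const pool = new Pool('http://localhost:1', { connectTimeout: 100 })

pool.on('connectionError', (origin, targets, err) => {
  console.log(`connection error for ${targets.length} client(s):`, err.code)
})

pool.request({ path: '/', method: 'GET' })
  .catch(() => pool.close())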

File tree: 3 files changed, 381 additions and 0 deletions


lib/dispatcher/pool.js

Lines changed: 14 additions & 0 deletions
@@ -73,6 +73,20 @@ class Pool extends PoolBase {
       ? { ...options.interceptors }
       : undefined
     this[kFactory] = factory
+
+    this.on('connectionError', (origin, targets, error) => {
+      // If a connection error occurs, we remove the client from the pool,
+      // and emit a connectionError event. They will not be re-used.
+      // Fixes https://github.com/nodejs/undici/issues/3895
+      for (const target of targets) {
+        // Do not use kRemoveClient here, as it will close the client,
+        // but the client cannot be closed in this state.
+        const idx = this[kClients].indexOf(target)
+        if (idx !== -1) {
+          this[kClients].splice(idx, 1)
+        }
+      }
+    })
   }
 
   [kGetDispatcher] () {
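
The handler only splices the affected targets out of the internal client list; it deliberately does not go through kRemoveClient, which would try to close a client that cannot be closed in this state. A standalone sketch of just that removal step, with plain objects standing in for clients (illustrative only, not undici code):

// Plain-array sketch of the removal loop above: each target that is present
// in the list is spliced out; unknown targets leave the list untouched.
function removeTargets (clients, targets) {
  for (const target of targets) {
    const idx = clients.indexOf(target)
    if (idx !== -1) {
      clients.splice(idx, 1)
    }
  }
  return clients
}

const a = { id: 'a' }
const b = { id: 'b' }
const c = { id: 'c' }

console.log(removeTargets([a, b, c], [b]).map(x => x.id)) // [ 'a', 'c' ]
console.log(removeTargets([a, c], [{ id: 'b' }]).map(x => x.id)) // [ 'a', 'c' ] (no match, unchanged)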
Lines changed: 158 additions & 0 deletions
@@ -0,0 +1,158 @@
'use strict'

const { test } = require('node:test')
const assert = require('node:assert')
const { Pool } = require('..')
const { createServer } = require('node:http')
const { kClients } = require('../lib/dispatcher/pool-base')

// This test verifies that clients are properly removed from the pool when they encounter connection errors,
// which is the fix implemented for issue #3895 (memory leak with connection errors)
test('Pool client count does not grow on repeated connection errors', async (t) => {
  // Setup a pool pointing to a non-existent server
  const pool = new Pool('http://localhost:1', {
    connections: 10,
    connectTimeout: 100, // Short timeout to speed up the test
    bodyTimeout: 100,
    headersTimeout: 100
  })

  try {
    const clientCounts = []

    // Track initial client count
    clientCounts.push(pool[kClients].length)

    // Make several requests that will fail with connection errors
    const requests = 5
    for (let i = 0; i < requests; i++) {
      try {
        await pool.request({
          path: `/${i}`,
          method: 'GET'
        })
        assert.fail('Request should have failed with a connection error')
      } catch (err) {
        // We expect connection errors, but the error might be wrapped
        assert.ok(
          err.code === 'ECONNREFUSED' ||
          err.cause?.code === 'ECONNREFUSED' ||
          err.code === 'UND_ERR_CONNECT',
          `Expected connection error but got: ${err.message} (${err.code})`
        )
      }

      // Track client count after each request
      clientCounts.push(pool[kClients].length)

      // Small delay to allow for client cleanup
      await new Promise(resolve => setTimeout(resolve, 10))
    }

    // The key test: verify that client count doesn't increase monotonically,
    // which would indicate the memory leak that was fixed
    const maxCount = Math.max(...clientCounts)
    assert.ok(
      clientCounts[clientCounts.length - 1] <= maxCount,
      `Client count should not increase continuously. Counts: ${clientCounts.join(', ')}`
    )

    // Ensure the last two counts are similar (stabilized)
    const lastCount = clientCounts[clientCounts.length - 1]
    const secondLastCount = clientCounts[clientCounts.length - 2]

    assert.ok(
      Math.abs(lastCount - secondLastCount) <= 1,
      `Client count should stabilize. Last counts: ${secondLastCount}, ${lastCount}`
    )

    // Additional verification: make many more requests to check for significant growth
    const moreRequests = 10
    const startCount = pool[kClients].length

    for (let i = 0; i < moreRequests; i++) {
      try {
        await pool.request({
          path: `/more-${i}`,
          method: 'GET'
        })
      } catch (err) {
        // Expected error
      }

      // Small delay to allow for client cleanup
      await new Promise(resolve => setTimeout(resolve, 10))
    }

    const endCount = pool[kClients].length

    // The maximum tolerable growth - some growth may occur due to timing issues,
    // but it should be limited and not proportional to the number of requests
    const maxGrowth = 3

    assert.ok(
      endCount - startCount <= maxGrowth,
      `Client count should not grow significantly after many failed requests. Start: ${startCount}, End: ${endCount}`
    )
  } finally {
    await pool.close()
  }
})

// This test specifically verifies the fix in pool-base.js for connectionError event
test('Pool clients are removed on connectionError event', async (t) => {
  // Create a server we'll use to track connection events
  const server = createServer((req, res) => {
    res.writeHead(200, { 'Content-Type': 'text/plain' })
    res.end('ok')
  })

  await new Promise(resolve => server.listen(0, resolve))
  const port = server.address().port

  const pool = new Pool(`http://localhost:${port}`, {
    connections: 3 // Small pool to make testing easier
  })

  try {
    // Make an initial successful request to create a client
    await pool.request({
      path: '/',
      method: 'GET'
    })

    // Save the initial number of clients
    const initialCount = pool[kClients].length
    assert.ok(initialCount > 0, 'Should have at least one client after a successful request')

    // Manually trigger a connectionError on all clients
    for (const client of [...pool[kClients]]) {
      client.emit('connectionError', 'origin', [client], new Error('Simulated connection error'))
    }

    // Allow some time for the event to be processed
    await new Promise(resolve => setTimeout(resolve, 50))

    // After the fix, all clients should be removed when they emit a connectionError
    assert.strictEqual(
      pool[kClients].length,
      0,
      'All clients should be removed from pool after connectionError events'
    )

    // Make another request to verify the pool can create new clients
    await pool.request({
      path: '/after-error',
      method: 'GET'
    })

    // Verify new clients were created
    assert.ok(
      pool[kClients].length > 0,
      'Pool should create new clients after previous ones were removed'
    )
  } finally {
    await pool.close()
    await new Promise(resolve => server.close(resolve))
  }
})
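
The catch block in the first test accepts the error either directly (err.code) or wrapped (err.cause.code), because undici may surface the socket failure as-is or as the cause of an UND_ERR_CONNECT-style error. A small illustrative helper, not part of the commit, that normalizes this the same way the assertion does:

// Hypothetical helper: pull out the most specific connection error code,
// whether it sits on the error itself or on its `cause`.
function connectionErrorCode (err) {
  return err?.cause?.code ?? err?.code ?? 'UNKNOWN'
}

// Usage sketch inside a test:
//   try { await pool.request({ path: '/', method: 'GET' }) }
//   catch (err) { assert.strictEqual(connectionErrorCode(err), 'ECONNREFUSED') }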

test/tls-cert-leak.js

Lines changed: 209 additions & 0 deletions
@@ -0,0 +1,209 @@
'use strict'

const { test } = require('node:test')
const assert = require('node:assert')
const { tspl } = require('@matteo.collina/tspl')
const { fetch } = require('..')
const https = require('node:https')
const fs = require('node:fs')
const path = require('node:path')
const { closeServerAsPromise } = require('./utils/node-http')

const hasGC = typeof global.gc !== 'undefined'

// This test verifies that there is no memory leak when handling TLS certificate errors.
// It simulates the error by using a server with a self-signed certificate.
test('no memory leak with TLS certificate errors', { timeout: 20000 }, async (t) => {
  if (!hasGC) {
    throw new Error('gc is not available. Run with \'--expose-gc\'.')
  }

  const { ok } = tspl(t, { plan: 1 })

  // Create HTTPS server with self-signed certificate
  const serverOptions = {
    key: fs.readFileSync(path.join(__dirname, 'fixtures', 'key.pem')),
    cert: fs.readFileSync(path.join(__dirname, 'fixtures', 'cert.pem'))
  }

  // Create a server that always responds with a simple message
  const server = https.createServer(serverOptions, (req, res) => {
    res.writeHead(200)
    res.end('test response')
  })

  // Start server on a random port
  await new Promise(resolve => server.listen(0, resolve))
  const serverUrl = `https://localhost:${server.address().port}`

  t.after(closeServerAsPromise(server))

  // Function to make a request that will trigger a certificate error
  async function makeRequest (i) {
    try {
      // The request will fail with CERT_SIGNATURE_FAILURE or similar
      // because we're using a self-signed certificate and not telling
      // Node.js to accept it
      const res = await fetch(`${serverUrl}/request-${i}`, {
        signal: AbortSignal.timeout(2000) // Short timeout to prevent hanging
      })
      const text = await res.text()
      return { status: res.status, text }
    } catch (e) {
      // In real code, without the fix, this would leak memory
      if (e?.cause?.code === 'CERT_SIGNATURE_FAILURE' ||
          e?.cause?.code === 'DEPTH_ZERO_SELF_SIGNED_CERT' ||
          e?.cause?.code === 'ERR_TLS_CERT_ALTNAME_INVALID') {
        return { status: 524, text: 'Certificate Error' }
      }
      // Return for any other error to avoid test interruption
      return { status: 500, text: e.message }
    }
  }

  // Counter for completed requests
  let complete = 0
  const requestCount = 400

  // Track memory usage
  const measurements = []
  let baselineMemory = 0

  // Process a batch of requests
  async function processBatch (start, batchSize) {
    const promises = []
    const end = Math.min(start + batchSize, requestCount)

    for (let i = start; i < end; i++) {
      promises.push(makeRequest(i))
    }

    await Promise.all(promises)
    complete += promises.length

    // Measure memory after each batch
    if (complete % 50 === 0 || complete === end) {
      // Run GC multiple times to get more stable readings
      global.gc()
      await new Promise(resolve => setTimeout(resolve, 50))
      global.gc()

      const memUsage = process.memoryUsage()

      // Establish baseline after first batch
      if (measurements.length === 0) {
        baselineMemory = memUsage.heapUsed
      }

      measurements.push({
        complete,
        heapUsed: memUsage.heapUsed
      })

      console.log(`Completed ${complete}/${requestCount}: Heap: ${Math.round(memUsage.heapUsed / 1024 / 1024)}MB`)

      // Check memory trend after we have enough data
      if (measurements.length >= 4) {
        const hasLeak = checkMemoryTrend()
        if (hasLeak) {
          return true // Indicates a leak was detected
        }
      }
    }

    return false // No leak detected
  }

  // Main test logic
  async function runTest () {
    const batchSize = 50

    for (let i = 0; i < requestCount; i += batchSize) {
      const leakDetected = await processBatch(i, batchSize)
      if (leakDetected) {
        // If a leak is detected, fail the test
        assert.fail('Memory leak detected: heap usage is consistently increasing at a significant rate')
        return
      }

      // Check if we have sufficient measurements or have done 350 requests
      if (measurements.length >= 7 || complete >= 350) {
        break
      }
    }

    // Final check
    const finalCheckResult = finalMemoryCheck()
    if (finalCheckResult) {
      assert.fail(`Memory leak detected: ${finalCheckResult}`)
    } else {
      ok(true, 'Memory usage has stabilized')
    }
  }

  // Check if memory usage has a concerning trend
  function checkMemoryTrend () {
    // Calculate memory growth between each measurement
    const growthRates = []
    for (let i = 1; i < measurements.length; i++) {
      const prev = measurements[i - 1].heapUsed
      const current = measurements[i].heapUsed
      growthRates.push((current - prev) / prev)
    }

    // Calculate growth from baseline
    const totalGrowthFromBaseline = (measurements[measurements.length - 1].heapUsed - baselineMemory) / baselineMemory

    // Calculate average growth rate
    const avgGrowthRate = growthRates.reduce((sum, rate) => sum + rate, 0) / growthRates.length

    console.log(`Growth from baseline: ${(totalGrowthFromBaseline * 100).toFixed(2)}%`)
    console.log(`Average growth rate: ${(avgGrowthRate * 100).toFixed(2)}%`)
    console.log(`Growth rates: ${growthRates.map(r => (r * 100).toFixed(2) + '%').join(', ')}`)

    // Only flag as leak if all conditions are met:
    // 1. Consistent growth (majority of measurements show growth)
    // 2. Average growth rate is significant (>2%)
    // 3. Total growth from baseline is significant (>20%)

    // Count how many positive growth rates we have
    const positiveGrowthRates = growthRates.filter(rate => rate > 0.01).length

    return (
      positiveGrowthRates >= Math.ceil(growthRates.length * 0.75) && // 75% of measurements show growth >1%
      avgGrowthRate > 0.02 && // Average growth >2%
      totalGrowthFromBaseline > 0.2 // Total growth >20%
    )
  }

  // Final memory check with adjusted requirements
  function finalMemoryCheck () {
    if (measurements.length < 4) return false

    // Calculate growth from baseline to the last measurement
    const totalGrowthFromBaseline = (measurements[measurements.length - 1].heapUsed - baselineMemory) / baselineMemory
    console.log(`Final growth from baseline: ${(totalGrowthFromBaseline * 100).toFixed(2)}%`)

    // Calculate final slope over the last 150 requests
    const lastMeasurements = measurements.slice(-3)
    const finalSlope = (lastMeasurements[2].heapUsed - lastMeasurements[0].heapUsed) /
      (lastMeasurements[2].complete - lastMeasurements[0].complete)

    console.log(`Final memory slope: ${finalSlope.toFixed(2)} bytes per request`)

    // Only consider it a leak if:
    // 1. Total growth is very significant (>25%)
    if (totalGrowthFromBaseline > 0.25) {
      return `Excessive memory growth of ${(totalGrowthFromBaseline * 100).toFixed(2)}%`
    }

    // 2. Memory is still growing rapidly at the end (>2000 bytes per request)
    if (finalSlope > 2000) {
      return `Memory still growing rapidly at ${finalSlope.toFixed(2)} bytes per request`
    }

    return false
  }

  await runTest()
})
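
Because the test calls global.gc(), it has to run with gc exposed, for example node --expose-gc test/tls-cert-leak.js. As a worked example of the finalMemoryCheck() thresholds (the numbers below are made up): a 50 MB baseline that ends the run at 65 MB is 30% growth from baseline, which exceeds the 25% limit and would be reported as a leak:

// Worked example of the growth-from-baseline check with illustrative numbers.
const baselineMemory = 50 * 1024 * 1024 // heap after the first batch: 50 MB
const finalHeapUsed = 65 * 1024 * 1024 // heap at the end of the run: 65 MB

const totalGrowthFromBaseline = (finalHeapUsed - baselineMemory) / baselineMemory
console.log(`${(totalGrowthFromBaseline * 100).toFixed(2)}%`) // 30.00%, > 25% => flagged as excessive growth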
