Skip to content

Commit 7dd97b9

Browse files
wip
1 parent 9350edc commit 7dd97b9

File tree

3 files changed

+141
-55
lines changed

3 files changed

+141
-55
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ The default implementation forwards the `cookie` header.
232232
The `wsReconnect` option contains the configuration for the WebSocket reconnection feature; is an object with the following properties:
233233

234234
- `pingInterval`: The interval between ping messages in ms (default: `30_000`).
235-
- `maxReconnectionRetries`: The maximum number of reconnection attempts (default: `3`).
235+
- `maxReconnectionRetries`: The maximum number of reconnection attempts (default: `3`). The counter is reset when the connection is established.
236236
- `reconnectInterval`: The interval between reconnection attempts in ms (default: `1_000`).
237237
- `reconnectDecay`: The decay factor for the reconnection interval (default: `1.5`).
238238
- `connectionTimeout`: The timeout for the connection in ms (default: `5_000`).

index.js

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@ function liftErrorCode (code) {
2929
}
3030

3131
function closeWebSocket (socket, code, reason) {
32+
socket.isAlive = false
3233
if (socket.readyState === WebSocket.OPEN) {
3334
socket.close(liftErrorCode(code), reason)
3435
}
3536
}
3637

37-
// TODO timeout
3838
function waitConnection (socket, write) {
3939
if (socket.readyState === WebSocket.CONNECTING) {
4040
socket.once('open', write)
@@ -43,7 +43,6 @@ function waitConnection (socket, write) {
4343
}
4444
}
4545

46-
// TODO timeout
4746
// TODO merge with waitConnection
4847
function waitForConnection (target, timeout) {
4948
return new Promise((resolve, reject) => {
@@ -115,7 +114,6 @@ async function reconnect (logger, source, wsReconnectOptions, targetParams) {
115114
let target
116115
do {
117116
const reconnectWait = wsReconnectOptions.reconnectInterval * (wsReconnectOptions.reconnectDecay * attempts || 1)
118-
logger.info({ target: targetParams.url, attempts }, `proxy ws reconnecting in ${reconnectWait} ms`)
119117
await wait(reconnectWait)
120118

121119
try {
@@ -129,7 +127,7 @@ async function reconnect (logger, source, wsReconnectOptions, targetParams) {
129127
} while (!target && attempts < wsReconnectOptions.maxReconnectionRetries)
130128

131129
if (!target) {
132-
logger.error({ target: targetParams.url, attempts }, 'proxy ws failed to reconnect')
130+
logger.error({ target: targetParams.url, attempts }, 'proxy ws failed to reconnect! No more retries')
133131
return
134132
}
135133

@@ -143,10 +141,11 @@ function proxyWebSocketsWithReconnection (logger, source, target, options, targe
143141
target.pingTimer && clearTimeout(source.pingTimer)
144142
target.pingTimer = undefined
145143

146-
// endless reconnect on close
147-
// as long as the source connection is active
144+
// reconnect target as long as the source connection is active
148145
if (source.isAlive && (target.broken || options.reconnectOnClose)) {
149146
target.isAlive = false
147+
target.removeAllListeners()
148+
// TODO source.removeAllListeners()
150149
reconnect(logger, source, options, targetParams)
151150
return
152151
}
@@ -156,6 +155,7 @@ function proxyWebSocketsWithReconnection (logger, source, target, options, targe
156155
closeWebSocket(target, code, reason)
157156
}
158157

158+
source.isAlive = true
159159
source.on('message', (data, binary) => {
160160
source.isAlive = true
161161
waitConnection(target, () => target.send(data, { binary }))
@@ -170,16 +170,16 @@ function proxyWebSocketsWithReconnection (logger, source, target, options, targe
170170
})
171171
/* c8 ignore stop */
172172
source.on('close', (code, reason) => {
173-
logger.warn({ target: targetParams.url, code, reason }, 'proxy ws source close')
173+
logger.warn({ target: targetParams.url, code, reason }, 'proxy ws source close event')
174174
close(code, reason)
175175
})
176176
/* c8 ignore start */
177177
source.on('error', error => {
178-
logger.warn({ target: targetParams.url, error: error.message }, 'proxy ws source error')
178+
logger.warn({ target: targetParams.url, error: error.message }, 'proxy ws source error event')
179179
close(1011, error.message)
180180
})
181181
source.on('unexpected-response', () => {
182-
logger.warn({ target: targetParams.url }, 'proxy ws source unexpected-response')
182+
logger.warn({ target: targetParams.url }, 'proxy ws source unexpected-response event')
183183
close(1011, 'unexpected response')
184184
})
185185
/* c8 ignore stop */
@@ -191,16 +191,16 @@ function proxyWebSocketsWithReconnection (logger, source, target, options, targe
191191
/* c8 ignore stop */
192192
target.on('pong', data => source.pong(data))
193193
target.on('close', (code, reason) => {
194-
logger.warn({ target: targetParams.url, code, reason }, 'proxy ws target close')
194+
logger.warn({ target: targetParams.url, code, reason }, 'proxy ws target close event')
195195
close(code, reason)
196196
})
197197
/* c8 ignore start */
198198
target.on('error', error => {
199-
logger.warn({ target: targetParams.url, error: error.message }, 'proxy ws target error')
199+
logger.warn({ target: targetParams.url, error: error.message }, 'proxy ws target error event')
200200
close(1011, error.message)
201201
})
202202
target.on('unexpected-response', () => {
203-
logger.warn({ target: targetParams.url }, 'proxy ws target unexpected-response')
203+
logger.warn({ target: targetParams.url }, 'proxy ws target unexpected-response event')
204204
close(1011, 'unexpected response')
205205
})
206206
/* c8 ignore stop */

test/ws-reconnect.js

Lines changed: 128 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,27 @@ const pinoTest = require('pino-test')
1111
const pino = require('pino')
1212
const proxyPlugin = require('../')
1313

14-
async function createServices ({ t, wsReconnectOptions, wsTargetOptions, wsServerOptions }) {
14+
function waitForLogMessage(loggerSpy, message, max = 100) {
15+
return new Promise((resolve, reject) => {
16+
let count = 0
17+
const fn = (received) => {
18+
console.log(received)
19+
20+
if (received.msg === message) {
21+
loggerSpy.off('data', fn)
22+
resolve()
23+
}
24+
count++
25+
if (count > max) {
26+
loggerSpy.off('data', fn)
27+
reject(new Error(`Max message count reached on waitForLogMessage: ${message}`))
28+
}
29+
}
30+
loggerSpy.on('data', fn)
31+
})
32+
}
33+
34+
async function createServices({ t, upstream, wsReconnectOptions, wsTargetOptions, wsServerOptions }) {
1535
const targetServer = createServer()
1636
const targetWs = new WebSocket.Server({ server: targetServer, ...wsTargetOptions })
1737

@@ -21,7 +41,7 @@ async function createServices ({ t, wsReconnectOptions, wsTargetOptions, wsServe
2141
const logger = pino(loggerSpy)
2242
const proxy = Fastify({ loggerInstance: logger })
2343
proxy.register(proxyPlugin, {
24-
upstream: `ws://127.0.0.1:${targetServer.address().port}`,
44+
upstream: upstream || `ws://127.0.0.1:${targetServer.address().port}`,
2545
websocket: true,
2646
wsReconnect: wsReconnectOptions,
2747
wsServerOptions
@@ -46,12 +66,11 @@ async function createServices ({ t, wsReconnectOptions, wsTargetOptions, wsServe
4666
},
4767
proxy,
4868
client,
49-
loggerSpy
69+
loggerSpy,
70+
upstream
5071
}
5172
}
52-
53-
// TODO use fake timers ?
54-
73+
/*
5574
test('should use ping/pong to verify connection is alive - from source (server on proxy) to target', async (t) => {
5675
const wsReconnectOptions = { pingInterval: 100, reconnectInterval: 100, maxReconnectionRetries: 1 }
5776
@@ -70,64 +89,131 @@ test('should use ping/pong to verify connection is alive - from source (server o
7089
})
7190
7291
test('should reconnect on broken connection', async (t) => {
73-
const wsReconnectOptions = { pingInterval: 250, reconnectInterval: 100, maxReconnectionRetries: 1, reconnectDecay: 2 }
92+
const wsReconnectOptions = { pingInterval: 500, reconnectInterval: 100, maxReconnectionRetries: 1, reconnectDecay: 2 }
7493
7594
const { target, loggerSpy } = await createServices({ t, wsReconnectOptions, wsTargetOptions: { autoPong: false } })
7695
96+
let breakConnection = true
7797
target.ws.on('connection', async (socket) => {
7898
socket.on('ping', async () => {
79-
// add latency to break the connection
80-
await wait(500)
99+
// add latency to break the connection once
100+
if (breakConnection) {
101+
await wait(wsReconnectOptions.pingInterval * 2)
102+
breakConnection = false
103+
}
81104
socket.pong()
82105
})
83106
})
84107
85-
await pinoTest.once(loggerSpy, 'warn', 'proxy ws connection is broken')
86-
await pinoTest.once(loggerSpy, 'info', 'proxy ws reconnecting in 100 ms')
87-
await pinoTest.once(loggerSpy, 'info', 'proxy ws reconnected')
88-
})
108+
await waitForLogMessage(loggerSpy, 'proxy ws connection is broken')
109+
await waitForLogMessage(loggerSpy, 'proxy ws target close event')
110+
await waitForLogMessage(loggerSpy, 'proxy ws reconnected')
89111
112+
// TODO fix with source.removeAllListeners
90113
91-
test('should reconnect after failingwith retries', async (t) => {
92-
const wsReconnectOptions = { pingInterval: 150, reconnectInterval: 100, reconnectOnClose: true }
114+
t.end()
115+
})
93116
94-
const { target, logger } = await createServices({ t, wsReconnectOptions, wsTargetOptions: { autoPong: false } })
117+
test('should not reconnect after max retries', async (t) => {
118+
const wsReconnectOptions = { pingInterval: 150, reconnectInterval: 100, maxReconnectionRetries: 1 }
95119
96-
const refuseNewConnections = false
120+
const { target, loggerSpy } = await createServices({ t, wsReconnectOptions, wsTargetOptions: { autoPong: false } })
121+
122+
let breakConnection = true
97123
98124
target.ws.on('connection', async (socket) => {
99125
socket.on('ping', async () => {
100-
// add latency to break the connection
101-
await wait(500)
126+
// add latency to break the connection once
127+
if (breakConnection) {
128+
await wait(wsReconnectOptions.pingInterval * 2)
129+
breakConnection = false
130+
}
102131
socket.pong()
103132
})
104133
})
105134
135+
await waitForLogMessage(loggerSpy, 'proxy ws connection is broken')
136+
137+
target.ws.close()
138+
target.server.close()
139+
140+
await waitForLogMessage(loggerSpy, 'proxy ws target close event')
141+
await waitForLogMessage(loggerSpy, 'proxy ws reconnect error')
142+
await waitForLogMessage(loggerSpy, 'proxy ws failed to reconnect! No more retries')
143+
144+
t.end()
145+
})
146+
*/
147+
148+
test('should not reconnect because of connection timeout', async (t) => {
149+
const wsReconnectOptions = { pingInterval: 150, reconnectInterval: 100, maxReconnectionRetries: 1, connectionTimeout: 100 }
150+
151+
const { target, loggerSpy } = await createServices({ t, wsReconnectOptions, wsTargetOptions: { autoPong: false } })
152+
153+
let breakConnection = true
154+
106155
target.ws.on('upgrade', (request, socket, head) => {
107-
if (refuseNewConnections) {
108-
socket.destroy()
109-
}
156+
console.log('upgrade')
110157
})
111158

112-
// TODO use pino-test
113-
// await pinoTest.once(logger, 'warn', 'proxy ws connection is broken')
114-
115-
// close the target server to fail new connections
116-
// setTimeout(() => {
117-
// refuseNewConnections = true
118-
// setTimeout(() => {
119-
// refuseNewConnections = false
120-
// }, 500)
121-
// }, 1000)
122-
123-
// t.ok(logger._warn.find(l => l[1] === 'proxy ws connection is broken'))
124-
// t.ok(logger._info.find(l => l[1] === 'proxy ws reconnecting in 100 ms'))
125-
// t.ok(logger._error.find(l => l[1] === 'proxy ws reconnect error' && l[0].attempts === 1))
126-
// t.ok(logger._info.find(l => l[1] === 'proxy ws reconnected' && l[0].attempts === 2))
159+
target.ws.on('connection', async (socket) => {
160+
socket.on('ping', async () => {
161+
// add latency to break the connection once
162+
if (breakConnection) {
163+
await wait(wsReconnectOptions.pingInterval * 2)
164+
breakConnection = false
165+
}
166+
socket.pong()
167+
})
168+
})
169+
170+
await waitForLogMessage(loggerSpy, 'proxy ws connection is broken')
171+
172+
target.ws.close()
173+
target.server.close()
174+
175+
await waitForLogMessage(loggerSpy, 'proxy ws target close event')
176+
await waitForLogMessage(loggerSpy, 'proxy ws reconnect error')
177+
await waitForLogMessage(loggerSpy, 'proxy ws failed to reconnect! No more retries')
178+
179+
t.end()
180+
})
181+
182+
// TODO reconnect regular close
183+
184+
/*
185+
test('should reconnect with retry', async (t) => {
186+
const wsReconnectOptions = { pingInterval: 150, reconnectInterval: 100, reconnectOnClose: true }
187+
188+
const { target, loggerSpy, upstream } = await createServices({ t, wsReconnectOptions, wsTargetOptions: { autoPong: false } })
189+
190+
let breakConnection = true
191+
192+
target.ws.on('connection', async (socket) => {
193+
socket.on('ping', async () => {
194+
// add latency to break the connection once
195+
if (breakConnection) {
196+
await wait(wsReconnectOptions.pingInterval * 2)
197+
breakConnection = false
198+
}
199+
socket.pong()
200+
})
201+
})
202+
203+
await waitForLogMessage(loggerSpy, 'proxy ws connection is broken')
204+
205+
// recreate a new target with the same upstream
206+
207+
target.ws.close()
208+
target.server.close()
209+
await createServices({ t, upstream, wsReconnectOptions, wsTargetOptions: { autoPong: false } })
210+
211+
await waitForLogMessage(loggerSpy, 'proxy ws target close event')
212+
await waitForLogMessage(loggerSpy, 'proxy ws reconnect error')
213+
await waitForLogMessage(loggerSpy, 'proxy ws reconnected')
214+
215+
t.end()
127216
})
217+
*/
128218

129-
// TODO reconnect fails becase of timeout
130-
// cant reconnect
131-
// TODO reconnect on close/error/unexpected-response
132-
// TODO reconnectOnClose ... on shutdown
133-
// TODO check only socket to target
219+
// TODO reconnectOnClose but close all on shutdown

0 commit comments

Comments
 (0)