From 2a17d4a5c0519ed81000eed08fa122b5701aa8b6 Mon Sep 17 00:00:00 2001 From: Simone Sanfratello Date: Mon, 24 Feb 2025 10:51:23 +0100 Subject: [PATCH] feat: add context to hooks --- README.md | 13 ++++++----- examples/reconnection/proxy/index.js | 4 ++-- index.js | 26 +++++++++++++-------- test/websocket.js | 26 ++++++++++----------- test/ws-reconnect.js | 34 ++++++++++++++-------------- 5 files changed, 55 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index 0e52d6c..434337a 100644 --- a/README.md +++ b/README.md @@ -249,14 +249,15 @@ See the example in [examples/reconnection](examples/reconnection). ## wsHooks -On websocket events, the following hooks are available, note **the hooks are all synchronous**. +On websocket events, the following hooks are available, note **the hooks are all synchronous**. +The `context` object is passed to all hooks and contains the `log` property. -- `onIncomingMessage`: A hook function that is called when the request is received from the client `onIncomingMessage(source, target, { data, binary })` (default: `undefined`). -- `onOutgoingMessage`: A hook function that is called when the response is received from the target `onOutgoingMessage(source, target, { data, binary })` (default: `undefined`). +- `onIncomingMessage`: A hook function that is called when the request is received from the client `onIncomingMessage(context, source, target, { data, binary })` (default: `undefined`). +- `onOutgoingMessage`: A hook function that is called when the response is received from the target `onOutgoingMessage(context, source, target, { data, binary })` (default: `undefined`). - `onConnect`: A hook function that is called when the connection is established `onConnect(source, target)` (default: `undefined`). -- `onDisconnect`: A hook function that is called when the connection is closed `onDisconnect(source)` (default: `undefined`). -- `onReconnect`: A hook function that is called when the connection is reconnected `onReconnect(source, target)` (default: `undefined`). The function is called if reconnection feature is enabled. -- `onPong`: A hook function that is called when the target responds to the ping `onPong(source, target)` (default: `undefined`). The function is called if reconnection feature is enabled. +- `onDisconnect`: A hook function that is called when the connection is closed `onDisconnect(context, source)` (default: `undefined`). +- `onReconnect`: A hook function that is called when the connection is reconnected `onReconnect(context, source, target)` (default: `undefined`). The function is called if reconnection feature is enabled. +- `onPong`: A hook function that is called when the target responds to the ping `onPong(context, source, target)` (default: `undefined`). The function is called if reconnection feature is enabled. ## Benchmarks diff --git a/examples/reconnection/proxy/index.js b/examples/reconnection/proxy/index.js index 770ec67..bc883a9 100644 --- a/examples/reconnection/proxy/index.js +++ b/examples/reconnection/proxy/index.js @@ -40,7 +40,7 @@ async function main () { // clean backup from the last ping backup = backup.filter(message => message.timestamp > lastPong) }, - onIncomingMessage: (source, target, message) => { + onIncomingMessage: (context, source, target, message) => { const m = message.data.toString() console.log('onIncomingMessage backup', m) backup.push({ message: m, timestamp: Date.now() }) @@ -49,7 +49,7 @@ async function main () { console.log('onDisconnect') backup.length = 0 }, - onReconnect: (source, target) => { + onReconnect: (context, source, target) => { console.log('onReconnect') resendMessages(target) }, diff --git a/index.js b/index.js index 4f0e5af..7ac0ccc 100644 --- a/index.js +++ b/index.js @@ -82,12 +82,17 @@ function isExternalUrl (url) { function noop () { } +function createContext (logger) { + return { log: logger } +} + function proxyWebSockets (logger, source, target, hooks) { + const context = createContext(logger) function close (code, reason) { if (hooks.onDisconnect) { waitConnection(target, () => { try { - hooks.onDisconnect(source) + hooks.onDisconnect(context, source) } catch (err) { logger.error({ err }, 'proxy ws error from onDisconnect hook') } @@ -100,7 +105,7 @@ function proxyWebSockets (logger, source, target, hooks) { source.on('message', (data, binary) => { if (hooks.onIncomingMessage) { try { - hooks.onIncomingMessage(source, target, { data, binary }) + hooks.onIncomingMessage(context, source, target, { data, binary }) } catch (err) { logger.error({ err }, 'proxy ws error from onIncomingMessage hook') } @@ -121,7 +126,7 @@ function proxyWebSockets (logger, source, target, hooks) { target.on('message', (data, binary) => { if (hooks.onOutgoingMessage) { try { - hooks.onOutgoingMessage(source, target, { data, binary }) + hooks.onOutgoingMessage(context, source, target, { data, binary }) } catch (err) { logger.error({ err }, 'proxy ws error from onOutgoingMessage hook') } @@ -141,7 +146,7 @@ function proxyWebSockets (logger, source, target, hooks) { if (hooks.onConnect) { waitConnection(target, () => { try { - hooks.onConnect(source, target) + hooks.onConnect(context, source, target) } catch (err) { logger.error({ err }, 'proxy ws error from onConnect hook') } @@ -189,6 +194,7 @@ async function reconnect (logger, source, reconnectOptions, hooks, targetParams) } function proxyWebSocketsWithReconnection (logger, source, target, options, hooks, targetParams, isReconnecting = false) { + const context = createContext(logger) function close (code, reason) { target.pingTimer && clearInterval(target.pingTimer) target.pingTimer = undefined @@ -206,7 +212,7 @@ function proxyWebSocketsWithReconnection (logger, source, target, options, hooks if (hooks.onDisconnect) { try { - hooks.onDisconnect(source) + hooks.onDisconnect(context, source) } catch (err) { options.logs && logger.error({ target: targetParams.url, err }, 'proxy ws error from onDisconnect hook') } @@ -231,7 +237,7 @@ function proxyWebSocketsWithReconnection (logger, source, target, options, hooks source.isAlive = true if (hooks.onIncomingMessage) { try { - hooks.onIncomingMessage(source, target, { data, binary }) + hooks.onIncomingMessage(context, source, target, { data, binary }) } catch (err) { logger.error({ target: targetParams.url, err }, 'proxy ws error from onIncomingMessage hook') } @@ -281,7 +287,7 @@ function proxyWebSocketsWithReconnection (logger, source, target, options, hooks target.isAlive = true if (hooks.onOutgoingMessage) { try { - hooks.onOutgoingMessage(source, target, { data, binary }) + hooks.onOutgoingMessage(context, source, target, { data, binary }) } catch (err) { logger.error({ target: targetParams.url, err }, 'proxy ws error from onOutgoingMessage hook') } @@ -296,7 +302,7 @@ function proxyWebSocketsWithReconnection (logger, source, target, options, hooks target.isAlive = true if (hooks.onPong) { try { - hooks.onPong(source, target) + hooks.onPong(context, source, target) } catch (err) { logger.error({ target: targetParams.url, err }, 'proxy ws error from onPong hook') } @@ -336,13 +342,13 @@ function proxyWebSocketsWithReconnection (logger, source, target, options, hooks // call onConnect and onReconnect callbacks after the events are bound if (isReconnecting && hooks.onReconnect) { try { - hooks.onReconnect(source, target) + hooks.onReconnect(context, source, target) } catch (err) { options.logs && logger.error({ target: targetParams.url, err }, 'proxy ws error from onReconnect hook') } } else if (hooks.onConnect) { try { - hooks.onConnect(source, target) + hooks.onConnect(context, source, target) } catch (err) { options.logs && logger.error({ target: targetParams.url, err }, 'proxy ws error from onConnect hook') } diff --git a/test/websocket.js b/test/websocket.js index 7c10e97..4a8fecd 100644 --- a/test/websocket.js +++ b/test/websocket.js @@ -716,18 +716,18 @@ test('multiple websocket upstreams with distinct server options', async (t) => { test('should call onIncomingMessage and onOutgoingMessage hooks', async (t) => { const request = 'query () { ... }' const response = 'data ...' - const onIncomingMessage = (source, target, { data, binary }) => { + const onIncomingMessage = (context, source, target, { data, binary }) => { assert.strictEqual(data.toString(), request) assert.strictEqual(binary, false) - logger.info('onIncomingMessage called') + context.log.info('onIncomingMessage called') } - const onOutgoingMessage = (source, target, { data, binary }) => { + const onOutgoingMessage = (context, source, target, { data, binary }) => { assert.strictEqual(data.toString(), response) assert.strictEqual(binary, false) - logger.info('onOutgoingMessage called') + context.log.info('onOutgoingMessage called') } - const { target, loggerSpy, logger, client } = await createServices({ t, wsHooks: { onIncomingMessage, onOutgoingMessage } }) + const { target, loggerSpy, client } = await createServices({ t, wsHooks: { onIncomingMessage, onOutgoingMessage } }) target.ws.on('connection', async (socket) => { socket.on('message', async (data, binary) => { @@ -744,12 +744,12 @@ test('should call onIncomingMessage and onOutgoingMessage hooks', async (t) => { test('should handle throwing an error in onIncomingMessage and onOutgoingMessage hooks', async (t) => { const request = 'query () { ... }' const response = 'data ...' - const onIncomingMessage = (source, target, { data, binary }) => { + const onIncomingMessage = (context, source, target, { data, binary }) => { assert.strictEqual(data.toString(), request) assert.strictEqual(binary, false) throw new Error('onIncomingMessage error') } - const onOutgoingMessage = (source, target, { data, binary }) => { + const onOutgoingMessage = (context, source, target, { data, binary }) => { assert.strictEqual(data.toString(), response) assert.strictEqual(binary, false) throw new Error('onOutgoingMessage error') @@ -770,11 +770,11 @@ test('should handle throwing an error in onIncomingMessage and onOutgoingMessage }) test('should call onConnect hook', async (t) => { - const onConnect = () => { - logger.info('onConnect called') + const onConnect = (context) => { + context.log.info('onConnect called') } - const { loggerSpy, logger } = await createServices({ t, wsHooks: { onConnect } }) + const { loggerSpy } = await createServices({ t, wsHooks: { onConnect } }) await waitForLogMessage(loggerSpy, 'onConnect called') }) @@ -790,11 +790,11 @@ test('should handle throwing an error in onConnect hook', async (t) => { }) test('should call onDisconnect hook', async (t) => { - const onDisconnect = () => { - logger.info('onDisconnect called') + const onDisconnect = (context) => { + context.log.info('onDisconnect called') } - const { loggerSpy, logger, client } = await createServices({ t, wsHooks: { onDisconnect } }) + const { loggerSpy, client } = await createServices({ t, wsHooks: { onDisconnect } }) client.close() await waitForLogMessage(loggerSpy, 'onDisconnect called') diff --git a/test/ws-reconnect.js b/test/ws-reconnect.js index 56f34b7..4cbbc73 100644 --- a/test/ws-reconnect.js +++ b/test/ws-reconnect.js @@ -145,8 +145,8 @@ test('should reconnect when the target connection is closed gracefully and recon }) test('should call onReconnect hook when the connection is reconnected', async (t) => { - const onReconnect = (source, target) => { - logger.info('onReconnect called') + const onReconnect = (context, source, target) => { + context.log.info('onReconnect called') } const wsReconnectOptions = { pingInterval: 100, @@ -156,7 +156,7 @@ test('should call onReconnect hook when the connection is reconnected', async (t logs: true, } - const { target, loggerSpy, logger } = await createServices({ t, wsReconnectOptions, wsHooks: { onReconnect } }) + const { target, loggerSpy } = await createServices({ t, wsReconnectOptions, wsHooks: { onReconnect } }) target.ws.on('connection', async (socket) => { socket.on('ping', async () => { @@ -173,7 +173,7 @@ test('should call onReconnect hook when the connection is reconnected', async (t }) test('should handle throwing an error in onReconnect hook', async (t) => { - const onReconnect = (source, target) => { + const onReconnect = () => { throw new Error('onReconnect error') } const wsReconnectOptions = { @@ -203,15 +203,15 @@ test('should handle throwing an error in onReconnect hook', async (t) => { test('should call onIncomingMessage and onOutgoingMessage hooks, with reconnection', async (t) => { const request = 'query () { ... }' const response = 'data ...' - const onIncomingMessage = (source, target, { data, binary }) => { + const onIncomingMessage = (context, source, target, { data, binary }) => { assert.strictEqual(data.toString(), request) assert.strictEqual(binary, false) - logger.info('onIncomingMessage called') + context.log.info('onIncomingMessage called') } - const onOutgoingMessage = (source, target, { data, binary }) => { + const onOutgoingMessage = (context, source, target, { data, binary }) => { assert.strictEqual(data.toString(), response) assert.strictEqual(binary, false) - logger.info('onOutgoingMessage called') + context.log.info('onOutgoingMessage called') } const wsReconnectOptions = { pingInterval: 100, @@ -220,7 +220,7 @@ test('should call onIncomingMessage and onOutgoingMessage hooks, with reconnecti logs: true, } - const { target, loggerSpy, logger, client } = await createServices({ t, wsReconnectOptions, wsHooks: { onIncomingMessage, onOutgoingMessage } }) + const { target, loggerSpy, client } = await createServices({ t, wsReconnectOptions, wsHooks: { onIncomingMessage, onOutgoingMessage } }) target.ws.on('connection', async (socket) => { socket.on('message', async (data, binary) => { @@ -237,12 +237,12 @@ test('should call onIncomingMessage and onOutgoingMessage hooks, with reconnecti test('should handle throwing an error in onIncomingMessage and onOutgoingMessage hooks, with reconnection', async (t) => { const request = 'query () { ... }' const response = 'data ...' - const onIncomingMessage = ({ data, binary }) => { + const onIncomingMessage = (context, source, target, { data, binary }) => { assert.strictEqual(data.toString(), request) assert.strictEqual(binary, false) throw new Error('onIncomingMessage error') } - const onOutgoingMessage = ({ data, binary }) => { + const onOutgoingMessage = (context, source, target, { data, binary }) => { assert.strictEqual(data.toString(), response) assert.strictEqual(binary, false) throw new Error('onOutgoingMessage error') @@ -269,15 +269,15 @@ test('should handle throwing an error in onIncomingMessage and onOutgoingMessage }) test('should call onConnect hook', async (t) => { - const onConnect = () => { - logger.info('onConnect called') + const onConnect = (context) => { + context.log.info('onConnect called') } const wsReconnectOptions = { logs: true, } - const { loggerSpy, logger } = await createServices({ t, wsReconnectOptions, wsHooks: { onConnect } }) + const { loggerSpy } = await createServices({ t, wsReconnectOptions, wsHooks: { onConnect } }) await waitForLogMessage(loggerSpy, 'onConnect called') }) @@ -297,15 +297,15 @@ test('should handle throwing an error in onConnect hook', async (t) => { }) test('should call onDisconnect hook', async (t) => { - const onDisconnect = () => { - logger.info('onDisconnect called') + const onDisconnect = (context) => { + context.log.info('onDisconnect called') } const wsReconnectOptions = { logs: true, } - const { loggerSpy, logger, client } = await createServices({ t, wsReconnectOptions, wsHooks: { onDisconnect } }) + const { loggerSpy, client } = await createServices({ t, wsReconnectOptions, wsHooks: { onDisconnect } }) client.close() await waitForLogMessage(loggerSpy, 'onDisconnect called')