Skip to content

Allow attaching multiple WebSocket proxies #258

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ An array that contains the types of the methods. Default: `['DELETE', 'GET', 'HE
This module has _partial_ support for forwarding websockets by passing a
`websocket` option. All those options are going to be forwarded to
[`@fastify/websocket`](https://github.com/fastify/fastify-websocket).

Multiple websocket proxies may be attached to the same HTTP server at different paths.
In this case, only the first `wsServerOptions` is applied.

A few things are missing:

1. forwarding headers as well as `rewriteHeaders`. Note: Only cookie headers are being forwarded
Expand Down
122 changes: 81 additions & 41 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,37 +58,76 @@ function proxyWebSockets (source, target) {
target.on('unexpected-response', () => close(1011, 'unexpected response'))
}

function setupWebSocketProxy (fastify, options, rewritePrefix) {
const server = new WebSocket.Server({
server: fastify.server,
...options.wsServerOptions
})

fastify.addHook('onClose', (instance, done) => server.close(done))

// To be able to close the HTTP server,
// all WebSocket clients need to be disconnected.
// Fastify is missing a pre-close event, or the ability to
// add a hook before the server.close call. We need to resort
// to monkeypatching for now.
const oldClose = fastify.server.close
fastify.server.close = function (done) {
for (const client of server.clients) {
client.close()
class WebSocketProxy {
constructor (fastify, wsServerOptions) {
this.logger = fastify.log

const wss = new WebSocket.Server({
server: fastify.server,
...wsServerOptions
})

// To be able to close the HTTP server,
// all WebSocket clients need to be disconnected.
// Fastify is missing a pre-close event, or the ability to
// add a hook before the server.close call. We need to resort
// to monkeypatching for now.
const oldClose = fastify.server.close
fastify.server.close = function (done) {
for (const client of wss.clients) {
client.close()
}
oldClose.call(this, done)
}
oldClose.call(this, done)

wss.on('error', (err) => {
this.logger.error(err)
})

wss.on('connection', this.handleConnection.bind(this))

this.wss = wss
this.prefixList = []
}

server.on('error', (err) => {
fastify.log.error(err)
})
close (done) {
this.wss.close(done)
}

addUpstream (prefix, rewritePrefix, upstream, wsClientOptions) {
this.prefixList.push({
prefix: new URL(prefix, 'ws://127.0.0.1').pathname,
rewritePrefix,
upstream: convertUrlToWebSocket(upstream),
wsClientOptions
})

// sort by decreasing prefix length, so that findUpstreamUrl() does longest prefix match
this.prefixList.sort((a, b) => b.prefix.length - a.prefix.length)
}

server.on('connection', (source, request) => {
if (fastify.prefix && !request.url.startsWith(fastify.prefix)) {
fastify.log.debug({ url: request.url }, 'not matching prefix')
findUpstream (request) {
const source = new URL(request.url, 'ws://127.0.0.1')

for (const { prefix, rewritePrefix, upstream, wsClientOptions } of this.prefixList) {
if (source.pathname.startsWith(prefix)) {
const target = new URL(source.pathname.replace(prefix, rewritePrefix), upstream)
target.search = source.search
return { target, wsClientOptions }
}
}

return undefined
}

handleConnection (source, request) {
const upstream = this.findUpstream(request)
if (!upstream) {
this.logger.debug({ url: request.url }, 'not matching prefix')
source.close()
return
}
const { target: url, wsClientOptions } = upstream

const subprotocols = []
if (source.protocol) {
Expand All @@ -98,31 +137,32 @@ function setupWebSocketProxy (fastify, options, rewritePrefix) {
let optionsWs = {}
if (request.headers.cookie) {
const headers = { cookie: request.headers.cookie }
optionsWs = { ...options.wsClientOptions, headers }
optionsWs = { ...wsClientOptions, headers }
} else {
optionsWs = options.wsClientOptions
optionsWs = wsClientOptions
}

const url = createWebSocketUrl(request)

const target = new WebSocket(url, subprotocols, optionsWs)

fastify.log.debug({ url: url.href }, 'proxy websocket')
this.logger.debug({ url: url.href }, 'proxy websocket')
proxyWebSockets(source, target)
})

function createWebSocketUrl (request) {
const source = new URL(request.url, 'ws://127.0.0.1')

const target = new URL(
source.pathname.replace(fastify.prefix, rewritePrefix),
convertUrlToWebSocket(options.upstream)
)
}
}

target.search = source.search
const httpWss = new WeakMap() // http.Server => WebSocketProxy

return target
function setupWebSocketProxy (fastify, options, rewritePrefix) {
let wsProxy = httpWss.get(fastify.server)
if (!wsProxy) {
wsProxy = new WebSocketProxy(fastify, options.wsServerOptions)
httpWss.set(fastify.server, wsProxy)

fastify.addHook('onClose', (instance, done) => {
httpWss.delete(fastify.server)
wsProxy.close(done)
})
}

wsProxy.addUpstream(fastify.prefix, rewritePrefix, options.upstream, options.wsClientOptions)
}

function generateRewritePrefix (prefix = '', opts) {
Expand Down
46 changes: 46 additions & 0 deletions test/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,52 @@ test('basic websocket proxy', async (t) => {
])
})

test('multiple websocket upstreams', async (t) => {
t.plan(8)

const server = Fastify()

for (const name of ['/A', '/A/B', '/C/D', '/C']) {
const origin = createServer()
const wss = new WebSocket.Server({ server: origin })
t.teardown(wss.close.bind(wss))
t.teardown(origin.close.bind(origin))

wss.once('connection', (ws) => {
ws.once('message', message => {
t.equal(message.toString(), `hello ${name}`)
// echo
ws.send(message)
})
})

await promisify(origin.listen.bind(origin))({ port: 0 })
server.register(proxy, {
prefix: name,
upstream: `ws://localhost:${origin.address().port}`,
websocket: true
})
}

await server.listen({ port: 0 })
t.teardown(server.close.bind(server))

const wsClients = []
for (const name of ['/A', '/A/B', '/C/D', '/C']) {
const ws = new WebSocket(`ws://localhost:${server.server.address().port}${name}`)
await once(ws, 'open')
ws.send(`hello ${name}`)
const [reply] = await once(ws, 'message')
t.equal(reply.toString(), `hello ${name}`)
wsClients.push(ws)
}

await Promise.all([
...wsClients.map(ws => once(ws, 'close')),
server.close()
])
})

test('captures errors on start', async (t) => {
const app = Fastify()
await app.listen({ port: 0 })
Expand Down