Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 43 additions & 21 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const kOpenStreams = Symbol('open streams')
const kRequestStreamId = Symbol('request stream id')
const kRequestStream = Symbol('request stream')
const kRequestStreamCleanup = Symbol('request stream cleanup')
const kRequestStreamOnData = Symbol('request stream on data')
const kRequestStreamOnCloseError = Symbol('request stream on close error')
const kReceivedGoAway = Symbol('received goaway')

let extractBody
Expand Down Expand Up @@ -491,6 +493,37 @@ function onSocketClose () {
this[kClosed] = true
}

function noop () {}

function closeStreamSession (stream) {
const session = stream[kHTTP2Session]

stream[kHTTP2Session] = null
session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) {
session.unref()
}
}

function onUpgradeStreamClose () {
this.off('error', noop)

const failUpgradeStream = this[kRequestStreamOnCloseError]
this[kRequestStreamOnCloseError] = null

failUpgradeStream(new InformationalError('HTTP/2: stream closed before response headers'))
closeStreamSession(this)
}

function onRequestStreamClose () {
const onData = this[kRequestStreamOnData]

this[kRequestStreamOnData] = null
this.off('data', onData)
this.off('error', noop)
closeStreamSession(this)
}

// https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
function shouldSendContentLength (method) {
return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT'
Expand Down Expand Up @@ -623,14 +656,13 @@ function writeH2 (client, request) {

const setupUpgradeStream = (stream) => {
let responseReceived = false
const onDetachedUpgradeStreamError = () => {}

const removeUpgradeStreamListeners = () => {
stream.off('response', onUpgradeResponse)
stream.off('error', onUpgradeStreamError)
stream.off('end', onUpgradeStreamEnd)
stream.off('timeout', onUpgradeStreamTimeout)
stream.off('error', onDetachedUpgradeStreamError)
stream.off('error', noop)
}

const releaseUpgradeStream = () => {
Expand All @@ -641,7 +673,7 @@ function writeH2 (client, request) {
removeUpgradeStreamListeners()

if (!stream.destroyed && !stream.closed) {
stream.once('error', onDetachedUpgradeStreamError)
stream.once('error', noop)
}
}

Expand Down Expand Up @@ -691,13 +723,9 @@ function writeH2 (client, request) {
stream.on('error', onUpgradeStreamError)
stream.once('end', onUpgradeStreamEnd)
stream.on('timeout', onUpgradeStreamTimeout)
stream.once('close', () => {
stream.off('error', onDetachedUpgradeStreamError)
failUpgradeStream(new InformationalError('HTTP/2: stream closed before response headers'))

session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) session.unref()
})
stream[kHTTP2Session] = session
stream[kRequestStreamOnCloseError] = failUpgradeStream
stream.once('close', onUpgradeStreamClose)

++session[kOpenStreams]
stream.setTimeout(requestTimeout)
Expand Down Expand Up @@ -855,7 +883,6 @@ function writeH2 (client, request) {

// Track whether we received a response (headers)
let responseReceived = false
const onDetachedStreamError = () => {}
const onData = (chunk) => {
if (request.aborted || request.completed) {
return
Expand All @@ -867,7 +894,7 @@ function writeH2 (client, request) {
}

const removeRequestStreamListeners = () => {
stream.off('error', onDetachedStreamError)
stream.off('error', noop)
stream.off('continue', writeBodyH2)
stream.off('response', onResponse)
stream.off('end', onEnd)
Expand All @@ -887,7 +914,7 @@ function writeH2 (client, request) {
removeRequestStreamListeners()

if (!stream.destroyed && !stream.closed) {
stream.once('error', onDetachedStreamError)
stream.once('error', noop)
}
}

Expand Down Expand Up @@ -933,14 +960,9 @@ function writeH2 (client, request) {
}
}

stream.once('close', () => {
stream.off('data', onData)
stream.off('error', onDetachedStreamError)
session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) {
session.unref()
}
})
stream[kHTTP2Session] = session
stream[kRequestStreamOnData] = onData
stream.once('close', onRequestStreamClose)

const onError = function (err) {
stream.off('error', onError)
Expand Down
Loading