diff --git a/doc/api/quic.md b/doc/api/quic.md index 6969a43ec9f716..4feba4d2bb595f 100644 --- a/doc/api/quic.md +++ b/doc/api/quic.md @@ -288,11 +288,6 @@ added: REPLACEME * `validateAddress` {boolean} When `true`, the `QuicSocket` will use explicit address validation using a QUIC `RETRY` frame when listening for new server sessions. Default: `false`. - * `validateAddressLRU` {boolean} When `true`, validation will be skipped if - the address has been recently validated. Currently, only the 10 most - recently validated addresses are remembered. Setting `validateAddressLRU` - to `true`, will enable the `validateAddress` option as well. Default: - `false`. The `net.createQuicSocket()` function is used to create new `QuicSocket` instances associated with a local UDP address. diff --git a/lib/internal/quic/core.js b/lib/internal/quic/core.js index 47269ec00aa23e..11515ebf5fa476 100644 --- a/lib/internal/quic/core.js +++ b/lib/internal/quic/core.js @@ -179,7 +179,6 @@ const { QUICCLIENTSESSION_OPTION_REQUEST_OCSP, QUICCLIENTSESSION_OPTION_VERIFY_HOSTNAME_IDENTITY, QUICSOCKET_OPTIONS_VALIDATE_ADDRESS, - QUICSOCKET_OPTIONS_VALIDATE_ADDRESS_LRU, QUICSTREAM_HEADERS_KIND_NONE, QUICSTREAM_HEADERS_KIND_INFORMATIONAL, QUICSTREAM_HEADERS_KIND_INITIAL, @@ -225,6 +224,7 @@ const kOnFileOpened = Symbol('kOnFileOpened'); const kOnFileUnpipe = Symbol('kOnFileUnpipe'); const kOnPipedFileHandleRead = Symbol('kOnPipedFileHandleRead'); const kReady = Symbol('kReady'); +const kRemoveFromSocket = Symbol('kRemoveFromSocket'); const kRemoveSession = Symbol('kRemove'); const kRemoveStream = Symbol('kRemoveStream'); const kServerBusy = Symbol('kServerBusy'); @@ -940,9 +940,6 @@ class QuicSocket extends EventEmitter { // True if address verification should be used. validateAddress, - // True if an LRU should be used for add validation - validateAddressLRU, - // Whether qlog should be enabled for sessions qlog, @@ -963,8 +960,6 @@ class QuicSocket extends EventEmitter { let socketOptions = 0; if (validateAddress) socketOptions |= (1 << QUICSOCKET_OPTIONS_VALIDATE_ADDRESS); - if (validateAddressLRU) - socketOptions |= (1 << QUICSOCKET_OPTIONS_VALIDATE_ADDRESS_LRU); this[kSetHandle]( new QuicSocketHandle( @@ -2247,9 +2242,7 @@ class QuicSession extends EventEmitter { return this[kInternalState].handshakeContinuationHistogram; } - // TODO(addaleax): This is a temporary solution for testing and should be - // removed later. - removeFromSocket() { + [kRemoveFromSocket]() { return this[kHandle].removeFromSocket(); } } @@ -2695,7 +2688,17 @@ class QuicStream extends Duplex { } [kAfterAsyncWrite]({ bytes }) { - // TODO(@jasnell): Implement this + // There's currently nothing we need to do here. We have + // to have this but it's a non-op + } + + [kUpdateTimer]() { + // Timeout is implemented in the QuicSession at the native + // layer. We have to have this here but it's a non-op + } + + [kTrackWriteState](stream, bytes) { + // There's currently nothing to do here. } [kInspect](depth, options) { @@ -2712,17 +2715,6 @@ class QuicStream extends Duplex { }, depth, options); } - [kTrackWriteState](stream, bytes) { - // TODO(@jasnell): Not yet sure what we want to do with these - // this.#writeQueueSize += bytes; - // this.#writeQueueSize += bytes; - // this[kHandle].chunksSentSinceLastWrite = 0; - } - - [kUpdateTimer]() { - // TODO(@jasnell): Implement this later - } - get detached() { // The QuicStream is detached if it is yet destroyed // but the underlying handle is undefined. While in @@ -2750,7 +2742,7 @@ class QuicStream extends Duplex { [kWriteGeneric](writev, data, encoding, cb) { if (this.destroyed || this.detached) - return; // TODO(addaleax): Can this happen? + return; this[kUpdateTimer](); const req = (writev) ? @@ -2810,7 +2802,7 @@ class QuicStream extends Duplex { } _read(nread) { - if (this.destroyed) { // TODO(addaleax): Can this happen? + if (this.destroyed) { this.push(null); return; } @@ -2910,11 +2902,6 @@ class QuicStream extends Duplex { undefined; } - get bufferSize() { - // TODO(@jasnell): Implement this - return undefined; - } - get id() { return this[kInternalState].id; } @@ -2923,10 +2910,6 @@ class QuicStream extends Duplex { return this[kInternalState].push_id; } - _onTimeout() { - // TODO(@jasnell): Implement this - } - get session() { return this[kInternalState].session; } @@ -3127,7 +3110,8 @@ function createSocket(options) { module.exports = { createSocket, - kUDPHandleForTesting + kUDPHandleForTesting, + kRemoveFromSocket, }; /* eslint-enable no-use-before-define */ diff --git a/lib/internal/quic/util.js b/lib/internal/quic/util.js index 1d742b5da7a5b9..77ca135a272178 100644 --- a/lib/internal/quic/util.js +++ b/lib/internal/quic/util.js @@ -539,7 +539,6 @@ function validateQuicSocketOptions(options = {}) { retryTokenTimeout = DEFAULT_RETRYTOKEN_EXPIRATION, server = {}, statelessResetSecret, - validateAddressLRU = false, validateAddress = false, } = options; @@ -548,7 +547,6 @@ function validateQuicSocketOptions(options = {}) { validateObject(server, 'options.server'); validateLookup(lookup); validateBoolean(validateAddress, 'options.validateAddress'); - validateBoolean(validateAddressLRU, 'options.validateAddressLRU'); validateBoolean(qlog, 'options.qlog'); validateBoolean(disableStatelessReset, 'options.disableStatelessReset'); @@ -597,8 +595,7 @@ function validateQuicSocketOptions(options = {}) { retryTokenTimeout, server, type, - validateAddress: validateAddress || validateAddressLRU, - validateAddressLRU, + validateAddress, qlog, statelessResetSecret, disableStatelessReset, diff --git a/src/node_sockaddr-inl.h b/src/node_sockaddr-inl.h index a9d0ed061a126c..c8b985aedda2f6 100644 --- a/src/node_sockaddr-inl.h +++ b/src/node_sockaddr-inl.h @@ -7,6 +7,7 @@ #include "node_internals.h" #include "node_sockaddr.h" #include "util-inl.h" +#include "memory_tracker-inl.h" #include @@ -164,6 +165,72 @@ bool SocketAddress::operator==(const SocketAddress& other) const { bool SocketAddress::operator!=(const SocketAddress& other) const { return !(*this == other); } + +template +SocketAddressLRU::SocketAddressLRU( + size_t max_size) + : max_size_(max_size) {} + +template +typename T::Type* SocketAddressLRU::Peek( + const SocketAddress& address) const { + auto it = map_.find(address); + return it == std::end(map_) ? nullptr : &it->second->second; +} + +template +void SocketAddressLRU::CheckExpired() { + auto it = list_.rbegin(); + while (it != list_.rend()) { + if (T::CheckExpired(it->first, it->second)) { + map_.erase(it->first); + list_.pop_back(); + it = list_.rbegin(); + continue; + } else { + break; + } + } +} + +template +void SocketAddressLRU::MemoryInfo(MemoryTracker* tracker) const { + tracker->TrackFieldWithSize("list", size() * sizeof(Pair)); +} + +// If an item already exists for the given address, bump up it's +// position in the LRU list and return it. If the item does not +// exist, create it. If an item is created, check the size of the +// cache and adjust if necessary. Whether the item exists or not, +// purge expired items. +template +typename T::Type* SocketAddressLRU::Upsert( + const SocketAddress& address) { + + auto on_exit = OnScopeLeave([&]() { CheckExpired(); }); + + auto it = map_.find(address); + if (it != std::end(map_)) { + list_.splice(list_.begin(), list_, it->second); + T::Touch(it->first, &it->second->second); + return &it->second->second; + } + + list_.push_front(Pair(address, { })); + map_[address] = list_.begin(); + T::Touch(list_.begin()->first, &list_.begin()->second); + + // Drop the last item in the list if we are + // over the size limit... + if (map_.size() > max_size_) { + auto last = list_.end(); + map_.erase((--last)->first); + list_.pop_back(); + } + + return &map_[address]->second; +} + } // namespace node #endif // NODE_WANT_INTERNALS diff --git a/src/node_sockaddr.h b/src/node_sockaddr.h index c0d006a4d6e681..5d20487f93dbe8 100644 --- a/src/node_sockaddr.h +++ b/src/node_sockaddr.h @@ -9,6 +9,7 @@ #include "v8.h" #include +#include #include namespace node { @@ -116,6 +117,41 @@ class SocketAddress : public MemoryRetainer { sockaddr_storage address_; }; +template +class SocketAddressLRU : public MemoryRetainer { + public: + using Type = typename T::Type; + + inline explicit SocketAddressLRU(size_t max_size); + + // If the item already exists, returns a reference to + // the existing item, adjusting items position in the + // LRU. If the item does not exist, emplaces the item + // and returns the new item. + Type* Upsert(const SocketAddress& address); + + // Returns a reference to the item if it exists, or + // nullptr. The position in the LRU is not modified. + Type* Peek(const SocketAddress& address) const; + + size_t size() const { return map_.size(); } + size_t max_size() const { return max_size_; } + + void MemoryInfo(MemoryTracker* tracker) const override; + SET_MEMORY_INFO_NAME(SocketAddressLRU) + SET_SELF_SIZE(SocketAddressLRU) + + private: + using Pair = std::pair; + using Iterator = typename std::list::iterator; + + void CheckExpired(); + + std::list list_; + SocketAddress::Map map_; + size_t max_size_; +}; + } // namespace node #endif // NOE_WANT_INTERNALS diff --git a/src/quic/node_quic.cc b/src/quic/node_quic.cc index 29473492198d61..48ab6ac2fdc56f 100644 --- a/src/quic/node_quic.cc +++ b/src/quic/node_quic.cc @@ -189,7 +189,6 @@ void Initialize(Local target, V(QUICSERVERSESSION_OPTION_REJECT_UNAUTHORIZED) \ V(QUICSERVERSESSION_OPTION_REQUEST_CERT) \ V(QUICSOCKET_OPTIONS_VALIDATE_ADDRESS) \ - V(QUICSOCKET_OPTIONS_VALIDATE_ADDRESS_LRU) \ V(QUICSTREAM_HEADER_FLAGS_NONE) \ V(QUICSTREAM_HEADER_FLAGS_TERMINAL) \ V(QUICSTREAM_HEADERS_KIND_NONE) \ diff --git a/src/quic/node_quic_socket-inl.h b/src/quic/node_quic_socket-inl.h index 4d334a19f5b665..8e7bc65d7848f0 100644 --- a/src/quic/node_quic_socket-inl.h +++ b/src/quic/node_quic_socket-inl.h @@ -4,7 +4,7 @@ #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS #include "node_quic_socket.h" -#include "node_sockaddr.h" +#include "node_sockaddr-inl.h" #include "node_quic_session.h" #include "node_crypto.h" #include "debug_utils-inl.h" @@ -112,33 +112,27 @@ void QuicSocket::ReportSendError(int error) { } void QuicSocket::IncrementStatelessResetCounter(const SocketAddress& addr) { - reset_counts_[addr]++; + addrLRU_.Upsert(addr)->reset_count++; } void QuicSocket::IncrementSocketAddressCounter(const SocketAddress& addr) { - addr_counts_[addr]++; + addrLRU_.Upsert(addr)->active_connections++; } void QuicSocket::DecrementSocketAddressCounter(const SocketAddress& addr) { - auto it = addr_counts_.find(addr); - if (it == std::end(addr_counts_)) - return; - it->second--; - // Remove the address if the counter reaches zero again. - if (it->second == 0) { - addr_counts_.erase(addr); - reset_counts_.erase(addr); - } + SocketAddressInfo* counts = addrLRU_.Peek(addr); + if (counts != nullptr && counts->active_connections > 0) + counts->active_connections--; } size_t QuicSocket::GetCurrentSocketAddressCounter(const SocketAddress& addr) { - auto it = addr_counts_.find(addr); - return it == std::end(addr_counts_) ? 0 : it->second; + SocketAddressInfo* counts = addrLRU_.Peek(addr); + return counts != nullptr ? counts->active_connections : 0; } size_t QuicSocket::GetCurrentStatelessResetCounter(const SocketAddress& addr) { - auto it = reset_counts_.find(addr); - return it == std::end(reset_counts_) ? 0 : it->second; + SocketAddressInfo* counts = addrLRU_.Peek(addr); + return counts != nullptr ? counts->reset_count : 0; } void QuicSocket::ServerBusy(bool on) { @@ -160,22 +154,12 @@ void QuicSocket::set_diagnostic_packet_loss(double rx, double tx) { } void QuicSocket::set_validated_address(const SocketAddress& addr) { - if (has_option_validate_address_lru()) { - // Remove the oldest item if we've hit the LRU limit - validated_addrs_.push_back(SocketAddress::Hash()(addr)); - if (validated_addrs_.size() > kMaxValidateAddressLru) - validated_addrs_.pop_front(); - } + addrLRU_.Upsert(addr)->validated = true; } bool QuicSocket::is_validated_address(const SocketAddress& addr) const { - if (has_option_validate_address_lru()) { - auto res = std::find(std::begin(validated_addrs_), - std::end(validated_addrs_), - SocketAddress::Hash()(addr)); - return res != std::end(validated_addrs_); - } - return false; + auto info = addrLRU_.Peek(addr); + return info != nullptr ? info->validated : false; } void QuicSocket::AddSession( diff --git a/src/quic/node_quic_socket.cc b/src/quic/node_quic_socket.cc index ae0847d8d27bd9..e8dd6f23b159cb 100644 --- a/src/quic/node_quic_socket.cc +++ b/src/quic/node_quic_socket.cc @@ -261,6 +261,7 @@ QuicSocket::QuicSocket( retry_token_expiration_(retry_token_expiration), qlog_(qlog), server_alpn_(NGHTTP3_ALPN_H3), + addrLRU_(DEFAULT_MAX_SOCKETADDRESS_LRU_SIZE), quic_state_(quic_state) { MakeWeak(); PushListener(&default_listener_); @@ -309,10 +310,8 @@ void QuicSocket::MemoryInfo(MemoryTracker* tracker) const { tracker->TrackField("endpoints", endpoints_); tracker->TrackField("sessions", sessions_); tracker->TrackField("dcid_to_scid", dcid_to_scid_); - tracker->TrackField("addr_counts", addr_counts_); - tracker->TrackField("reset_counts", reset_counts_); + tracker->TrackField("address_counts", addrLRU_); tracker->TrackField("token_map", token_map_); - tracker->TrackField("validated_addrs", validated_addrs_); StatsBase::StatsMemoryInfo(tracker); tracker->TrackFieldWithSize( "current_ngtcp2_memory", @@ -677,23 +676,27 @@ bool QuicSocket::SendStatelessReset( // peer must termination it's initial attempt to // establish a connection and start a new attempt. // -// TODO(@jasnell): Retry packets will only ever be -// generated by QUIC servers, and only if the QuicSocket -// is configured for explicit path validation. There is -// no way for a client to force a retry packet to be created. -// However, once a client determines that explicit -// path validation is enabled, it could attempt to -// DOS by sending a large number of malicious -// initial packets to intentionally ellicit retry -// packets (It can do so by intentionally sending -// initial packets that ignore the retry token). -// To help mitigate that risk, we should limit the number -// of retries we send to a given remote endpoint. +// Retry packets will only ever be generated by QUIC servers, +// and only if the QuicSocket is configured for explicit path +// validation. There is no way for a client to force a retry +// packet to be created. However, once a client determines that +// explicit path validation is enabled, it could attempt to +// DOS by sending a large number of malicious initial packets +// to intentionally ellicit retry packets (It can do so by +// intentionally sending initial packets that ignore the retry +// token). To help mitigate that risk, we limit the number of +// retries we send to a given remote endpoint. bool QuicSocket::SendRetry( const QuicCID& dcid, const QuicCID& scid, const SocketAddress& local_addr, const SocketAddress& remote_addr) { + auto info = addrLRU_.Upsert(remote_addr); + // Do not send a retry if the retry count is greater + // than the retry limit. + // TODO(@jasnell): Make the retry limit configurable. + if (++(info->retry_count) > DEFAULT_MAX_RETRY_LIMIT) + return true; std::unique_ptr packet = GenerateRetryPacket(token_secret_, dcid, scid, local_addr, remote_addr); return packet ? @@ -978,6 +981,18 @@ void QuicSocket::RemoveListener(QuicSocketListener* listener) { listener->previous_listener_ = nullptr; } +bool QuicSocket::SocketAddressInfoTraits::CheckExpired( + const SocketAddress& address, + const Type& type) { + return (uv_hrtime() - type.timestamp) > 1e10; // 10 seconds. +} + +void QuicSocket::SocketAddressInfoTraits::Touch( + const SocketAddress& address, + Type* type) { + type->timestamp = uv_hrtime(); +} + // JavaScript API namespace { void NewQuicEndpoint(const FunctionCallbackInfo& args) { diff --git a/src/quic/node_quic_socket.h b/src/quic/node_quic_socket.h index f108eb47901b81..c760bba6034ba9 100644 --- a/src/quic/node_quic_socket.h +++ b/src/quic/node_quic_socket.h @@ -37,9 +37,11 @@ namespace quic { class QuicSocket; class QuicEndpoint; +constexpr size_t DEFAULT_MAX_SOCKETADDRESS_LRU_SIZE = 1000; +constexpr size_t DEFAULT_MAX_RETRY_LIMIT = 10; + #define QUICSOCKET_OPTIONS(V) \ - V(VALIDATE_ADDRESS, validate_address) \ - V(VALIDATE_ADDRESS_LRU, validate_address_lru) + V(VALIDATE_ADDRESS, validate_address) #define V(id, _) QUICSOCKET_OPTIONS_##id, enum QuicSocketOptions : uint32_t { @@ -546,36 +548,24 @@ class QuicSocket : public AsyncWrap, uint8_t token_secret_[kTokenSecretLen]; uint8_t reset_token_secret_[NGTCP2_STATELESS_RESET_TOKENLEN]; - // Counts the number of active connections per remote - // address. A custom std::hash specialization for - // sockaddr instances is used. Values are incremented - // when a QuicSession is added to the socket, and - // decremented when the QuicSession is removed. If the - // value reaches the value of max_connections_per_host_, - // attempts to create new connections will be ignored - // until the value falls back below the limit. - SocketAddress::Map addr_counts_; - - // Counts the number of stateless resets sent per - // remote address. - // TODO(@jasnell): this counter persists through the - // lifetime of the QuicSocket, and therefore can become - // a possible risk. Specifically, a malicious peer could - // attempt the local peer to count an increasingly large - // number of remote addresses. Need to mitigate the - // potential risk. - SocketAddress::Map reset_counts_; - - // Counts the number of retry attempts sent per - // remote address. + struct SocketAddressInfo { + size_t active_connections; + size_t reset_count; + size_t retry_count; + bool validated; + uint64_t timestamp; + }; - StatelessResetToken::Map token_map_; + struct SocketAddressInfoTraits { + using Type = SocketAddressInfo; - // The validated_addrs_ vector is used as an LRU cache for - // validated addresses only when the VALIDATE_ADDRESS_LRU - // option is set. - typedef size_t SocketAddressHash; - std::deque validated_addrs_; + static bool CheckExpired(const SocketAddress& address, const Type& type); + static void Touch(const SocketAddress& address, Type* type); + }; + + SocketAddressLRU addrLRU_; + + StatelessResetToken::Map token_map_; class SendWrap : public ReqWrap { public: diff --git a/test/cctest/test_sockaddr.cc b/test/cctest/test_sockaddr.cc index 8c23463f11d2d1..9abcd8ba819c64 100644 --- a/test/cctest/test_sockaddr.cc +++ b/test/cctest/test_sockaddr.cc @@ -2,6 +2,7 @@ #include "gtest/gtest.h" using node::SocketAddress; +using node::SocketAddressLRU; TEST(SocketAddress, SocketAddress) { CHECK(SocketAddress::is_numeric_host("123.123.123.123")); @@ -55,3 +56,72 @@ TEST(SocketAddress, SocketAddressIPv6) { addr.set_flow_label(12345); CHECK_EQ(addr.flow_label(), 12345); } + +TEST(SocketAddressLRU, SocketAddressLRU) { + struct Foo { + int c; + bool expired; + }; + + struct FooLRUTraits { + using Type = Foo; + + static bool CheckExpired(const SocketAddress& address, const Type& type) { + return type.expired; + } + + static void Touch(const SocketAddress& address, Type* type) { + type->expired = false; + } + }; + + SocketAddressLRU lru(2); + + sockaddr_storage storage[4]; + + SocketAddress::ToSockAddr(AF_INET, "123.123.123.123", 443, &storage[0]); + SocketAddress::ToSockAddr(AF_INET, "123.123.123.124", 443, &storage[1]); + SocketAddress::ToSockAddr(AF_INET, "123.123.123.125", 443, &storage[2]); + SocketAddress::ToSockAddr(AF_INET, "123.123.123.123", 443, &storage[3]); + + SocketAddress addr1(reinterpret_cast(&storage[0])); + SocketAddress addr2(reinterpret_cast(&storage[1])); + SocketAddress addr3(reinterpret_cast(&storage[2])); + SocketAddress addr4(reinterpret_cast(&storage[3])); + + Foo* foo = lru.Upsert(addr1); + CHECK_NOT_NULL(foo); + CHECK_EQ(foo->c, 0); + CHECK_EQ(foo->expired, false); + + foo->c = 1; + foo->expired = true; + + foo = lru.Upsert(addr1); + CHECK_NOT_NULL(lru.Peek(addr1)); + CHECK_EQ(lru.Peek(addr1), lru.Peek(addr4)); + CHECK_EQ(lru.Peek(addr1)->c, 1); + CHECK_EQ(lru.Peek(addr1)->expired, false); + CHECK_EQ(lru.size(), 1); + + foo = lru.Upsert(addr2); + foo->c = 2; + foo->expired = true; + CHECK_NOT_NULL(lru.Peek(addr2)); + CHECK_EQ(lru.Peek(addr2)->c, 2); + CHECK_EQ(lru.size(), 2); + + foo->expired = true; + + foo = lru.Upsert(addr3); + foo->c = 3; + foo->expired = false; + CHECK_NOT_NULL(lru.Peek(addr3)); + CHECK_EQ(lru.Peek(addr3)->c, 3); + CHECK_EQ(lru.size(), 1); + + // addr1 was removed because we exceeded size. + // addr2 was removed because it was expired. + CHECK_NULL(lru.Peek(addr1)); + CHECK_NULL(lru.Peek(addr2)); +} diff --git a/test/parallel/test-quic-errors-quicsocket-create.js b/test/parallel/test-quic-errors-quicsocket-create.js index 570f02b1a95661..e7e47339fe34cb 100644 --- a/test/parallel/test-quic-errors-quicsocket-create.js +++ b/test/parallel/test-quic-errors-quicsocket-create.js @@ -68,13 +68,6 @@ const { createQuicSocket } = require('net'); }); }); -// Test invalid QuicSocket validateAddressLRU argument option -[1, NaN, 1n, null, {}, []].forEach((validateAddressLRU) => { - assert.throws(() => createQuicSocket({ validateAddressLRU }), { - code: 'ERR_INVALID_ARG_TYPE' - }); -}); - // Test invalid QuicSocket qlog argument option [1, NaN, 1n, null, {}, []].forEach((qlog) => { assert.throws(() => createQuicSocket({ qlog }), { diff --git a/test/parallel/test-quic-process-cleanup.js b/test/parallel/test-quic-process-cleanup.js index 2f057a52a9e9c3..b7ba0caa70d841 100644 --- a/test/parallel/test-quic-process-cleanup.js +++ b/test/parallel/test-quic-process-cleanup.js @@ -1,4 +1,4 @@ -// Flags: --no-warnings +// Flags: --no-warnings --expose-internals 'use strict'; const common = require('../common'); if (!common.hasQuic) @@ -8,6 +8,7 @@ if (!common.hasQuic) // well. We use Workers because they have a more clearly defined shutdown // sequence and we can stop execution at any point. +const { kRemoveFromSocket } = require('internal/quic/core'); const { createQuicSocket } = require('net'); const { Worker, workerData } = require('worker_threads'); @@ -46,7 +47,7 @@ client.on('close', common.mustNotCall()); req.on('stream', common.mustCall(() => { if (workerData.removeFromSocket) - req.removeFromSocket(); + req[kRemoveFromSocket](); process.exit(); // Exits the worker thread }));