Skip to content
This repository was archived by the owner on Aug 11, 2020. It is now read-only.

Commit aa99a6a

Browse files
committed
quic: fixes and improvements
PR-URL: #31
1 parent ce3eff2 commit aa99a6a

File tree

6 files changed

+256
-75
lines changed

6 files changed

+256
-75
lines changed

lib/internal/quic/core.js

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1348,18 +1348,42 @@ class QuicStream extends Duplex {
13481348
this[kHandle] = handle;
13491349
this.#id = id;
13501350
this.#session = session;
1351+
this._readableState.readingMore = true;
1352+
1353+
// See src/node_quic_stream.h for an explanation
1354+
// of the initial states for unidirectional streams.
1355+
if (this.unidirectional) {
1356+
if (session instanceof QuicServerSession) {
1357+
if (this.serverInitiated) {
1358+
// Close the readable side
1359+
this.push(null);
1360+
this.read();
1361+
} else {
1362+
// Close the writable side
1363+
this.end();
1364+
}
1365+
} else {
1366+
if (this.serverInitiated) {
1367+
// Close the writable side
1368+
this.end();
1369+
} else {
1370+
this.push(null);
1371+
this.read();
1372+
}
1373+
}
1374+
}
13511375
}
13521376

13531377
get serverInitiated() {
1354-
return this.#id & 0b01;
1378+
return !!(this.#id & 0b01);
13551379
}
13561380

13571381
get clientInitiated() {
13581382
return !this.serverInitiated;
13591383
}
13601384

13611385
get unidirectional() {
1362-
return this.#id & 0b10;
1386+
return !!(this.#id & 0b10);
13631387
}
13641388

13651389
get bidirectional() {
@@ -1372,6 +1396,7 @@ class QuicStream extends Duplex {
13721396
// remaining within the duplex writable side queue.
13731397
this.end();
13741398
this.push(null);
1399+
this.read();
13751400
process.nextTick(emit.bind(this, 'reset', finalSize, appErrorCode));
13761401
// TODO(@jasnell): Should we destroy here? It's not yet clear
13771402
// what else should be done

lib/internal/stream_base_commons.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,9 @@ function afterWriteDispatched(self, req, err) {
143143
req.async = !!streamBaseState[kLastWriteWasAsync];
144144

145145
if (err !== 0)
146-
return self.destroy(errnoException(err, 'write', req.error), cb);
146+
return self.destroy(
147+
errnoException(err, 'write', req.error),
148+
req.callback());
147149

148150
if (!req.async && typeof req.callback === 'function') {
149151
req.callback();

src/node_quic_session.cc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1426,7 +1426,6 @@ int QuicSession::Send0RTTStreamData(
14261426
ssize_t ndatalen = 0;
14271427

14281428
std::vector<ngtcp2_vec> vec;
1429-
uint8_t fin = stream->IsShutdown() ? 1 : 0;
14301429
size_t count = stream->DrainInto(&vec, from);
14311430
size_t c = count;
14321431
ngtcp2_vec* v = vec.data();
@@ -1439,7 +1438,7 @@ int QuicSession::Send0RTTStreamData(
14391438
max_pktlen_,
14401439
&ndatalen,
14411440
stream->GetID(),
1442-
fin,
1441+
stream->IsWritable() ? 0 : 1,
14431442
reinterpret_cast<const ngtcp2_vec*>(v),
14441443
c,
14451444
uv_hrtime());
@@ -1505,7 +1504,7 @@ int QuicSession::SendStreamData(
15051504
max_pktlen_,
15061505
&ndatalen,
15071506
stream->GetID(),
1508-
stream->IsShutdown() ? 1 : 0,
1507+
stream->IsWritable() ? 0 : 1,
15091508
reinterpret_cast<const ngtcp2_vec*>(v),
15101509
c,
15111510
uv_hrtime());
@@ -1719,7 +1718,9 @@ void QuicSession::StopRetransmitTimer() {
17191718
// Called by ngtcp2 when a stream has been closed. If the stream does
17201719
// not exist, the close is ignored.
17211720
void QuicSession::StreamClose(int64_t stream_id, uint16_t app_error_code) {
1722-
CHECK(!IsDestroyed());
1721+
// Ignore if the session has already been destroyed
1722+
if (IsDestroyed())
1723+
return;
17231724
Debug(this, "Closing stream %llu with code %d",
17241725
stream_id, app_error_code);
17251726
QuicStream* stream = FindStream(stream_id);

src/node_quic_stream.cc

Lines changed: 81 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,10 @@ QuicStream::QuicStream(
6060
StreamBase(session->env()),
6161
session_(session),
6262
stream_id_(stream_id),
63-
flags_(QUIC_STREAM_FLAG_NONE),
63+
flags_(QUICSTREAM_FLAG_INITIAL),
6464
available_outbound_length_(0) {
6565
CHECK_NOT_NULL(session);
66+
SetInitialFlags();
6667
session->AddStream(this);
6768
StreamBase::AttachToObject(GetObject());
6869
PushStreamListener(&stream_listener_);
@@ -74,14 +75,43 @@ QuicStream::~QuicStream() {
7475
CHECK_EQ(0, streambuf_.Length());
7576
}
7677

78+
inline void QuicStream::SetInitialFlags() {
79+
if (GetDirection() == QUIC_STREAM_UNIDIRECTIONAL) {
80+
if (session_->IsServer()) {
81+
switch (GetOrigin()) {
82+
case QUIC_STREAM_SERVER:
83+
SetReadClose();
84+
break;
85+
case QUIC_STREAM_CLIENT:
86+
SetWriteClose();
87+
break;
88+
default:
89+
UNREACHABLE();
90+
}
91+
} else {
92+
switch (GetOrigin()) {
93+
case QUIC_STREAM_SERVER:
94+
SetWriteClose();
95+
break;
96+
case QUIC_STREAM_CLIENT:
97+
SetReadClose();
98+
break;
99+
default:
100+
UNREACHABLE();
101+
}
102+
}
103+
}
104+
}
105+
77106
// QuicStream::Close() is called by the QuicSession when ngtcp2 detects that
78107
// a stream has been closed. This, in turn, calls out to the JavaScript to
79108
// start the process of tearing down and destroying the QuicStream instance.
80109
void QuicStream::Close(uint16_t app_error_code) {
81110
Debug(this, "Stream %llu closed with code %d", GetID(), app_error_code);
111+
SetReadClose();
112+
SetWriteClose();
82113
HandleScope scope(env()->isolate());
83114
Context::Scope context_context(env()->context());
84-
flags_ |= QUIC_STREAM_FLAG_CLOSED;
85115
Local<Value> arg = Number::New(env()->isolate(), app_error_code);
86116
MakeCallback(env()->quic_on_stream_close_function(), 1, &arg);
87117
}
@@ -108,15 +138,23 @@ void QuicStream::Reset(uint64_t final_size, uint16_t app_error_code) {
108138
}
109139

110140
void QuicStream::Destroy() {
141+
SetReadClose();
142+
SetWriteClose();
111143
streambuf_.Cancel();
112144
session_->RemoveStream(stream_id_);
113145
session_ = nullptr;
114146
}
115147

148+
// Do shutdown is called when the JS stream writable side is closed.
149+
// We want to mark the writable side closed and send pending data.
116150
int QuicStream::DoShutdown(ShutdownWrap* req_wrap) {
117151
if (IsDestroyed())
118152
return UV_EPIPE;
119-
flags_ |= QUIC_STREAM_FLAG_SHUT;
153+
// Do nothing if the stream was already shutdown. Specifically,
154+
// we should not attempt to send anything on the QuicSession
155+
if (!IsWritable())
156+
return 1;
157+
SetWriteClose();
120158
session_->SendStreamData(this);
121159
return 1;
122160
}
@@ -128,7 +166,9 @@ int QuicStream::DoWrite(
128166
uv_stream_t* send_handle) {
129167
CHECK_NULL(send_handle);
130168

131-
if (IsDestroyed()) {
169+
// A write should not have happened if we've been destroyed or
170+
// the QuicStream is no longer writable.
171+
if (IsDestroyed() || !IsWritable()) {
132172
req_wrap->Done(UV_EOF);
133173
return 0;
134174
}
@@ -174,23 +214,17 @@ int QuicStream::DoWrite(
174214
// to be careful not to allow the internal buffer to grow
175215
// too large, or we'll run into several other problems.
176216

177-
uint64_t len = streambuf_.Copy(bufs, nbufs);
217+
streambuf_.Copy(bufs, nbufs);
178218
req_wrap->Done(0);
219+
session_->SendStreamData(this);
179220

180221
// IncrementAvailableOutboundLength(len);
181-
session_->SendStreamData(this);
182222
return 0;
183223
}
184224

185-
uint64_t QuicStream::GetID() const {
186-
return stream_id_;
187-
}
188-
189-
QuicSession* QuicStream::Session() {
190-
return session_;
191-
}
192-
193225
void QuicStream::AckedDataOffset(uint64_t offset, size_t datalen) {
226+
if (IsDestroyed())
227+
return;
194228
streambuf_.Consume(datalen);
195229
}
196230

@@ -212,40 +246,34 @@ inline void QuicStream::DecrementAvailableOutboundLength(size_t amount) {
212246
available_outbound_length_ -= amount;
213247
}
214248

215-
QuicStream* QuicStream::New(
216-
QuicSession* session,
217-
uint64_t stream_id) {
218-
Local<Object> obj;
219-
if (!session->env()
220-
->quicserverstream_constructor_template()
221-
->NewInstance(session->env()->context()).ToLocal(&obj)) {
222-
return nullptr;
223-
}
224-
return new QuicStream(session, obj, stream_id);
225-
}
226-
227249
int QuicStream::ReadStart() {
228250
CHECK(!this->IsDestroyed());
229-
Debug(this, "Reading started.");
230-
flags_ |= QUIC_STREAM_FLAG_READ_START;
231-
flags_ &= ~QUIC_STREAM_FLAG_READ_PAUSED;
251+
CHECK(IsReadable());
252+
SetReadStart();
253+
SetReadResume();
232254
return 0;
233255
}
234256

235257
int QuicStream::ReadStop() {
236258
CHECK(!this->IsDestroyed());
237-
if (!IsReading())
238-
return 0;
239-
Debug(this, "Reading stopped");
240-
flags_ |= QUIC_STREAM_FLAG_READ_PAUSED;
259+
CHECK(IsReadable());
260+
SetReadPause();
241261
return 0;
242262
}
243263

244264
// Passes chunks of data on to the JavaScript side as soon as they are
245-
// received. The caller of this must have a HandleScope.
265+
// received but only if we're still readable. The caller of this must have a
266+
// HandleScope.
267+
// TODO(@jasnell): There's currently no flow control here. The data is pushed
268+
// up to the JavaScript side regardless of whether the JS stream is flowing and
269+
// the connected peer is told to keep sending. We need to implement back
270+
// pressure.
246271
void QuicStream::ReceiveData(int fin, const uint8_t* data, size_t datalen) {
247-
Debug(this, "Receiving %d bytes of data. Final? %s",
248-
datalen, fin ? "yes" : "no");
272+
Debug(this, "Receiving %d bytes of data. Final? %s. Readable? %s",
273+
datalen, fin ? "yes" : "no", IsReadable() ? "yes" : "no");
274+
275+
if (!IsReadable())
276+
return;
249277

250278
while (datalen > 0) {
251279
uv_buf_t buf = EmitAlloc(datalen);
@@ -265,8 +293,24 @@ void QuicStream::ReceiveData(int fin, const uint8_t* data, size_t datalen) {
265293
EmitRead(avail, buf);
266294
};
267295

268-
if (fin)
296+
// When fin != 0, we've received that last chunk of data for this
297+
// stream, indicating that the stream is no longer readable.
298+
if (fin) {
299+
SetReadClose();
269300
EmitRead(UV_EOF);
301+
}
302+
}
303+
304+
QuicStream* QuicStream::New(
305+
QuicSession* session,
306+
uint64_t stream_id) {
307+
Local<Object> obj;
308+
if (!session->env()
309+
->quicserverstream_constructor_template()
310+
->NewInstance(session->env()->context()).ToLocal(&obj)) {
311+
return nullptr;
312+
}
313+
return new QuicStream(session, obj, stream_id);
270314
}
271315

272316
// JavaScript API

0 commit comments

Comments
 (0)