@@ -68,6 +68,9 @@ ValueQueue::Consumer::Consumer(
6868 QueueImpl& impl, kj::Maybe<ConsumerImpl::StateListener&> stateListener)
6969 : impl(impl, stateListener) {}
7070
71+ ValueQueue::Consumer::Consumer (kj::Maybe<ConsumerImpl::StateListener&> stateListener)
72+ : impl(stateListener) {}
73+
7174void ValueQueue::Consumer::cancel (jsg::Lock& js, jsg::Optional<v8::Local<v8::Value>> maybeReason) {
7275 impl.cancel (js, maybeReason);
7376}
@@ -102,7 +105,14 @@ size_t ValueQueue::Consumer::size() {
102105
103106kj::Own<ValueQueue::Consumer> ValueQueue::Consumer::clone (
104107 jsg::Lock& js, kj::Maybe<ConsumerImpl::StateListener&> stateListener) {
105- auto consumer = kj::heap<Consumer>(impl.queue , stateListener);
108+ // If the queue was destroyed (e.g., stream was closed), we can still clone
109+ // the consumer - the cloneTo() will copy the closed/errored state.
110+ kj::Own<Consumer> consumer;
111+ KJ_IF_SOME (q, impl.queue ) {
112+ consumer = kj::heap<Consumer>(q, stateListener);
113+ } else {
114+ consumer = kj::heap<Consumer>(stateListener);
115+ }
106116 impl.cloneTo (js, consumer->impl );
107117 return kj::mv (consumer);
108118}
@@ -160,7 +170,7 @@ jsg::Promise<DrainingReadResult> ValueQueue::Consumer::drainingRead(jsg::Lock& j
160170 }
161171
162172 auto & ready = impl.state .requireActiveUnsafe ();
163- ConsumerImpl::UpdateBackpressureScope scope (impl. queue );
173+ ConsumerImpl::UpdateBackpressureScope scope (impl);
164174
165175 // Mark that we're doing a draining read. This allows onConsumerWantsData()
166176 // to use forcePull() which bypasses backpressure checks. The flag is cleared
@@ -320,7 +330,7 @@ size_t ValueQueue::size() const {
320330}
321331
322332void ValueQueue::handlePush (
323- jsg::Lock& js, ConsumerImpl::Ready& state, QueueImpl& queue, kj::Rc<Entry> entry) {
333+ jsg::Lock& js, ConsumerImpl::Ready& state, kj::Maybe< QueueImpl&> queue, kj::Rc<Entry> entry) {
324334 // If there are no pending reads, just add the entry to the buffer and return, adjusting
325335 // the size of the queue in the process.
326336 if (state.readRequests .empty ()) {
@@ -339,7 +349,7 @@ void ValueQueue::handlePush(
339349void ValueQueue::handleRead (jsg::Lock& js,
340350 ConsumerImpl::Ready& state,
341351 ConsumerImpl& consumer,
342- QueueImpl& queue,
352+ kj::Maybe< QueueImpl&> queue,
343353 ReadRequest request) {
344354 // If there are no pending read requests and there is data in the buffer,
345355 // we will try to fulfill the read request immediately.
@@ -392,8 +402,10 @@ void ValueQueue::handleRead(jsg::Lock& js,
392402 }
393403}
394404
395- bool ValueQueue::handleMaybeClose (
396- jsg::Lock& js, ConsumerImpl::Ready& state, ConsumerImpl& consumer, QueueImpl& queue) {
405+ bool ValueQueue::handleMaybeClose (jsg::Lock& js,
406+ ConsumerImpl::Ready& state,
407+ ConsumerImpl& consumer,
408+ kj::Maybe<QueueImpl&> queue) {
397409 // If the value queue is not yet empty we have to keep waiting for more reads to consume it.
398410 // Return false to indicate that we cannot close yet.
399411 return false ;
@@ -518,6 +530,9 @@ ByteQueue::Consumer::Consumer(
518530 QueueImpl& impl, kj::Maybe<ConsumerImpl::StateListener&> stateListener)
519531 : impl(impl, stateListener) {}
520532
533+ ByteQueue::Consumer::Consumer (kj::Maybe<ConsumerImpl::StateListener&> stateListener)
534+ : impl(stateListener) {}
535+
521536void ByteQueue::Consumer::cancel (jsg::Lock& js, jsg::Optional<v8::Local<v8::Value>> maybeReason) {
522537 impl.cancel (js, maybeReason);
523538}
@@ -552,7 +567,14 @@ size_t ByteQueue::Consumer::size() const {
552567
553568kj::Own<ByteQueue::Consumer> ByteQueue::Consumer::clone (
554569 jsg::Lock& js, kj::Maybe<ConsumerImpl::StateListener&> stateListener) {
555- auto consumer = kj::heap<Consumer>(impl.queue , stateListener);
570+ // If the queue was destroyed (e.g., stream was closed), we can still clone
571+ // the consumer - the cloneTo() will copy the closed/errored state.
572+ kj::Own<Consumer> consumer;
573+ KJ_IF_SOME (q, impl.queue ) {
574+ consumer = kj::heap<Consumer>(q, stateListener);
575+ } else {
576+ consumer = kj::heap<Consumer>(stateListener);
577+ }
556578 impl.cloneTo (js, consumer->impl );
557579 return kj::mv (consumer);
558580}
@@ -582,7 +604,7 @@ jsg::Promise<DrainingReadResult> ByteQueue::Consumer::drainingRead(jsg::Lock& js
582604 }
583605
584606 auto & ready = impl.state .requireActiveUnsafe ();
585- ConsumerImpl::UpdateBackpressureScope scope (impl. queue );
607+ ConsumerImpl::UpdateBackpressureScope scope (impl);
586608
587609 // Mark that we're doing a draining read. This allows onConsumerWantsData()
588610 // to use forcePull() which bypasses backpressure checks. The flag is cleared
@@ -898,8 +920,10 @@ size_t ByteQueue::size() const {
898920 return impl.size ();
899921}
900922
901- void ByteQueue::handlePush (
902- jsg::Lock& js, ConsumerImpl::Ready& state, QueueImpl& queue, kj::Rc<Entry> newEntry) {
923+ void ByteQueue::handlePush (jsg::Lock& js,
924+ ConsumerImpl::Ready& state,
925+ kj::Maybe<QueueImpl&> queue,
926+ kj::Rc<Entry> newEntry) {
903927 const auto bufferData = [&](size_t offset) {
904928 state.queueTotalSize += newEntry->getSize () - offset;
905929 state.buffer .emplace_back (QueueEntry{
@@ -1046,7 +1070,7 @@ void ByteQueue::handlePush(
10461070void ByteQueue::handleRead (jsg::Lock& js,
10471071 ConsumerImpl::Ready& state,
10481072 ConsumerImpl& consumer,
1049- QueueImpl& queue,
1073+ kj::Maybe< QueueImpl&> queue,
10501074 ReadRequest request) {
10511075 const auto pendingRead = [&]() {
10521076 bool isByob = request.pullInto .type == ReadRequest::Type::BYOB;
@@ -1055,11 +1079,14 @@ void ByteQueue::handleRead(jsg::Lock& js,
10551079 // Because ReadRequest is movable, and because the ByobRequest captures
10561080 // a reference to the ReadRequest, we wait until after it is added to
10571081 // state.readRequests to create the associated ByobRequest.
1058- // If the queue state is nullptr here, it means the queue has already
1059- // been closed.
1060- KJ_IF_SOME (queueState, queue.getState ()) {
1061- queueState.pendingByobReadRequests .push_back (
1062- state.readRequests .back ()->makeByobReadRequest (consumer, queue));
1082+ // If the queue is none, the consumer was cloned from a closed stream
1083+ // and we can't create a ByobRequest. If the queue state is none,
1084+ // the queue has already been closed.
1085+ KJ_IF_SOME (q, queue) {
1086+ KJ_IF_SOME (queueState, q.getState ()) {
1087+ queueState.pendingByobReadRequests .push_back (
1088+ state.readRequests .back ()->makeByobReadRequest (consumer, q));
1089+ }
10631090 }
10641091 }
10651092 KJ_IF_SOME (listener, consumer.stateListener ) {
@@ -1178,8 +1205,10 @@ void ByteQueue::handleRead(jsg::Lock& js,
11781205 }
11791206}
11801207
1181- bool ByteQueue::handleMaybeClose (
1182- jsg::Lock& js, ConsumerImpl::Ready& state, ConsumerImpl& consumer, QueueImpl& queue) {
1208+ bool ByteQueue::handleMaybeClose (jsg::Lock& js,
1209+ ConsumerImpl::Ready& state,
1210+ ConsumerImpl& consumer,
1211+ kj::Maybe<QueueImpl&> queue) {
11831212 // This is called when we know that we are closing and we still have data in
11841213 // the queue. We want to see if we can drain as much of it into pending reads
11851214 // as possible. If we're able to drain all of it, then yay! We can go ahead and
0 commit comments