Skip to content

Commit b73532d

Browse files
authored
Implement drainingRead mechanism for JS-backed streams (#5838)
1 parent e9faa47 commit b73532d

19 files changed

+2887
-234
lines changed

src/workerd/api/streams/common.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,14 @@ struct ReadResult {
7171
}
7272
};
7373

74+
// Result type for draining read operations. Always returns bytes, even for value streams.
75+
// Used by DrainingReader for optimized pipe-to operations with vectored writes.
76+
// This is a C++ only type - not exposed to JavaScript.
77+
struct DrainingReadResult {
78+
kj::Array<kj::Array<kj::byte>> chunks; // Multiple byte arrays for vectored writes
79+
bool done = false; // True if stream is closed/closing
80+
};
81+
7482
struct StreamQueuingStrategy {
7583
using SizeAlgorithm = uint64_t(v8::Local<v8::Value>);
7684

@@ -498,6 +506,25 @@ class ReadableStreamController {
498506
virtual kj::Maybe<jsg::Promise<ReadResult>> read(
499507
jsg::Lock& js, kj::Maybe<ByobOptions> byobOptions) = 0;
500508

509+
// Performs a draining read operation that:
510+
// 1. Drains all currently buffered data from the queue
511+
// 2. Pumps the controller for synchronously available data (respecting pull promise state)
512+
// 3. Returns bytes even for value streams (converting ArrayBuffer/ArrayBufferView/string)
513+
// 4. Has mutual exclusion with regular reads - returns rejected promise if pending regular reads
514+
// 5. Returns done: true with final data when stream is closing
515+
//
516+
// This is a C++ only API (not exposed to JavaScript) intended for optimized pipe operations.
517+
// Returns kj::none if the stream is locked in a way that prevents the read.
518+
//
519+
// The maxRead parameter provides a soft limit on how much data to read. It only applies to
520+
// subsequent synchronous pump attempts after draining the currently buffered data. That is,
521+
// drainingRead will first drain all currently buffered data (potentially exceeding maxRead),
522+
// then will only proceed with additional synchronous reads if the total bytes read so far
523+
// is less than maxRead. This prevents runaway reads from neverending or slow streams while
524+
// still allowing efficient batch reads for normal streams.
525+
virtual kj::Maybe<jsg::Promise<DrainingReadResult>> drainingRead(
526+
jsg::Lock& js, size_t maxRead = kj::maxValue) = 0;
527+
501528
// The pipeTo implementation fully consumes the stream by directing all of its data at the
502529
// destination. Controllers should try to be as efficient as possible here. For instance, if
503530
// a ReadableStreamInternalController is piping to a WritableStreamInternalController, then

src/workerd/api/streams/internal.c++

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,14 @@ kj::Maybe<jsg::Promise<ReadResult>> ReadableStreamInternalController::read(
677677
KJ_UNREACHABLE;
678678
}
679679

680+
kj::Maybe<jsg::Promise<DrainingReadResult>> ReadableStreamInternalController::drainingRead(
681+
jsg::Lock& js, size_t maxRead) {
682+
// TODO(later): Implement proper drainingRead for internal controller.
683+
// For now, return a rejected promise as a placeholder.
684+
return js.rejectedPromise<DrainingReadResult>(
685+
js.v8TypeError("drainingRead is not yet implemented for internal streams"_kj));
686+
}
687+
680688
jsg::Promise<void> ReadableStreamInternalController::pipeTo(
681689
jsg::Lock& js, WritableStreamController& destination, PipeToOptions options) {
682690

src/workerd/api/streams/internal.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ class ReadableStreamInternalController: public ReadableStreamController {
6262
kj::Maybe<jsg::Promise<ReadResult>> read(
6363
jsg::Lock& js, kj::Maybe<ByobOptions> byobOptions) override;
6464

65+
kj::Maybe<jsg::Promise<DrainingReadResult>> drainingRead(
66+
jsg::Lock& js, size_t maxRead = kj::maxValue) override;
67+
6568
jsg::Promise<void> pipeTo(
6669
jsg::Lock& js, WritableStreamController& destination, PipeToOptions options) override;
6770

0 commit comments

Comments
 (0)