Skip to content

Commit 2818fe5

Browse files
committed
coordinate TransformStream finish operations per spec
1 parent b18e842 commit 2818fe5

File tree

3 files changed

+139
-28
lines changed

3 files changed

+139
-28
lines changed

src/workerd/api/streams/standard.c++

Lines changed: 132 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -783,6 +783,10 @@ class WritableStreamJsController final: public WritableStreamController {
783783

784784
void doError(jsg::Lock& js, v8::Local<v8::Value> reason);
785785

786+
// Error through the underlying controller if available, going through the proper
787+
// error transition (Erroring -> Errored).
788+
void errorIfNeeded(jsg::Lock& js, v8::Local<v8::Value> reason);
789+
786790
kj::Maybe<int> getDesiredSize() override;
787791

788792
kj::Maybe<v8::Local<v8::Value>> isErroring(jsg::Lock& js) override;
@@ -3394,6 +3398,16 @@ void WritableStreamJsController::doError(jsg::Lock& js, v8::Local<v8::Value> rea
33943398
}
33953399
}
33963400

3401+
void WritableStreamJsController::errorIfNeeded(jsg::Lock& js, v8::Local<v8::Value> reason) {
3402+
// Error through the underlying controller if available, which goes through the proper
3403+
// error transition (Erroring -> Errored). This allows close() to be called while the
3404+
// stream is "erroring" and reject with the stored error.
3405+
KJ_IF_SOME(controller, state.tryGet<Controller>()) {
3406+
controller->error(js, reason);
3407+
}
3408+
// If state is not Controller (already Closed or Errored), this is a no-op.
3409+
}
3410+
33973411
kj::Maybe<int> WritableStreamJsController::getDesiredSize() {
33983412
KJ_SWITCH_ONEOF(state) {
33993413
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
@@ -3790,23 +3804,41 @@ jsg::Promise<void> TransformStreamDefaultController::write(
37903804

37913805
jsg::Promise<void> TransformStreamDefaultController::abort(
37923806
jsg::Lock& js, v8::Local<v8::Value> reason) {
3793-
KJ_IF_SOME(finish, algorithms.maybeFinish) {
3794-
return finish.whenResolved(js);
3807+
if (FeatureFlags::get(js).getPedanticWpt()) {
3808+
// If a finish operation is already in progress, return the existing promise
3809+
// or handle the case where we're being called synchronously from within another
3810+
// finish operation.
3811+
if (algorithms.finishStarted) {
3812+
KJ_IF_SOME(finish, algorithms.maybeFinish) {
3813+
return finish.whenResolved(js);
3814+
}
3815+
// finishStarted is true but maybeFinish is not set yet - this means we're being
3816+
// called synchronously from within another finish operation (like cancel).
3817+
// We need to error the stream with the abort reason so that both the current
3818+
// operation and this abort reject with the abort reason.
3819+
error(js, reason);
3820+
return js.rejectedPromise<void>(js.v8Ref(reason));
3821+
}
3822+
3823+
// Mark that we're starting a finish operation before running the algorithm.
3824+
algorithms.finishStarted = true;
3825+
} else {
3826+
KJ_IF_SOME(finish, algorithms.maybeFinish) {
3827+
return finish.whenResolved(js);
3828+
}
37953829
}
3830+
37963831
return algorithms.maybeFinish
37973832
.emplace(maybeRunAlgorithm(js, algorithms.cancel,
37983833
JSG_VISITABLE_LAMBDA(
37993834
(this, ref = JSG_THIS, reason = jsg::JsRef(js, jsg::JsValue(reason))), (ref, reason),
38003835
(jsg::Lock & js)->jsg::Promise<void> {
38013836
// If the readable side is errored, return a rejected promise with the stored error
3802-
KJ_IF_SOME(controller, tryGetReadableController()) {
3803-
KJ_IF_SOME(error, controller.getMaybeErrorState(js)) {
3804-
return js.rejectedPromise<void>(kj::mv(error));
3805-
} else {
3806-
} // Else block to avert dangling else compiler warning.
3807-
} else {
3808-
} // Else block to avert dangling else compiler warning.
3809-
3837+
{
3838+
KJ_IF_SOME(err, getReadableErrorState(js)) {
3839+
return js.rejectedPromise<void>(kj::mv(err));
3840+
}
3841+
}
38103842
// Otherwise... error with the given reason and resolve the abort promise
38113843
error(js, reason.getHandle(js));
38123844
return js.resolvedPromise();
@@ -3821,15 +3853,53 @@ jsg::Promise<void> TransformStreamDefaultController::abort(
38213853
}
38223854

38233855
jsg::Promise<void> TransformStreamDefaultController::close(jsg::Lock& js) {
3856+
auto flags = FeatureFlags::get(js);
3857+
if (flags.getPedanticWpt()) {
3858+
// If a finish operation is already in progress (e.g., from cancel or abort),
3859+
// we should not run flush. Per the WHATWG streams spec, close/flush should
3860+
// coordinate with cancel to avoid calling both.
3861+
if (algorithms.finishStarted) {
3862+
KJ_IF_SOME(finish, algorithms.maybeFinish) {
3863+
return finish.whenResolved(js);
3864+
}
3865+
// finishStarted is true but maybeFinish is not set yet - this means we're being
3866+
// called synchronously from within another finish operation. If the stream was
3867+
// errored during that operation, return a rejected promise with the error.
3868+
KJ_IF_SOME(writableController, tryGetWritableController()) {
3869+
KJ_IF_SOME(err, writableController.isErroredOrErroring(js)) {
3870+
return js.rejectedPromise<void>(err);
3871+
}
3872+
}
3873+
KJ_IF_SOME(err, getReadableErrorState(js)) {
3874+
return js.rejectedPromise<void>(kj::mv(err));
3875+
}
3876+
return js.resolvedPromise();
3877+
}
3878+
3879+
// Mark that we're starting a finish operation before running the algorithm,
3880+
// since the algorithm may synchronously call other finish operations.
3881+
algorithms.finishStarted = true;
3882+
}
3883+
38243884
auto onSuccess =
38253885
JSG_VISITABLE_LAMBDA((ref = JSG_THIS), (ref), (jsg::Lock & js)->jsg::Promise<void> {
3826-
KJ_IF_SOME(readableController, ref->tryGetReadableController()) {
3886+
// If the stream was errored during the flush algorithm (e.g., by controller.error()
3887+
// or by a parallel cancel() calling abort()), we should reject with that error.
3888+
if (FeatureFlags::get(js).getPedanticWpt()) {
3889+
KJ_IF_SOME(err, ref->getReadableErrorState(js)) {
3890+
return js.rejectedPromise<void>(kj::mv(err));
3891+
}
3892+
}
38273893
// Allows for a graceful close of the readable side. Close will
38283894
// complete once all of the queued data is read or the stream
3829-
// errors.
3895+
// errors. Only close if the stream can still be closed (e.g.,
3896+
// it wasn't closed by a cancel operation from within flush).
3897+
{
3898+
KJ_IF_SOME(readableController, ref->tryGetReadableController()) {
3899+
if (readableController.canCloseOrEnqueue()) {
38303900
readableController.close(js);
3831-
} else {
3832-
// Else block to avert dangling else compiler warning.
3901+
}
3902+
}
38333903
}
38343904
return js.resolvedPromise();
38353905
});
@@ -3840,6 +3910,13 @@ jsg::Promise<void> TransformStreamDefaultController::close(jsg::Lock& js) {
38403910
return js.rejectedPromise<void>(kj::mv(reason));
38413911
});
38423912

3913+
if (flags.getPedanticWpt()) {
3914+
return algorithms.maybeFinish
3915+
.emplace(
3916+
maybeRunAlgorithm(js, algorithms.flush, kj::mv(onSuccess), kj::mv(onFailure), JSG_THIS))
3917+
.whenResolved(js);
3918+
}
3919+
38433920
return maybeRunAlgorithm(js, algorithms.flush, kj::mv(onSuccess), kj::mv(onFailure), JSG_THIS);
38443921
}
38453922

@@ -3851,14 +3928,40 @@ jsg::Promise<void> TransformStreamDefaultController::pull(jsg::Lock& js) {
38513928

38523929
jsg::Promise<void> TransformStreamDefaultController::cancel(
38533930
jsg::Lock& js, v8::Local<v8::Value> reason) {
3854-
KJ_IF_SOME(finish, algorithms.maybeFinish) {
3855-
return finish.whenResolved(js);
3931+
if (FeatureFlags::get(js).getPedanticWpt()) {
3932+
// If a finish operation is already in progress, return the existing promise
3933+
// or check for errors if we're being called synchronously from within another
3934+
// finish operation.
3935+
if (algorithms.finishStarted) {
3936+
KJ_IF_SOME(finish, algorithms.maybeFinish) {
3937+
return finish.whenResolved(js);
3938+
}
3939+
// finishStarted is true but maybeFinish is not set yet - check if the stream
3940+
// was errored during that operation.
3941+
KJ_IF_SOME(err, getReadableErrorState(js)) {
3942+
return js.rejectedPromise<void>(kj::mv(err));
3943+
}
3944+
return js.resolvedPromise();
3945+
}
3946+
3947+
// Mark that we're starting a finish operation before running the algorithm.
3948+
algorithms.finishStarted = true;
38563949
}
3950+
38573951
return algorithms.maybeFinish
38583952
.emplace(maybeRunAlgorithm(js, algorithms.cancel,
38593953
JSG_VISITABLE_LAMBDA(
38603954
(this, ref = JSG_THIS, reason = jsg::JsRef(js, jsg::JsValue(reason))), (ref, reason),
38613955
(jsg::Lock & js)->jsg::Promise<void> {
3956+
// If the stream was errored during the cancel algorithm (e.g., by controller.error()
3957+
// or by a parallel abort()), we should reject with that error.
3958+
if (FeatureFlags::get(js).getPedanticWpt()) {
3959+
KJ_IF_SOME(err, getReadableErrorState(js)) {
3960+
readable = kj::none;
3961+
errorWritableAndUnblockWrite(js, reason.getHandle(js));
3962+
return js.rejectedPromise<void>(kj::mv(err));
3963+
}
3964+
}
38623965
readable = kj::none;
38633966
errorWritableAndUnblockWrite(js, reason.getHandle(js));
38643967
return js.resolvedPromise();
@@ -3907,7 +4010,12 @@ void TransformStreamDefaultController::errorWritableAndUnblockWrite(
39074010
jsg::Lock& js, v8::Local<v8::Value> reason) {
39084011
algorithms.clear();
39094012
KJ_IF_SOME(writableController, tryGetWritableController()) {
3910-
if (writableController.isWritable()) {
4013+
if (FeatureFlags::get(js).getPedanticWpt()) {
4014+
// Use errorIfNeeded which goes through the proper error transition (Erroring -> Errored).
4015+
// This allows close() to be called while the stream is "erroring" and reject with the
4016+
// stored error, which is the expected behavior per the WHATWG streams spec.
4017+
writableController.errorIfNeeded(js, reason);
4018+
} else if (writableController.isWritable()) {
39114019
writableController.doError(js, reason);
39124020
}
39134021
writable = kj::none;
@@ -3991,6 +4099,13 @@ kj::Maybe<WritableStreamJsController&> TransformStreamDefaultController::
39914099
return kj::none;
39924100
}
39934101

4102+
kj::Maybe<jsg::Value> TransformStreamDefaultController::getReadableErrorState(jsg::Lock& js) {
4103+
KJ_IF_SOME(controller, tryGetReadableController()) {
4104+
return controller.getMaybeErrorState(js);
4105+
}
4106+
return kj::none;
4107+
}
4108+
39944109
template <class Self>
39954110
kj::StringPtr WritableImpl<Self>::jsgGetMemoryName() const {
39964111
return "WritableImpl"_kjc;

src/workerd/api/streams/standard.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,11 @@ class TransformStreamDefaultController: public jsg::Object {
690690
kj::Maybe<jsg::Function<Transformer::CancelAlgorithm>> cancel;
691691

692692
kj::Maybe<jsg::Promise<void>> maybeFinish = kj::none;
693+
// This flag is set to true at the start of a finish operation (close/cancel/abort)
694+
// before the algorithm runs. This is needed because emplace() evaluates its argument
695+
// before setting maybeFinish, so if the algorithm calls another finish operation
696+
// synchronously, maybeFinish wouldn't be set yet.
697+
bool finishStarted = false;
693698

694699
Algorithms() {};
695700
Algorithms(Algorithms&& other) = default;
@@ -716,6 +721,8 @@ class TransformStreamDefaultController: public jsg::Object {
716721
kj::Maybe<ReadableStreamDefaultController&> tryGetReadableController();
717722
kj::Maybe<WritableStreamJsController&> tryGetWritableController();
718723

724+
kj::Maybe<jsg::Value> getReadableErrorState(jsg::Lock& js);
725+
719726
// Currently, JS-backed transform streams only support value-oriented streams.
720727
// In the future, that may change and this will need to become a kj::OneOf
721728
// that includes a ReadableByteStreamController.

src/wpt/streams-test.ts

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -596,12 +596,8 @@ export default {
596596
comment: 'To be investigated',
597597
expectedFailures: [
598598
'readable.cancel() and a parallel writable.close() should reject if a transformer.cancel() calls controller.error()',
599-
'closing the writable side should reject if a parallel transformer.cancel() throws',
600599
'writable.abort() and readable.cancel() should reject if a transformer.cancel() calls controller.error()',
601-
'readable.cancel() should not call cancel() again when already called from writable.abort()',
602-
'writable.close() should not call flush() when cancel() is already called from readable.cancel()',
603600
'writable.abort() should not call cancel() again when already called from readable.cancel()',
604-
'readable.cancel() should not call cancel() when flush() is already called from writable.close()',
605601
],
606602
},
607603
'transform-streams/errors.any.js': {
@@ -611,16 +607,11 @@ export default {
611607
'an exception from transform() should error the stream if terminate has been requested but not completed',
612608
],
613609
expectedFailures: [
614-
'when controller.error is followed by a rejection, the error reason should come from controller.error',
615610
'TransformStream constructor should throw when start does',
616611
'when strategy.size throws inside start(), the constructor should throw the same error',
617612
'when strategy.size calls controller.error() then throws, the constructor should throw the first error',
618-
'it should be possible to error the readable between close requested and complete',
619613
'controller.error() should do nothing after a transformer method has thrown an exception',
620-
'controller.error() should do nothing the second time it is called',
621-
'abort should set the close reason for the writable when it happens before cancel during start, and cancel should reject',
622614
'controller.error() should close writable immediately after readable.cancel()',
623-
'abort should set the close reason for the writable when it happens before cancel during underlying sink write, but cancel should still succeed',
624615
'erroring during write with backpressure should result in the write failing',
625616
],
626617
},
@@ -691,8 +682,6 @@ export default {
691682
comment: 'To be investigated',
692683
expectedFailures: [
693684
'controller.error() after controller.terminate() with queued chunk should error the readable',
694-
'controller.error() after controller.terminate() without queued chunk should do nothing',
695-
'controller.terminate() inside flush() should not prevent writer.close() from succeeding',
696685
],
697686
},
698687

0 commit comments

Comments
 (0)