Skip to content

Commit 02e3f20

Browse files
committed
coordinate TransformStream finish operations per spec
1 parent b18e842 commit 02e3f20

File tree

3 files changed

+115
-31
lines changed

3 files changed

+115
-31
lines changed

src/workerd/api/streams/standard.c++

Lines changed: 108 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -783,6 +783,10 @@ class WritableStreamJsController final: public WritableStreamController {
783783

784784
void doError(jsg::Lock& js, v8::Local<v8::Value> reason);
785785

786+
// Error through the underlying controller if available, going through the proper
787+
// error transition (Erroring -> Errored).
788+
void errorIfNeeded(jsg::Lock& js, v8::Local<v8::Value> reason);
789+
786790
kj::Maybe<int> getDesiredSize() override;
787791

788792
kj::Maybe<v8::Local<v8::Value>> isErroring(jsg::Lock& js) override;
@@ -3394,6 +3398,16 @@ void WritableStreamJsController::doError(jsg::Lock& js, v8::Local<v8::Value> rea
33943398
}
33953399
}
33963400

3401+
void WritableStreamJsController::errorIfNeeded(jsg::Lock& js, v8::Local<v8::Value> reason) {
3402+
// Error through the underlying controller if available, which goes through the proper
3403+
// error transition (Erroring -> Errored). This allows close() to be called while the
3404+
// stream is "erroring" and reject with the stored error.
3405+
KJ_IF_SOME(controller, state.tryGet<Controller>()) {
3406+
controller->error(js, reason);
3407+
}
3408+
// If state is not Controller (already Closed or Errored), this is a no-op.
3409+
}
3410+
33973411
kj::Maybe<int> WritableStreamJsController::getDesiredSize() {
33983412
KJ_SWITCH_ONEOF(state) {
33993413
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
@@ -3790,23 +3804,33 @@ jsg::Promise<void> TransformStreamDefaultController::write(
37903804

37913805
jsg::Promise<void> TransformStreamDefaultController::abort(
37923806
jsg::Lock& js, v8::Local<v8::Value> reason) {
3793-
KJ_IF_SOME(finish, algorithms.maybeFinish) {
3794-
return finish.whenResolved(js);
3807+
// If a finish operation is already in progress, return the existing promise
3808+
// or handle the case where we're being called synchronously from within another
3809+
// finish operation.
3810+
if (algorithms.finishStarted) {
3811+
KJ_IF_SOME(finish, algorithms.maybeFinish) {
3812+
return finish.whenResolved(js);
3813+
}
3814+
// finishStarted is true but maybeFinish is not set yet - this means we're being
3815+
// called synchronously from within another finish operation (like cancel).
3816+
// We need to error the stream with the abort reason so that both the current
3817+
// operation and this abort reject with the abort reason.
3818+
error(js, reason);
3819+
return js.rejectedPromise<void>(js.v8Ref(reason));
37953820
}
3821+
3822+
// Mark that we're starting a finish operation before running the algorithm.
3823+
algorithms.finishStarted = true;
3824+
37963825
return algorithms.maybeFinish
37973826
.emplace(maybeRunAlgorithm(js, algorithms.cancel,
37983827
JSG_VISITABLE_LAMBDA(
37993828
(this, ref = JSG_THIS, reason = jsg::JsRef(js, jsg::JsValue(reason))), (ref, reason),
38003829
(jsg::Lock & js)->jsg::Promise<void> {
38013830
// If the readable side is errored, return a rejected promise with the stored error
3802-
KJ_IF_SOME(controller, tryGetReadableController()) {
3803-
KJ_IF_SOME(error, controller.getMaybeErrorState(js)) {
3804-
return js.rejectedPromise<void>(kj::mv(error));
3805-
} else {
3806-
} // Else block to avert dangling else compiler warning.
3807-
} else {
3808-
} // Else block to avert dangling else compiler warning.
3809-
3831+
KJ_IF_SOME(err, getReadableErrorState(js)) {
3832+
return js.rejectedPromise<void>(kj::mv(err));
3833+
}
38103834
// Otherwise... error with the given reason and resolve the abort promise
38113835
error(js, reason.getHandle(js));
38123836
return js.resolvedPromise();
@@ -3821,15 +3845,46 @@ jsg::Promise<void> TransformStreamDefaultController::abort(
38213845
}
38223846

38233847
jsg::Promise<void> TransformStreamDefaultController::close(jsg::Lock& js) {
3848+
// If a finish operation is already in progress (e.g., from cancel or abort),
3849+
// we should not run flush. Per the WHATWG streams spec, close/flush should
3850+
// coordinate with cancel to avoid calling both.
3851+
if (algorithms.finishStarted) {
3852+
KJ_IF_SOME(finish, algorithms.maybeFinish) {
3853+
return finish.whenResolved(js);
3854+
}
3855+
// finishStarted is true but maybeFinish is not set yet - this means we're being
3856+
// called synchronously from within another finish operation. If the stream was
3857+
// errored during that operation, return a rejected promise with the error.
3858+
KJ_IF_SOME(writableController, tryGetWritableController()) {
3859+
KJ_IF_SOME(err, writableController.isErroredOrErroring(js)) {
3860+
return js.rejectedPromise<void>(err);
3861+
}
3862+
}
3863+
KJ_IF_SOME(err, getReadableErrorState(js)) {
3864+
return js.rejectedPromise<void>(kj::mv(err));
3865+
}
3866+
return js.resolvedPromise();
3867+
}
3868+
3869+
// Mark that we're starting a finish operation before running the algorithm,
3870+
// since the algorithm may synchronously call other finish operations.
3871+
algorithms.finishStarted = true;
3872+
38243873
auto onSuccess =
38253874
JSG_VISITABLE_LAMBDA((ref = JSG_THIS), (ref), (jsg::Lock & js)->jsg::Promise<void> {
3826-
KJ_IF_SOME(readableController, ref->tryGetReadableController()) {
3875+
// If the stream was errored during the flush algorithm (e.g., by controller.error()
3876+
// or by a parallel cancel() calling abort()), we should reject with that error.
3877+
KJ_IF_SOME(err, ref->getReadableErrorState(js)) {
3878+
return js.rejectedPromise<void>(kj::mv(err));
3879+
}
38273880
// Allows for a graceful close of the readable side. Close will
38283881
// complete once all of the queued data is read or the stream
3829-
// errors.
3882+
// errors. Only close if the stream can still be closed (e.g.,
3883+
// it wasn't closed by a cancel operation from within flush).
3884+
KJ_IF_SOME(readableController, ref->tryGetReadableController()) {
3885+
if (readableController.canCloseOrEnqueue()) {
38303886
readableController.close(js);
3831-
} else {
3832-
// Else block to avert dangling else compiler warning.
3887+
}
38333888
}
38343889
return js.resolvedPromise();
38353890
});
@@ -3840,7 +3895,10 @@ jsg::Promise<void> TransformStreamDefaultController::close(jsg::Lock& js) {
38403895
return js.rejectedPromise<void>(kj::mv(reason));
38413896
});
38423897

3843-
return maybeRunAlgorithm(js, algorithms.flush, kj::mv(onSuccess), kj::mv(onFailure), JSG_THIS);
3898+
return algorithms.maybeFinish
3899+
.emplace(
3900+
maybeRunAlgorithm(js, algorithms.flush, kj::mv(onSuccess), kj::mv(onFailure), JSG_THIS))
3901+
.whenResolved(js);
38443902
}
38453903

38463904
jsg::Promise<void> TransformStreamDefaultController::pull(jsg::Lock& js) {
@@ -3851,14 +3909,36 @@ jsg::Promise<void> TransformStreamDefaultController::pull(jsg::Lock& js) {
38513909

38523910
jsg::Promise<void> TransformStreamDefaultController::cancel(
38533911
jsg::Lock& js, v8::Local<v8::Value> reason) {
3854-
KJ_IF_SOME(finish, algorithms.maybeFinish) {
3855-
return finish.whenResolved(js);
3912+
// If a finish operation is already in progress, return the existing promise
3913+
// or check for errors if we're being called synchronously from within another
3914+
// finish operation.
3915+
if (algorithms.finishStarted) {
3916+
KJ_IF_SOME(finish, algorithms.maybeFinish) {
3917+
return finish.whenResolved(js);
3918+
}
3919+
// finishStarted is true but maybeFinish is not set yet - check if the stream
3920+
// was errored during that operation.
3921+
KJ_IF_SOME(err, getReadableErrorState(js)) {
3922+
return js.rejectedPromise<void>(kj::mv(err));
3923+
}
3924+
return js.resolvedPromise();
38563925
}
3926+
3927+
// Mark that we're starting a finish operation before running the algorithm.
3928+
algorithms.finishStarted = true;
3929+
38573930
return algorithms.maybeFinish
38583931
.emplace(maybeRunAlgorithm(js, algorithms.cancel,
38593932
JSG_VISITABLE_LAMBDA(
38603933
(this, ref = JSG_THIS, reason = jsg::JsRef(js, jsg::JsValue(reason))), (ref, reason),
38613934
(jsg::Lock & js)->jsg::Promise<void> {
3935+
// If the stream was errored during the cancel algorithm (e.g., by controller.error()
3936+
// or by a parallel abort()), we should reject with that error.
3937+
KJ_IF_SOME(err, getReadableErrorState(js)) {
3938+
readable = kj::none;
3939+
errorWritableAndUnblockWrite(js, reason.getHandle(js));
3940+
return js.rejectedPromise<void>(kj::mv(err));
3941+
}
38623942
readable = kj::none;
38633943
errorWritableAndUnblockWrite(js, reason.getHandle(js));
38643944
return js.resolvedPromise();
@@ -3907,9 +3987,10 @@ void TransformStreamDefaultController::errorWritableAndUnblockWrite(
39073987
jsg::Lock& js, v8::Local<v8::Value> reason) {
39083988
algorithms.clear();
39093989
KJ_IF_SOME(writableController, tryGetWritableController()) {
3910-
if (writableController.isWritable()) {
3911-
writableController.doError(js, reason);
3912-
}
3990+
// Use errorIfNeeded which goes through the proper error transition (Erroring -> Errored).
3991+
// This allows close() to be called while the stream is "erroring" and reject with the
3992+
// stored error, which is the expected behavior per the WHATWG streams spec.
3993+
writableController.errorIfNeeded(js, reason);
39133994
writable = kj::none;
39143995
}
39153996
if (backpressure) {
@@ -3991,6 +4072,13 @@ kj::Maybe<WritableStreamJsController&> TransformStreamDefaultController::
39914072
return kj::none;
39924073
}
39934074

4075+
kj::Maybe<jsg::Value> TransformStreamDefaultController::getReadableErrorState(jsg::Lock& js) {
4076+
KJ_IF_SOME(controller, tryGetReadableController()) {
4077+
return controller.getMaybeErrorState(js);
4078+
}
4079+
return kj::none;
4080+
}
4081+
39944082
template <class Self>
39954083
kj::StringPtr WritableImpl<Self>::jsgGetMemoryName() const {
39964084
return "WritableImpl"_kjc;

src/workerd/api/streams/standard.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,11 @@ class TransformStreamDefaultController: public jsg::Object {
690690
kj::Maybe<jsg::Function<Transformer::CancelAlgorithm>> cancel;
691691

692692
kj::Maybe<jsg::Promise<void>> maybeFinish = kj::none;
693+
// This flag is set to true at the start of a finish operation (close/cancel/abort)
694+
// before the algorithm runs. This is needed because emplace() evaluates its argument
695+
// before setting maybeFinish, so if the algorithm calls another finish operation
696+
// synchronously, maybeFinish wouldn't be set yet.
697+
bool finishStarted = false;
693698

694699
Algorithms() {};
695700
Algorithms(Algorithms&& other) = default;
@@ -716,6 +721,8 @@ class TransformStreamDefaultController: public jsg::Object {
716721
kj::Maybe<ReadableStreamDefaultController&> tryGetReadableController();
717722
kj::Maybe<WritableStreamJsController&> tryGetWritableController();
718723

724+
kj::Maybe<jsg::Value> getReadableErrorState(jsg::Lock& js);
725+
719726
// Currently, JS-backed transform streams only support value-oriented streams.
720727
// In the future, that may change and this will need to become a kj::OneOf
721728
// that includes a ReadableByteStreamController.

src/wpt/streams-test.ts

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -596,12 +596,8 @@ export default {
596596
comment: 'To be investigated',
597597
expectedFailures: [
598598
'readable.cancel() and a parallel writable.close() should reject if a transformer.cancel() calls controller.error()',
599-
'closing the writable side should reject if a parallel transformer.cancel() throws',
600599
'writable.abort() and readable.cancel() should reject if a transformer.cancel() calls controller.error()',
601-
'readable.cancel() should not call cancel() again when already called from writable.abort()',
602-
'writable.close() should not call flush() when cancel() is already called from readable.cancel()',
603600
'writable.abort() should not call cancel() again when already called from readable.cancel()',
604-
'readable.cancel() should not call cancel() when flush() is already called from writable.close()',
605601
],
606602
},
607603
'transform-streams/errors.any.js': {
@@ -611,16 +607,11 @@ export default {
611607
'an exception from transform() should error the stream if terminate has been requested but not completed',
612608
],
613609
expectedFailures: [
614-
'when controller.error is followed by a rejection, the error reason should come from controller.error',
615610
'TransformStream constructor should throw when start does',
616611
'when strategy.size throws inside start(), the constructor should throw the same error',
617612
'when strategy.size calls controller.error() then throws, the constructor should throw the first error',
618-
'it should be possible to error the readable between close requested and complete',
619613
'controller.error() should do nothing after a transformer method has thrown an exception',
620-
'controller.error() should do nothing the second time it is called',
621-
'abort should set the close reason for the writable when it happens before cancel during start, and cancel should reject',
622614
'controller.error() should close writable immediately after readable.cancel()',
623-
'abort should set the close reason for the writable when it happens before cancel during underlying sink write, but cancel should still succeed',
624615
'erroring during write with backpressure should result in the write failing',
625616
],
626617
},
@@ -691,8 +682,6 @@ export default {
691682
comment: 'To be investigated',
692683
expectedFailures: [
693684
'controller.error() after controller.terminate() with queued chunk should error the readable',
694-
'controller.error() after controller.terminate() without queued chunk should do nothing',
695-
'controller.terminate() inside flush() should not prevent writer.close() from succeeding',
696685
],
697686
},
698687

0 commit comments

Comments
 (0)